news 2026/4/23 12:44:14

Pulsar 消息重试与死信机制

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Pulsar 消息重试与死信机制

消息队列消费场景下,经常会发生预期外的异常,比如:消息处理时业务报错(比如数据格式异常或下游服务短暂不可用)、业务处理消息耗时超过ack最大等待时间等。为应对这些场景,Pulsar 提供了消息重试和死信机制,通过消费者客户端不同的配置,在处理消息出现异常时,可以实现有限重试和无限重试两种效果。

有限重试

有限重试是利用 Pulsar 的重试队列和死信队列机制,保证业务的最终一致性。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。当消息进入到死信队列中,一般这时就需要人为介入来处理这批消息。可以通过编写专门的客户端来订阅死信 Topic,处理这批之前处理失败的消息。

实现原理

客户端处理消息失败后,调用consumer.reconsumeLater接口开始走重试策略。首先,客户端检查消息对应的重试次数,如果达到指定的最大重试次数,消息被投递到死信队列(投递到死信队列的消息不会自动消费,如果需要,用户自己创建额外的消费者进行消费);如果没有达到最大重试次数,消费被投递到重试队列。重试间隔是通过延迟消息实现的,投递到重试队列的实际上是一个延迟消息,延迟时间就是用户在reconsumeLater中指定的时间。

使用重试队列实现自动重试的关键点总结

  1. 发送到 Retry Topic:消息被发送到 Retry Topic,并设置 deliverAfter(delayTime, unit) 延迟投递

  2. 自动 ACK:发送成功后,通过 doAcknowledge() 自动确认原始消息

  3. 原消息状态:原始 Topic 中的消息变为已确认(Acknowledged)状态

  4. 延迟重试:延迟时间到达后,消费者会从 Retry Topic 收到该消息

注意事项:当使用 Token 访问重试/死信队列时,需要为消费者所使用角色赋予生产消息权限。

代码示例

自动重试的代码示例,消费过程中出现某些异常,进入重试 Topic 重试,最后进入死信Topic中。

注:如果消费的消息 ack 超时,会触发重新投递(Redelivery),消息会从原 Topic 重新发送给消费者

消费者参数设置

注意要重试队列和死信队列的topic需要在hulk云平台创建好,并给token配置读写权限。

PulsarClient pulsarClient = Constant.getPulsarClient(); Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .topic("persistent://my-property/my-ns/test_retry_p2") .subscriptionName("sub1") .subscriptionType(SubscriptionType.Key_Shared) .enableRetry(true)//开启重试消费 .ackTimeout(30, TimeUnit.SECONDS) // 设置为最大处理时间的 2-3 倍 .ackTimeoutTickTime(5, TimeUnit.SECONDS) // 检查粒度,默认 1 秒 .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(3)//可以指定最大重试次数 .retryLetterTopic("persistent://my-property/my-ns/sub1-retry") //指定重试队列 .deadLetterTopic("persistent://my-property/my-ns/sub1-dlq") //指定死信队列 .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); while (true) { Message<String> message = null; try { message = consumer.receive(); log.info("Received message: {}, Properties:{}", message.getValue(), message.getProperties()); doBusinessMaybeThrowException(message); consumer.acknowledge(message); log.info("Ack: {},{}", message.getTopicName(), message.getMessageId()); } catch (Exception e) { log.error("Consumer exception:{}", e.getMessage(), e); consumer.reconsumeLater(message, 2, TimeUnit.SECONDS);//延迟重试 } }

Message对象中有个字段 property,包含了重试相关的属性

{ REAL_TOPIC=persistent://my-property/my-ns/real-topic, #原 Topic REAL_SUBSCRIPTION=sub_topic1, ORIGIN_MESSAGE_ID=8143097:5:0, #最初生产的消息 ID ORIGIN_MESSAGE_IDY_TIME=8143097:5:0, DELAY_TIME=1000, RECONSUMETIMES=2 #消息重试的次数 }

可以使用属性中的重试的次数,实现指数级回退重试。

无限重试

无限重试是指客户端在处理消息失败后,主动发一条否定应答,让服务端重新推送。如果一直发送否定应答,服务端会一直重推,因此实现无限重试的效果。仅需2步即可开启无限重试

1、初始化consumer时,指定重试间隔(negativeAckRedeliveryDelay,默认1min)

2、捕获业务异常,对处理失败的消息,发送否定应答(consumer.negativeAcknowledge)

以下为主动重试的代码示例:

Consumer<byte[]> consumer = client.newConsumer() .topic("persistent://my-property/my-ns/real-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Key_Shared) .ackTimeout(30, TimeUnit.SECONDS) // 设置为最大处理时间的 2-3 倍 .ackTimeoutTickTime(5, TimeUnit.SECONDS) // 检查粒度,默认 1 秒 .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES) // 默认1min .subscribe(); ​while (true) { Message<String> message = null; try { message = consumer.receive(); log.info("Received message: {}, Redelivery count:{}", message.getValue(), message.getRedeliveryCount()); doBusinessMaybeThrowException(message); consumer.acknowledge(message); log.info("ack: {},{}", message.getTopicName(), message.getMessageId()); } catch (Exception e) { log.error("consumer exception", e); //否定应答 consumer.negativeAcknowledge(message); } }

值得注意的是,当消费者 unack 的消息过多时,Broker 会停止向消费者发送增量消息,而是一直推送 unack 的消息。直到 unack 的消息消息数量低于阈值,才会继续推送新消息。默认单个消费者unack消息数量上限是5万,订阅的 unack 消息数量上限是 20万

参考文档

[1]. Pulsar 消费者客户端官方文档 https://pulsar.apache.org/docs/4.0.x/client-libraries-consumers/

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 11:56:53

2024提示工程架构师认证指南:Agentic AI方向的3大权威证书与备考攻略

2024提示工程架构师认证指南&#xff1a;Agentic AI方向的3大权威证书与备考攻略关键词&#xff1a;提示工程、Agentic AI、智能代理、大语言模型、工具调用、认证攻略、LLM应用 摘要&#xff1a;当大语言模型&#xff08;LLM&#xff09;从“对话助手”进化为“自主行动体”&a…

作者头像 李华
网站建设 2026/4/23 11:36:22

校平机:金属板材的“整形医生“

什么是校平机&#xff1f;校平机&#xff08;Leveling Machine&#xff09;是金属加工行业中用于消除板材内部应力、矫正弯曲和波浪形缺陷的专用设备。它通过一系列交错排列的辊轮对金属板材进行反复弯曲&#xff0c;使材料内部纤维组织均匀延伸&#xff0c;最终获得平整、无内…

作者头像 李华
网站建设 2026/4/23 9:54:54

springboot+vue开发的个人健康运动健身饮食人体血糖监测系统应用和研究

文章目录摘要关键词项目简介大数据系统开发流程主要运用技术介绍爬虫核心代码展示结论源码文档获取定制开发/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;摘要 随着现代生活节奏加快&#xff0c;慢性疾病如糖尿病发病率逐年上升&#xff0c;个人健…

作者头像 李华
网站建设 2026/4/23 7:47:17

计算机毕业设计springboot人力资源管理系统的核心设计与实现 基于Spring Boot框架的人力资源管理系统核心功能开发与实践 Spring Boot驱动下的人力资源管理系统核心模块设计与实现

计算机毕业设计springboot人力资源管理系统的核心设计与实现6332p &#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。 随着信息技术的飞速发展&#xff0c;企业对人力资源管理的数…

作者头像 李华
网站建设 2026/4/23 11:19:25

Rust代码打包为WebAssembly二进制文件详解

Cargo打包Rust代码为WebAssembly二进制文件详解 1. cargo介绍 Cargo是Rust编程语言的官方包管理器和构建工具&#xff0c;自Rust诞生起便作为其核心组件。它极大地简化了Rust项目的创建、构建、测试和发布流程&#xff0c;是Rust生态系统的基石。对于前端开发者而言&#xff…

作者头像 李华