消息队列消费场景下,经常会发生预期外的异常,比如:消息处理时业务报错(比如数据格式异常或下游服务短暂不可用)、业务处理消息耗时超过ack最大等待时间等。为应对这些场景,Pulsar 提供了消息重试和死信机制,通过消费者客户端不同的配置,在处理消息出现异常时,可以实现有限重试和无限重试两种效果。
有限重试
有限重试是利用 Pulsar 的重试队列和死信队列机制,保证业务的最终一致性。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。当消息进入到死信队列中,一般这时就需要人为介入来处理这批消息。可以通过编写专门的客户端来订阅死信 Topic,处理这批之前处理失败的消息。
实现原理
客户端处理消息失败后,调用consumer.reconsumeLater接口开始走重试策略。首先,客户端检查消息对应的重试次数,如果达到指定的最大重试次数,消息被投递到死信队列(投递到死信队列的消息不会自动消费,如果需要,用户自己创建额外的消费者进行消费);如果没有达到最大重试次数,消费被投递到重试队列。重试间隔是通过延迟消息实现的,投递到重试队列的实际上是一个延迟消息,延迟时间就是用户在reconsumeLater中指定的时间。
使用重试队列实现自动重试的关键点总结
发送到 Retry Topic:消息被发送到 Retry Topic,并设置 deliverAfter(delayTime, unit) 延迟投递
自动 ACK:发送成功后,通过 doAcknowledge() 自动确认原始消息
原消息状态:原始 Topic 中的消息变为已确认(Acknowledged)状态
延迟重试:延迟时间到达后,消费者会从 Retry Topic 收到该消息
注意事项:当使用 Token 访问重试/死信队列时,需要为消费者所使用角色赋予生产消息权限。
代码示例
自动重试的代码示例,消费过程中出现某些异常,进入重试 Topic 重试,最后进入死信Topic中。
注:如果消费的消息 ack 超时,会触发重新投递(Redelivery),消息会从原 Topic 重新发送给消费者
消费者参数设置
注意要重试队列和死信队列的topic需要在hulk云平台创建好,并给token配置读写权限。
PulsarClient pulsarClient = Constant.getPulsarClient(); Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .topic("persistent://my-property/my-ns/test_retry_p2") .subscriptionName("sub1") .subscriptionType(SubscriptionType.Key_Shared) .enableRetry(true)//开启重试消费 .ackTimeout(30, TimeUnit.SECONDS) // 设置为最大处理时间的 2-3 倍 .ackTimeoutTickTime(5, TimeUnit.SECONDS) // 检查粒度,默认 1 秒 .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(3)//可以指定最大重试次数 .retryLetterTopic("persistent://my-property/my-ns/sub1-retry") //指定重试队列 .deadLetterTopic("persistent://my-property/my-ns/sub1-dlq") //指定死信队列 .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); while (true) { Message<String> message = null; try { message = consumer.receive(); log.info("Received message: {}, Properties:{}", message.getValue(), message.getProperties()); doBusinessMaybeThrowException(message); consumer.acknowledge(message); log.info("Ack: {},{}", message.getTopicName(), message.getMessageId()); } catch (Exception e) { log.error("Consumer exception:{}", e.getMessage(), e); consumer.reconsumeLater(message, 2, TimeUnit.SECONDS);//延迟重试 } }Message对象中有个字段 property,包含了重试相关的属性
{ REAL_TOPIC=persistent://my-property/my-ns/real-topic, #原 Topic REAL_SUBSCRIPTION=sub_topic1, ORIGIN_MESSAGE_ID=8143097:5:0, #最初生产的消息 ID ORIGIN_MESSAGE_IDY_TIME=8143097:5:0, DELAY_TIME=1000, RECONSUMETIMES=2 #消息重试的次数 }可以使用属性中的重试的次数,实现指数级回退重试。
无限重试
无限重试是指客户端在处理消息失败后,主动发一条否定应答,让服务端重新推送。如果一直发送否定应答,服务端会一直重推,因此实现无限重试的效果。仅需2步即可开启无限重试
1、初始化consumer时,指定重试间隔(negativeAckRedeliveryDelay,默认1min)
2、捕获业务异常,对处理失败的消息,发送否定应答(consumer.negativeAcknowledge)
以下为主动重试的代码示例:
Consumer<byte[]> consumer = client.newConsumer() .topic("persistent://my-property/my-ns/real-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Key_Shared) .ackTimeout(30, TimeUnit.SECONDS) // 设置为最大处理时间的 2-3 倍 .ackTimeoutTickTime(5, TimeUnit.SECONDS) // 检查粒度,默认 1 秒 .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES) // 默认1min .subscribe(); while (true) { Message<String> message = null; try { message = consumer.receive(); log.info("Received message: {}, Redelivery count:{}", message.getValue(), message.getRedeliveryCount()); doBusinessMaybeThrowException(message); consumer.acknowledge(message); log.info("ack: {},{}", message.getTopicName(), message.getMessageId()); } catch (Exception e) { log.error("consumer exception", e); //否定应答 consumer.negativeAcknowledge(message); } }值得注意的是,当消费者 unack 的消息过多时,Broker 会停止向消费者发送增量消息,而是一直推送 unack 的消息。直到 unack 的消息消息数量低于阈值,才会继续推送新消息。默认单个消费者unack消息数量上限是5万,订阅的 unack 消息数量上限是 20万
参考文档
[1]. Pulsar 消费者客户端官方文档 https://pulsar.apache.org/docs/4.0.x/client-libraries-consumers/