天天看点

原来RocketMQ的重试机制是这样

作者:宁静知行者

消费端在消费数据时,可以通过返回消息结果,通知服务端当前的消息是否消费成功,如果消费不成功,则可以返回RECONSUME_LATER。则消费端会将该消息重新加入重试队列。

源码分析

消费端在调用 MessageListener后,会进入到 ConsumeMessageConcurrentlyService 的 processConsumeResult 方法

原来RocketMQ的重试机制是这样
  • 第一步标识消费ackIndex = -1,说明从当前的哪个消息开始要重试
  • 第二步会重新调用 sendMessageBack 方法进行消息发送

该方法最终是发送 CONSUMER_SEND_MSG_BACK,同时设置最大的重试次数为16。

接着看下服务端是如何处理这个消息的?

服务端最终会调用SendMessageProcessor 中的 asyncConsumerSendMsgBack

原来RocketMQ的重试机制是这样
原来RocketMQ的重试机制是这样

从上面的代码知道,当接收这个命令进来的消息,服务器会创建一个 %RETRY%+consumeGroup 的新的topic,如果重试次数已经达到16次,进行将该消息加入到以 %DLQ%+consumeGroup的死信topic队列,而重试与死信队列,默认都是1个队列。

如果未超过16次,则会设置消息的delayTimeLevel 字段,从 RocketMQ消息类型中的延时消息中可以知道,设置了该字段,该消息就会以延时消息来处理了,每重试一次,就会往后推迟时间,具体可以参阅RocketMQ源码分析下延时消息的实现。

那既然有创建一个新的topic,消费端又是如何消费的呢?

原来RocketMQ的重试机制是这样

消息端在启动时,会调用方法copySubscription

原来RocketMQ的重试机制是这样
  • 如果调用第二步失败,则会将该消息重新加入本地的消费线程池重新进行处理数据

继续阅读