消费端在消费数据时,可以通过返回消息结果,通知服务端当前的消息是否消费成功,如果消费不成功,则可以返回RECONSUME_LATER。则消费端会将该消息重新加入重试队列。
源码分析
消费端在调用 MessageListener后,会进入到 ConsumeMessageConcurrentlyService 的 processConsumeResult 方法
- 第一步标识消费ackIndex = -1,说明从当前的哪个消息开始要重试
- 第二步会重新调用 sendMessageBack 方法进行消息发送
该方法最终是发送 CONSUMER_SEND_MSG_BACK,同时设置最大的重试次数为16。
接着看下服务端是如何处理这个消息的?
服务端最终会调用SendMessageProcessor 中的 asyncConsumerSendMsgBack
从上面的代码知道,当接收这个命令进来的消息,服务器会创建一个 %RETRY%+consumeGroup 的新的topic,如果重试次数已经达到16次,进行将该消息加入到以 %DLQ%+consumeGroup的死信topic队列,而重试与死信队列,默认都是1个队列。
如果未超过16次,则会设置消息的delayTimeLevel 字段,从 RocketMQ消息类型中的延时消息中可以知道,设置了该字段,该消息就会以延时消息来处理了,每重试一次,就会往后推迟时间,具体可以参阅RocketMQ源码分析下延时消息的实现。
那既然有创建一个新的topic,消费端又是如何消费的呢?
消息端在启动时,会调用方法copySubscription
- 如果调用第二步失败,则会将该消息重新加入本地的消费线程池重新进行处理数据