天天看點

原來RocketMQ的重試機制是這樣

作者:甯靜知行者

消費端在消費資料時,可以通過傳回消息結果,通知服務端目前的消息是否消費成功,如果消費不成功,則可以傳回RECONSUME_LATER。則消費端會将該消息重新加入重試隊列。

源碼分析

消費端在調用 MessageListener後,會進入到 ConsumeMessageConcurrentlyService 的 processConsumeResult 方法

原來RocketMQ的重試機制是這樣
  • 第一步辨別消費ackIndex = -1,說明從目前的哪個消息開始要重試
  • 第二步會重新調用 sendMessageBack 方法進行消息發送

該方法最終是發送 CONSUMER_SEND_MSG_BACK,同時設定最大的重試次數為16。

接着看下服務端是如何處理這個消息的?

服務端最終會調用SendMessageProcessor 中的 asyncConsumerSendMsgBack

原來RocketMQ的重試機制是這樣
原來RocketMQ的重試機制是這樣

從上面的代碼知道,當接收這個指令進來的消息,伺服器會建立一個 %RETRY%+consumeGroup 的新的topic,如果重試次數已經達到16次,進行将該消息加入到以 %DLQ%+consumeGroup的死信topic隊列,而重試與死信隊列,預設都是1個隊列。

如果未超過16次,則會設定消息的delayTimeLevel 字段,從 RocketMQ消息類型中的延時消息中可以知道,設定了該字段,該消息就會以延時消息來處理了,每重試一次,就會往後推遲時間,具體可以參閱RocketMQ源碼分析下延時消息的實作。

那既然有建立一個新的topic,消費端又是如何消費的呢?

原來RocketMQ的重試機制是這樣

消息端在啟動時,會調用方法copySubscription

原來RocketMQ的重試機制是這樣
  • 如果調用第二步失敗,則會将該消息重新加入本地的消費線程池重新進行處理資料

繼續閱讀