天天看點

RocketMq重試及消息不丢失機制

1、消息重試機制

由于MQ經常處于複雜的分布式系統中,考慮網絡波動、服務當機、程式異常因素,很有可能出現消息發送或者消費失敗的問題。是以,消息的重試就是所有MQ中間件必須考慮到的一個關鍵點。如果沒有消息重試,就可能産生消息丢失的問題,可能對系統産生很大的影響。是以,秉承甯可多發消息,也不可丢失消息的原則,大部分MQ都對消息重試提供了很好的支援。

RocketMQ為使用者封裝了消息重試的處理流程,無需開發人員手動處理。RocketMQ支援了生産端和消費端兩類重試機制。

1.1 生産端重試

生産端配置的有發送失敗重試次數,預設為2。使用了set方法對外進行暴露,producer用戶端可以改寫這個預設值。

public DefaultMQProducer(String producerGroup, RPCHook rpcHook) {
		this.createTopicKey = "TBW102";

		this.defaultTopicQueueNums = 4;

		this.sendMsgTimeout = 3000;

		this.compressMsgBodyOverHowmuch = 4096;
		//發送失敗,重試次數
		this.retryTimesWhenSendFailed = 2;

		this.retryAnotherBrokerWhenNotStoreOK = false;

		this.maxMessageSize = 131072;

		this.unitMode = false;

		this.producerGroup = producerGroup;
		this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
	}
           

1.2 消費端重試

消費者消費消息後,需要給Broker傳回消費狀态。以MessageListenerConcurrently監聽器為例,Consumer消費完成後需要傳回ConsumeConcurrentlyStatus并發消費狀态。檢視源碼,ConsumeConcurrentlyStatus是一個枚舉,共有兩種狀态:

public enum ConsumeConcurrentlyStatus {
   //消費成功
   ConsumeConcurrentlyStatus,

   //消費失敗,一段時間後重試
   RECONSUME_LATER;
}
           

RECONSUME_LATER代表因為某種原因,消費失敗,稍後再試。後續會再次消費

官方文檔介紹如下:

RocketMq重試及消息不丢失機制

RocketMQ中的消息無法無限次重新消費,當然了,手動修改重試次數是可以的,不介入的話不行。當重試次數超過所有延遲級别之後。消息會進入死信,死信Topic的命名為:%DLQ% + Consumer組名。

進入死信之後的消息肯定不會再投遞了,不過可以通過接口去查詢目前RocketMQ中死信隊列的消息。如果在上層實作自有指令,那麼可以将消息從死信中移出并重新投遞。

死信消息具有以下特性:

  • 不會再被消費者正常消費。
  • 有效期與正常消息相同,均為 3 天,3 天後會被自動删除。是以,請在死信消息産生後的 3 天内及時處理。

2、保證消息不丢失

分别從Producer發送機制、Broker的持久化機制,以及消費者的offSet機制來最大程度保證消息不易丢失

  • 從Producer的視角來看:如果消息未能正确的存儲在MQ中,或者消費者未能正确的消費到這條消息,都是消息丢失。
  • 從Broker的視角來看:如果消息已經存在Broker裡面了,如何保證不會丢失呢(當機、磁盤崩潰)
  • 從Consumer的視角來看:如果消息已經完成持久化了,但是Consumer取了,但是未消費成功且沒有回報,就是消息丢失

從Producer分析:如何確定消息正确的發送到了Broker?

  • 預設情況下,可以通過同步的方式阻塞式的發送,check SendStatus,狀态是OK,表示消息一定成功的投遞到了Broker,狀态逾時或者失敗,則會觸發預設的2次重試。此方法的發送結果,可能Broker存儲成功了,也可能沒成功
  • 采取事務消息的投遞方式,并不能保證消息100%投遞成功到了Broker,但是如果消息發送Ack失敗的話,此消息會存儲在CommitLog當中,但是對ConsumerQueue是不可見的。可以在日志中檢視到這條異常的消息,嚴格意義上來講,也并沒有完全丢失
  • RocketMQ支援 日志的索引,如果一條消息發送之後逾時,也可以通過查詢日志的API,來check是否在Broker存儲成功

從Broker分析:如果確定接收到的消息不會丢失?

  • 消息支援持久化到Commitlog裡面,即使當機後重新開機,未消費的消息也是可以加載出來的
  • Broker自身支援同步刷盤、異步刷盤的政策,可以保證接收到的消息一定存儲在本地的記憶體中
  • Broker叢集支援 1主N從的政策,支援同步複制和異步複制的方式,同步複制可以保證即使Master 磁盤崩潰,消息仍然不會丢失

從Cunmser分析:如何確定拉取到的消息被成功消費?

  • 消費者可以根據自身的政策批量Pull消息
  • Consumer自身維護一個持久化的offset(對應MessageQueue裡面的min offset),标記已經成功消費或者已經成功發回到broker的消息下标
  • 如果Consumer消費失敗,那麼它會把這個消息發回給Broker,發回成功後,再更新自己的offset
  • 如果Consumer消費失敗,發回給broker時,broker挂掉了,那麼Consumer會定時重試這個操作
  • 如果Consumer和broker一起挂了,消息也不會丢失,因為consumer 裡面的offset是定時持久化的,重新開機之後,繼續拉取offset之前的消息到本地

繼續閱讀