引言
前面我們已經簡單地介紹了 RocketMQ 的整體設計思路,本文着重其中HA機制部分的實作細節,更多關于 RocketMQ 的文章均收錄于
<RocketMQ系列文章>;
HA機制
為了提高消息消費的高可用性,避免 Broker 發生單點故障引起存儲在 Broker 上的消息無法及時消費,RocketMQ 引入了 Broker 主備機制,即消息消費到達主伺服器後需要将消息同步到消息從伺服器,如果主伺服器 Broker 當機後,消息消費者可以從從伺服器拉取消息。
工作機制
RocketMQ HA 的實作原理如下。
- 主伺服器啟動,并在特定端口上監聽從伺服器的連接配接
- 從伺服器主動連接配接主伺服器,主伺服器接收用戶端的連接配接,并建立相關 TCP 連接配接
- 從伺服器主動向主伺服器發送待拉取消息偏移量,主伺服器解析請求并傳回消息給從伺服器
- 從伺服器儲存消息并繼續發送新 的消息同步請求
如果是同步主從模式,消息發送者将消息刷寫到磁盤後,需要繼續等待新資料被傳輸到從伺服器,而從伺服器資料的複制是在另外一個線程中去拉取的,是以消息發送者在這裡需要等待資料傳輸的結果,RocketMQ 有一個 GroupTransferService,它的職責是負責當主從同步複制結束後通知由于等待 HA 同步結果而阻塞的消息發送者線程。
判斷主從同步是否完成的依據是 Slave 中已成功複制的最大偏移量是否大于等于消息生産者發送消息後消息服務端傳回下一條消息的起始偏移量,如果是則表示主從同步複制已經完成,喚醒消息發送線程,否則等待 1s 再次判斷,每一個任務在一批任務中循環判斷 5 次。
RocketMQ HA 主要互動流程如下圖所示。

讀寫分離
RocketMQ 根據 MessageQueue查找 Broker位址的唯一依據是 brokerName,從 RocketMQ 的 Broker 組織結構中得知同一組 Broker (M-S)伺服器,它們的 brokerName 相同但 brokerId 不同,主伺服器的 brokerId 為 0,從伺服器的 brokerId 大于 0。
在前面介紹消息拉取的時候,提過 Broker 在傳回拉取内容的同時還會傳回下一次是否要從 Slave 拉取資料,消費者收到該建議後,會找到合适的 Broker 節點進行拉取。那麼 Broker 是通過哪種政策來建議的呢?
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
上述就是建議政策,下面進行解讀:
- maxOffsetPy: 代表目前主伺服器消息存儲檔案最大偏移量。
- maxPhyOffsetPulling: 此次拉取消息最大偏移量。
- diff:對于PullMessageService線程來說,目前未被拉取到消息消費端的消息長度。
- TOTAL_PHYSICAL_MEMORY_SIZE: RocketMQ 所在伺服器總記憶體大小。
- AccessMessageInMemoryMaxRatio: 表示 RocketMQ 所能使用的最大記憶體比例,超過該記憶體,消息将被置換出記憶體
- memory: 表示 RocketMQ 消息常駐記憶體的大小,超過該大小, RocketMQ 會将舊的消息置換回磁盤
- 如果 diff 大于 memory,表示目前需要拉取的消息已經超出了常駐記憶體的大小,表示主伺服器繁忙,此時才建議從 Slave 伺服器拉取
如果主伺服器繁忙則建議下一次從從伺服器拉取消息,下次預設從标号為 1 的從節點拉取消息。如果一個 Master 擁有多台 Slave 伺服器,參與消息拉取負載的從伺服器隻會是其中一個。
文章說明
更多有價值的文章均收錄于
貝貝貓的文章目錄版權聲明: 本部落格所有文章除特别聲明外,均采用 BY-NC-SA 許可協定。轉載請注明出處!
創作聲明: 本文基于下列所有參考内容進行創作,其中可能涉及複制、修改或者轉換,圖檔均來自網絡,如有侵權請聯系我,我會第一時間進行删除。
參考内容
[1]《RocketMQ技術内幕》
[2]《RocketMQ實戰與原了解析》
[3]
老生常談——利用消息隊列處理分布式事務[4]
RocketMQ架構解析[5]
MappedByteBuffer VS FileChannel 孰強孰弱?[6]
檔案 IO 操作的一些最佳實踐[7]
海量資料處理之Bloom Filter詳解[8]
rocketmq GitHub Wiki