天天看點

RocketMQ 消費者如何實作負載均衡

作者:勇哥java實戰分享

RocketMQ 支援兩種消息模式:叢集消費( Clustering )和廣播消費( Broadcasting )。

叢集消費:同一 Topic 下的一條消息隻會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者執行個體上。

RocketMQ 消費者如何實作負載均衡

廣播消費:當使用廣播消費模式時,每條消息推送給叢集内所有的消費者,保證消息至少被每個消費者消費一次。

RocketMQ 消費者如何實作負載均衡

我們重點講解下叢集消費的消費流程 ,因為叢集消費是使用最普遍的消費模式,了解了叢集消費,廣播消費也就能順理成章的掌握了。

RocketMQ 消費者如何實作負載均衡

叢集消費示例代碼裡,啟動消費者,我們需要配置三個核心屬性:消費組名、訂閱主題、消息監聽器,最後調用 start 方法啟動。

消費者啟動後,我們可以将整個流程簡化成:

RocketMQ 消費者如何實作負載均衡

4 負載均衡

消費端的負載均衡是指将 Broker 端中多個隊列按照某種算法配置設定給同一個消費組中的不同消費者。

負載均衡是每個用戶端獨立進行計算,那麼何時觸發呢 ?

RocketMQ 消費者如何實作負載均衡
  • 消費端啟動時,立即進行負載均衡;
  • 消費端定時任務每隔 20 秒觸發負載均衡;
  • 消費者上下線,Broker 端通知消費者觸發負載均衡。

負載均衡流程如下:

1、發送心跳

消費者啟動後,它就會通過定時任務不斷地向 RocketMQ 叢集中的所有 Broker 執行個體發送心跳包(消息消費分組名稱、訂閱關系集合、消息通信模式和用戶端執行個體編号等資訊)。

Broker 端在收到消費者的心跳消息後,會将它維護在 ConsumerManager 的本地緩存變量 consumerTable,同時并将封裝後的用戶端網絡通道資訊儲存在本地緩存變量 channelInfoTable 中,為之後做消費端的負載均衡提供可以依據的中繼資料資訊。

2、啟動負載均衡服務

下圖展示了按照主題負載均衡的代碼片段:

RocketMQ 消費者如何實作負載均衡

負載均衡服務會根據消費模式為”廣播模式”還是“叢集模式”做不同的邏輯處理,這裡主要來看下叢集模式下的主要處理流程:

(1) 擷取該主題下的消息消費隊列集合;

(2) 查詢 Broker 端擷取該消費組下消費者 Id 清單;

(3) 先對 Topic 下的消息消費隊列、消費者 Id 排序,然後用消息隊列配置設定政策算法(預設為:消息隊列的平均配置設定算法),計算出待拉取的消息隊列;

RocketMQ 消費者如何實作負載均衡

這裡的平均配置設定算法,類似于分頁的算法,将所有 MessageQueue 排好序類似于記錄,将所有消費端排好序類似頁數,并求出每一頁需要包含的平均 size 和每個頁面記錄的範圍 range ,最後周遊整個 range 而計算出目前消費端應該配置設定到的記錄。

(4) 配置設定到的消息隊列集合與 processQueueTable 做一個過濾比對操作

RocketMQ 消費者如何實作負載均衡

消費者執行個體内 ,processQueueTable 對象存儲着目前負載均衡的隊列 ,以及該隊列的消費快照。

标紅的部分表示與配置設定到的消息隊列集合互不包含,則需要将這些紅色隊列 Dropped 屬性為 true , 然後從 processQueueTable 對象中移除。

綠色的部分表示與配置設定到的消息隊列集合的交集,processQueueTable 對象中已經存在該隊列。

黃色的部分表示這些隊列需要添加到 processQueueTable 對象中,建立這些隊列的消費快照。最後建立拉取消息請求清單,并将請求分發到消息拉取服務,進入拉取消息環節。

繼續閱讀