RocketMQ 是筆者非常喜歡的消息隊列,4.9.X 版本是目前使用最廣泛的版本,但它的消費邏輯相對較重,很多同學學習起來沒有頭緒。
這篇文章,筆者梳理了 RocketMQ 的消費邏輯,希望對大家有所啟發。
1 架構概覽
在展開叢集消費邏輯細節前,我們先對 RocketMQ 4.X 架構做一個概覽。
整體架構中包含四種角色 :
1、NameServer
名字服務是是一個幾乎無狀态節點,可叢集部署,節點之間無任何資訊同步。它是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的 zookeeper ,支援 Broker 的動态注冊與發現。
2、BrokerServer
Broker 主要負責消息的存儲、投遞和查詢以及服務高可用保證 。
3、Producer
消息釋出的角色,Producer 通過 MQ 的負載均衡子產品選擇相應的 Broker 叢集隊列進行消息投遞,投遞的過程支援快速失敗并且低延遲。
4、Consumer
消息消費的角色,支援以 push 推,pull 拉兩種模式對消息進行消費。
RocketMQ 叢集工作流程:
1、啟動 NameServer,NameServer 起來後監聽端口,等待 Broker、Producer 、Consumer 連上來,相當于一個路由控制中心。
2、Broker 啟動,跟所有的 NameServer 保持長連接配接,定時發送心跳包。心跳包中包含目前 Broker資訊( IP+端口等 )以及存儲所有 Topic 資訊。注冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的映射關系。
3、收發消息前,先建立 Topic,建立 Topic 時需要指定該 Topic 要存儲在哪些 Broker 上,也可以在發送消息時自動建立 Topic。
4、Producer 發送消息,啟動時先跟 NameServer 叢集中的其中一台建立長連接配接,并從 NameServer 中擷取目前發送的 Topic 存在哪些 Broker 上,輪詢從隊列清單中選擇一個隊列,然後與隊列所在的 Broker 建立長連接配接進而向 Broker 發消息。
5、Consumer 跟 Producer 類似,跟其中一台 NameServer 建立長連接配接,擷取目前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接配接通道,開始消費消息。
2 釋出訂閱
RocketMQ 的傳輸模型是:釋出訂閱模型 。
釋出訂閱模型具有如下特點:
- 消費獨立
- 相比隊列模型的匿名消費方式,釋出訂閱模型中消費方都會具備的身份,一般叫做訂閱組(訂閱關系),不同訂閱組之間互相獨立不會互相影響。
- 一對多通信
- 基于獨立身份的設計,同一個主題内的消息可以被多個訂閱組處理,每個訂閱組都可以拿到全量消息。是以釋出訂閱模型可以實作一對多通信。
RocketMQ 支援兩種消息模式:叢集消費( Clustering )和廣播消費( Broadcasting )。
叢集消費:同一 Topic 下的一條消息隻會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者執行個體上。
廣播消費:當使用廣播消費模式時,每條消息推送給叢集内所有的消費者,保證消息至少被每個消費者消費一次。
為了實作這種釋出訂閱模型 , RocketMQ 精心設計了它的存儲模型。先進入 Broker 的檔案存儲目錄。
RocketMQ 采用的是混合型的存儲結構。
1、Broker 單個執行個體下所有的隊列共用一個資料檔案(commitlog)來存儲
生産者發送消息至 Broker 端,然後 Broker 端使用同步或者異步的方式對消息刷盤持久化,儲存至 commitlog 檔案中。隻要消息被刷盤持久化至磁盤檔案 commitlog 中,那麼生産者發送的消息就不會丢失。
單個檔案大小預設 1G , 檔案名長度為 20 位,左邊補零,剩餘為起始偏移量,比如 00000000000000000000 代表了第一個檔案,起始偏移量為 0 ,檔案大小為1 G = 1073741824 。
這種設計有兩個優點:
- 充分利用順序寫,大大提升寫入資料的吞吐量;
- 快讀定位消息。
- 因為消息是一條一條寫入到 commitlog 檔案 ,寫入完成後,我們可以得到這條消息的實體偏移量。
- 每條消息的實體偏移量是唯一的, commitlog 檔案名是遞增的,可以根據消息的實體偏移量通過二分查找,定位消息位于那個檔案中,并擷取到消息實體資料。
2、Broker 端的背景服務線程會不停地分發請求并異步建構 consumequeue(消費檔案)和 indexfile(索引檔案)
進入索引檔案存儲目錄 :
1、消費檔案按照主題存儲,每個主題下有不同的隊列,圖中主題 my-mac-topic 有 16 個隊列 (0 到 15) ;
2、每個隊列目錄下 ,存儲 consumequeue 檔案,每個 consumequeue 檔案也是順序寫入,資料格式見下圖。
每個 consumequeue 檔案包含 30 萬個條目,每個條目大小是 20 個位元組,每個檔案的大小是 30 萬 * 20 = 60萬位元組,每個檔案大小約 5.72M 。
和 commitlog 檔案類似,consumequeue 檔案的名稱也是以偏移量來命名的,可以通過消息的邏輯偏移量定位消息位于哪一個檔案裡。
消費檔案按照主題-隊列來儲存 ,這種方式特别适配釋出訂閱模型。
消費者從 Broker 擷取訂閱消息資料時,不用周遊整個 commitlog 檔案,隻需要根據邏輯偏移量從 consumequeue 檔案查詢消息偏移量 , 最後通過定位到 commitlog 檔案, 擷取真正的消息資料。
要實作釋出訂閱模型,還需要一個重要檔案:消費進度檔案。原因有兩點:
- 不同消費組之間互相獨立,不會互相影響 ;
- 消費者下次拉取資料時,需要知道從哪個進度開始拉取 ,就像我們小時候玩單機遊戲存盤一樣。
是以消費進度檔案需要儲存消費組所訂閱主題的消費進度。
我們浏覽下叢集消費場景下的 Broker 端的消費進度檔案 consumerOffset.json 。
在進度檔案 consumerOffset.json 裡,資料以 key-value 的結構存儲,key 表示:主題@消費者組 , value 是 consumequeue 中每個隊列對應的邏輯偏移量 。
寫到這裡,我們粗糙模拟下 RocketMQ 存儲模型如何滿足釋出訂閱模型 。
1、發送消息:生産者發送消息到 Broker ;
2、儲存消息:Broker 将消息存儲到 commitlog 檔案 ,異步線程會建構消費檔案 consumequeue ;
3、消費流程:消費者啟動後,會通過負載均衡配置設定對應的隊列,然後向 Broker 發送拉取消息請求。Broker 收到消費者拉取請求之後,根據訂閱組,消費者編号,主題,隊列名,邏輯偏移量等參數 ,從該主題下的 consumequeue 檔案查詢消息消費條目,然後從 commitlog 檔案中擷取消息實體。消費者在收到消息資料之後,執行消費監聽器,消費完消息;
4、儲存進度:消費者将消費進度送出到 Broker ,Broker 會将該消費組的消費進度存儲在進度檔案裡。
3 消費流程
我們重點講解下叢集消費的消費流程 ,因為叢集消費是使用最普遍的消費模式,了解了叢集消費,廣播消費也就能順理成章的掌握了。
叢集消費示例代碼裡,啟動消費者,我們需要配置三個核心屬性:消費組名、訂閱主題、消息監聽器,最後調用 start 方法啟動。
消費者啟動後,我們可以将整個流程簡化成:
4 負載均衡
消費端的負載均衡是指将 Broker 端中多個隊列按照某種算法配置設定給同一個消費組中的不同消費者,負載均衡是用戶端開始消費的起點。
RocketMQ 負載均衡的核心設計理念是
- 消費隊列在同一時間隻允許被同一消費組内的一個消費者消費
- 一個消費者能同時消費多個消息隊列
負載均衡是每個用戶端獨立進行計算,那麼何時觸發呢 ?
- 消費端啟動時,立即進行負載均衡;
- 消費端定時任務每隔 20 秒觸發負載均衡;
- 消費者上下線,Broker 端通知消費者觸發負載均衡。
負載均衡流程如下:
1、發送心跳
消費者啟動後,它就會通過定時任務不斷地向 RocketMQ 叢集中的所有 Broker 執行個體發送心跳包(消息消費分組名稱、訂閱關系集合、消息通信模式和用戶端執行個體編号等資訊)。
Broker 端在收到消費者的心跳消息後,會将它維護在 ConsumerManager 的本地緩存變量 consumerTable,同時并将封裝後的用戶端網絡通道資訊儲存在本地緩存變量 channelInfoTable 中,為之後做消費端的負載均衡提供可以依據的中繼資料資訊。
2、啟動負載均衡服務
負載均衡服務會根據消費模式為”廣播模式”還是“叢集模式”做不同的邏輯處理,這裡主要來看下叢集模式下的主要處理流程:
(1) 擷取該主題下的消息消費隊列集合;
(2) 查詢 Broker 端擷取該消費組下消費者 Id 清單;
(3) 先對 Topic 下的消息消費隊列、消費者 Id 排序,然後用消息隊列配置設定政策算法(預設為:消息隊列的平均配置設定算法),計算出待拉取的消息隊列;
這裡的平均配置設定算法,類似于分頁的算法,将所有 MessageQueue 排好序類似于記錄,将所有消費端排好序類似頁數,并求出每一頁需要包含的平均 size 和每個頁面記錄的範圍 range ,最後周遊整個 range 而計算出目前消費端應該配置設定到的記錄。
(4) 配置設定到的消息隊列集合與 processQueueTable 做一個過濾比對操作。
消費者執行個體内 ,processQueueTable 對象存儲着目前負載均衡的隊列 ,以及該隊列的處理隊列 processQueue (消費快照)。
- 标紅的 Entry 部分表示與配置設定到的消息隊列集合互不包含,則需要将這些紅色隊列 Dropped 屬性為 true , 然後從 processQueueTable 對象中移除。
- 綠色的 Entry 部分表示與配置設定到的消息隊列集合的交集,processQueueTable 對象中已經存在該隊列。
- 黃色的 Entry 部分表示這些隊列需要添加到 processQueueTable 對象中,為每個配置設定的新隊列建立一個消息拉取請求 pullRequest , 在消息拉取請求中儲存一個處理隊列 processQueue (隊列消費快照),内部是紅黑樹(TreeMap),用來儲存拉取到的消息。
最後建立拉取消息請求清單,并将請求分發到消息拉取服務,進入拉取消息環節。
5 長輪詢
在負載均衡這一小節,我們已經知道負載均衡觸發了拉取消息的流程。
消費者啟動的時候,會建立一個拉取消息服務 PullMessageService ,它是一個單線程的服務。
核心流程如下:
1、負載均衡服務将消息拉取請求放入到拉取請求隊列 pullRequestQueue , 拉取消息服務從隊列中擷取拉取消息請求 ;
2、拉取消息服務向 Brorker 服務發送拉取請求 ,拉取請求的通訊模式是異步回調模式 ;
消費者的拉取消息服務本身就是一個單線程,使用異步回調模式,發送拉取消息請求到 Broker 後,拉取消息線程并不會阻塞 ,可以繼續處理隊列 pullRequestQueue 中的其他拉取任務。
3、Broker 收到消費者拉取消息請求後,從存儲中查詢出消息資料,然後傳回給消費者;
4、消費者的網絡通訊層會執行拉取回調函數相關邏輯,首先會将消息資料存儲在隊列消費快照 processQueue 裡;
消費快照使用紅黑樹 msgTreeMap 存儲拉取服務拉取到的消息 。
5、回調函數将消費請求送出到消息消費服務 ,而消息消費服務會異步的消費這些消息;
6、回調函數會将進行中隊列的拉取請放入到定時任務中;
7、定時任務再次将消息拉取請求放入到隊列 pullRequestQueue 中,形成了閉環:負載均衡後的隊列總會有任務執行拉取消息請求,不會中斷。
細心的同學肯定有疑問:既然消費端是拉取消息,為什麼是長輪詢呢 ?
雖然拉模式的主動權在消費者這一側,但是缺點很明顯。
因為消費者并不知曉 Broker 端什麼時候有新的消息 ,是以會不停地去 Broker 端拉取消息,但拉取頻率過高, Broker 端壓力就會很大,頻率過低則會導緻消息延遲。
是以要想消費消息的延遲低,服務端的推送必不可少。
下圖展示了 RocketMQ 如何通過長輪詢減小拉取消息的延遲。
核心流程如下:
1、Broker 端接收到消費者的拉取消息請求後,拉取消息處理器開始處理請求,根據拉取請求查詢消息存儲 ;
2、從消息存儲中擷取消息資料 ,若存在新消息 ,則将消息資料通過網絡傳回給消費者。若無新消息,則将拉取請求放入到拉取請求表 pullRequestTable 。
3、長輪詢請求管理服務 pullRequestHoldService 每隔 5 秒從拉取請求表中判斷拉取消息請求的隊列是否有新的消息。
判定标準是:拉取消息請求的偏移量是否小于目前消費隊列最大偏移量,如果條件成立則說明有新消息了。
若存在新的消息 , 長輪詢請求管理服務會觸發拉取消息處理器重新處理該拉取消息請求。
4、當 commitlog 中新增了新的消息,消息分發服務會建構消費檔案和索引檔案,并且會通知長輪詢請求管理服務,觸發拉取消息處理器重新處理該拉取消息請求。
6 消費消息
在拉取消息的流程裡, Broker 端傳回消息資料,消費者的通訊架構層會執行回調函數。
回調線程會将資料存儲在隊列消費快照 processQueue(内部使用紅黑樹 msgTreeMap)裡,然後将消息送出到消費消息服務,消費消息服務會異步消費這些消息。
消息消費服務有兩種類型:并發消費服務和順序消費服務 。
6.1 并發消費
并發消費是指消費者将并發消費消息,消費的時候可能是無序的。
消費消息并發服務啟動後,會初始化三個元件:消費線程池、清理過期消息定時任務、處理失敗消息定時任務。
核心流程如下:
0、通訊架構回調線程會将資料存儲在消費快照裡,然後将消息清單 msgList 送出到消費消息服務
1、 消息清單 msgList 組裝成消費對象
2、将消費對象送出到消費線程池
我們看到10 條消息被組裝成三個消費請求對象,不同的消費線程會執行不同的消費請求對象。
3、消費線程執行消息監聽器
執行完消費監聽器,會傳回消費結果。
4、處理異常消息
當消費異常時,異常消息将重新發回 Broker 端的重試隊列( RocketMQ 會為每個 topic 建立一個重試隊列,以 %RETRY% 開頭),達到重試時間後将消息投遞到重試隊列中進行消費重試。
我們将在重試機制這一節重點講解 RocketMQ 如何實作延遲消費功能 。
假如異常的消息發送到 Broker 端失敗,則重新将這些失敗消息通過處理失敗消息定時任務重新送出到消息消費服務。
5、更新本地消費進度
消費者消費一批消息完成之後,需要儲存消費進度到進度管理器的本地記憶體。
首先我們會從隊列消費快照 processQueue 中移除消息,傳回消費快照 msgTreeMap 第一個偏移量 ,然後調用消費消息進度管理器 offsetStore 更新消費進度。
待更新的偏移量是如何計算的呢?
- 場景1:快照中1001(消息1)到1010(消息10)消費了,快照中沒有了消息,傳回已消費的消息最大偏移量 + 1 也就是1011。
- 場景2:快照中1001(消息1)到1008(消息8)消費了,快照中隻剩下兩條消息了,傳回最小的偏移量 1009。
- 場景3:1001(消息1)在消費對象中因為某種原因一直沒有被消費,即使後面的消息1005-1010都消費完成了,傳回的最小偏移量是1001。
在場景3,RocketMQ 為了保證消息肯定被消費成功,消費進度隻能維持在1001(消息1),直到1001也被消費完,本地的消費進度才會一下子更新到1011。
假設1001(消息1)還沒有消費完成,消費者執行個體突然退出(機器斷電,或者被 kill ),就存在重複消費的風險。
因為隊列的消費進度還是維持在1001,當隊列重新被配置設定給新的消費者執行個體的時候,新的執行個體從 Broker 上拿到的消費進度還是維持在1001,這時候就會又從1001開始消費,1001-1010這批消息實際上已經被消費過還是會投遞一次。
是以業務必須要保證消息消費的幂等性。
寫到這裡,我們會有一個疑問:假設1001(消息1)因為加鎖或者消費監聽器邏輯非常耗時,導緻極長時間沒有消費完成,那麼消費進度就會一直卡住 ,怎麼解決呢 ?
RocketMQ 提供兩種方式一起配合解決:
- 拉取服務根據并發消費間隔配置限流
- 拉取消息服務在拉取消息時候,會判斷目前隊列的 processQueue 消費快照裡消息的最大偏移量 - 消息的最小偏移量大于消費并發間隔(2000)的時候 , 就會觸發流控 , 這樣就可以避免消費者無限循環的拉取新的消息。
- 清理過期消息
- 消費消息并發服務啟動後,會定期掃描所有消費的消息,若目前時間減去開始消費的時間大于消費逾時時間,首先會将過期消息發送 sendMessageBack 指令發送到 Broker ,然後從快照中删除該消息。
6.2 順序消費
順序消息是指對于一個指定的 Topic ,消息嚴格按照先進先出(FIFO)的原則進行消息釋出和消費,即先釋出的消息先消費,後釋出的消息後消費。
順序消息分為分區順序消息和全局順序消息。
1、分區順序消息
對于指定的一個 Topic ,所有消息根據 Sharding Key 進行區塊分區,同一個分區内的消息按照嚴格的先進先出(FIFO)原則進行釋出和消費。同一分區内的消息保證順序,不同分區之間的消息順序不做要求。
- 适用場景:适用于性能要求高,以 Sharding Key 作為分區字段,在同一個區塊中嚴格地按照先進先出(FIFO)原則進行消息釋出和消費的場景。
- 示例:電商的訂單建立,以訂單 ID 作為 Sharding Key ,那麼同一個訂單相關的建立訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照釋出的先後順序來消費。
2、全局順序消息
對于指定的一個 Topic ,所有消息按照嚴格的先入先出(FIFO)的順序來釋出和消費。
- 适用場景:适用于性能要求不高,所有的消息嚴格按照 FIFO 原則來釋出和消費的場景。
- 示例:在證券進行中,以人民币兌換美元為 Topic,在價格相同的情況下,先出價者優先處理,則可以按照 FIFO 的方式釋出和消費全局順序消息。
全局順序消息實際上是一種特殊的分區順序消息,即 Topic 中隻有一個分區,是以全局順序和分區順序的實作原理相同。
因為分區順序消息有多個分區,是以分區順序消息比全局順序消息的并發度和性能更高。
消息的順序需要由兩個階段保證:
- 消息發送
- 如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的消息産生的順序,業務上要求同一訂單的消息保持順序,例如訂單 A 的消息發送和消費都按照 A1、A2、A3 的順序。
- 如果是普通消息,訂單A 的消息可能會被輪詢發送到不同的隊列中,不同隊列的消息将無法保持順序,而順序消息發送時 RocketMQ 支援将 Sharding Key 相同(例如同一訂單号)的消息序路由到同一個隊列中。
- 下圖是生産者發送順序消息的封裝,原理是發送消息時,實作 MessageQueueSelector 接口, 根據 Sharding Key 使用 Hash 取模法來選擇待發送的隊列。
- 消息消費
- 消費者消費消息時,需要保證單線程消費每個隊列的消息資料,進而實作消費順序和釋出順序的一緻。
順序消費服務的類是 ConsumeMessageOrderlyService ,在負載均衡階段,并發消費和順序消費并沒有什麼大的差别。
最大的差别在于:順序消費會向 Borker 申請鎖 。消費者根據配置設定的隊列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取消息,如果失敗,則定時任務每隔20秒會重新嘗試。
順序消費核心流程如下:
1、 組裝成消費對象
2、 将請求對象送出到消費線程池
和并發消費不同的是,這裡的消費請求包含消費快照 processQueue ,消息隊列 messageQueue 兩個對象,并不對消息清單做任何處理。
3、 消費線程内,對消費隊列加鎖
順序消費也是通過線程池消費的,synchronized 鎖用來保證同一時刻對于同一個隊列隻有一個線程去消費它
4、 從消費快照中取得待消費的消息清單
消費快照 processQueue 對象裡,建立了一個紅黑樹對象 consumingMsgOrderlyTreeMap 用于臨時存儲的待消費的消息。
5、 執行消息監聽器
消費快照的消費鎖 consumeLock 的作用是:防止負載均衡線程把目前消費的 MessageQueue 對象移除掉。
6、 處理消費結果
消費成功時,首先計算需要送出的偏移量,然後更新本地消費進度。
消費失敗時,分兩種場景:
- 假如已消費次數小于最大重試次數,則将對象 consumingMsgOrderlyTreeMap 中臨時存儲待消費的消息,重新加入到消費快照紅黑樹 msgTreeMap 中,然後使用定時任務嘗試重新消費。
- 假如已消費次數大于等于最大重試次數,則将失敗消息發送到 Broker ,Broker 接收到消息後,會加入到死信隊列裡 , 最後計算需要送出的偏移量,然後更新本地消費進度。
我們做一個關于順序消費的總結 :
- 順序消費需要由兩個階段消息發送和消息消費協同配合,底層支撐依靠的是 RocketMQ 的存儲模型;
- 順序消費服務啟動後,隊列的資料都會被消費者執行個體單線程的執行消費;
- 假如消費者擴容,消費者重新開機,或者 Broker 當機 ,順序消費也會有一定幾率較短時間内亂序,是以消費者的業務邏輯還是要保障幂等。
7 儲存進度
RocketMQ 消費者消費完一批資料後, 會将隊列的進度儲存在本地記憶體,但還需要将隊列的消費進度持久化。
1、 叢集模式
叢集模式下,分兩種場景:
- 拉取消息服務會在拉取消息時,攜帶該隊列的消費進度,送出給 Broker 的拉取消息處理器。
- 消費者定時任務,每隔5秒将本地緩存中的消費進度送出到 Broker 的消費者管理處理器。
Broker 的這兩個處理器都調用消費者進度管理器 consumerOffsetManager 的 commitOffset 方法,定時任務異步将消費進度持久化到消費進度檔案 consumerOffset.json 中。
2、 廣播模式
廣播模式消費進度存儲在消費者本地,定時任務每隔 5 秒通過 LocalFileOffsetStore 持久化到本地檔案offsets.json ,資料格式為 MessageQueue:Offset。
廣播模式下,消費進度和消費組沒有關系,本地檔案 offsets.json 存儲在配置的目錄,檔案中包含訂閱主題中所有的隊列以及隊列的消費進度。
8 重試機制
叢集消費下,重試機制的本質是 RocketMQ 的延遲消息功能。
消費消息失敗後,消費者執行個體會通過 CONSUMER_SEND_MSG_BACK 請求,将失敗消息發回到 Broker 端。
Broker 端會為每個 topic 建立一個重試隊列 ,隊列名稱是:%RETRY% + 消費者組名 ,達到重試時間後将消息投遞到重試隊列中進行消費重試(消費者組會自動訂閱重試 Topic)。最多重試消費 16 次,重試的時間間隔逐漸變長,若達到最大重試次數後消息還沒有成功被消費,則消息将被投遞至死信隊列。
第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
1 | 10 秒 | 9 | 7 分鐘 |
2 | 30 秒 | 10 | 8 分鐘 |
3 | 1 分鐘 | 11 | 9 分鐘 |
4 | 2 分鐘 | 12 | 10 分鐘 |
5 | 3 分鐘 | 13 | 20 分鐘 |
6 | 4 分鐘 | 14 | 30 分鐘 |
7 | 5 分鐘 | 15 | 1 小時 |
8 | 6 分鐘 | 16 | 2 小時 |
開源 RocketMQ 4.X 支援延遲消息,預設支援18 個 level 的延遲消息,這是通過 broker 端的 messageDelayLevel 配置項确定的,如下:
Broker 在啟動時,内部會建立一個内部主題:SCHEDULE_TOPIC_XXXX,根據延遲 level 的個數,建立對應數量的隊列,也就是說18個 level 對應了18個隊列。
我們先梳理下延遲消息的實作機制。
1、生産者發送延遲消息
Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設定延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);
2、Broker端存儲延遲消息
延遲消息在 RocketMQ Broker 端的流轉如下圖所示:
第一步:修改消息 Topic 名稱和隊列資訊
Broker 端接收到生産者的寫入消息請求後,首先都會将消息寫到 commitlog 中。假如是正常非延遲消息,MessageStore 會根據消息中的 Topic 資訊和隊列資訊,将其轉發到目标 Topic 的指定隊列 consumequeue 中。
但由于消息一旦存儲到 consumequeue 中,消費者就能消費到,而延遲消息不能被立即消費,是以 RocketMQ 将 Topic 的名稱修改為SCHEDULE_TOPIC_XXXX,并根據延遲級别确定要投遞到哪個隊列下。
同時,還會将消息原來要發送到的目标 Topic 和隊列資訊存儲到消息的屬性中。
第二步:建構 consumequeue 檔案時,計算并存儲投遞時間
上圖是 consumequeue 檔案一條消息的格式,最後 8 個位元組存儲 Tag 的哈希值,此時存儲消息的投遞時間。
第三步:定時排程服務啟動
ScheduleMessageService 類是一個定時排程服務,讀取 SCHEDULE_TOPIC_XXXX 隊列的消息,并将消息投遞到目标 Topic 中。
定時排程服務啟動時,建立一個定時排程線程池 ,并根據延遲級别的個數,啟動對應數量的 HandlePutResultTask ,每個 HandlePutResultTask 負責一個延遲級别的消費與投遞。
第四步:投遞時間到了,将消息資料重新寫入到 commitlog
消息到期後,需要投遞到目标 Topic 。第一步已經記錄了原來的 Topic 和隊列資訊,這裡需要重新設定,再存儲到 commitlog 中。
第五步:将消息投遞到目标 Topic 中
Broker 端的背景服務線程會不停地分發請求并異步建構 consumequeue(消費檔案)和 indexfile(索引檔案)。是以消息會直接投遞到目标 Topic 的 consumequeue 中,之後消費者就可以消費到這條消息。
回顧了延遲消息的機制,消費消息失敗後,消費者執行個體會通過 CONSUMER_SEND_MSG_BACK 請求,将失敗消息發回到 Broker 端。
Broker 端 SendMessageProcessor 處理器會調用 asyncConsumerSendMsgBack 方法。
首先判斷消息的目前重試次數是否大于等于最大重試次數,如果達到最大重試次數,或者配置的重試級别小于0,則重新建立 Topic ,規則是 %DLQ% + consumerGroup,後續處理消息發送到死信隊列。
正常的消息會進入 else 分支,對于首次重試的消息,預設的 delayLevel 是 0 ,RocketMQ 會将 delayLevel + 3,也就是加到 3 ,這就是說,如果沒有顯示的配置延時級别,消息消費重試首次,是延遲了第三個級别發起的重試,也就是距離首次發送 10s 後重試,其主題的預設規則是 %RETRY% + consumerGroup。
當延時級别設定完成,重新整理消息的重試次數為目前次數加 1 ,Broker 端将該消息刷盤,邏輯如下:
延遲消息寫入到 commitlog 裡 ,這裡其實和延遲消息機制的第一步類似,後面按照延遲消息機制的流程執行即可(第二步到第六步)。
9 總結
下圖展示了叢集模式下消費者并發消費流程 :
核心流程如下:
- 消費者啟動後,觸發負載均衡服務 ,負載均衡服務為消費者執行個體配置設定對應的隊列 ;
- 配置設定完隊列後,負載均衡服務會為每個配置設定的新隊列建立一個消息拉取請求 pullRequest , 拉取請求儲存一個處理隊列 processQueue,内部是紅黑樹(TreeMap),用來儲存拉取到的消息 ;
- 拉取消息服務單線程從拉取請求隊列 pullRequestQueue 中彈出拉取消息,執行拉取任務 ,拉取請求是異步回調模式,将拉取到的消息放入到處理隊列;
- 拉取請求在一次拉取消息完成之後會複用,重新被放入拉取請求隊列 pullRequestQueue 中 ;
- 拉取完成後,調用消費消息服務 consumeMessageService 的 submitConsumeRequest 方法 ,消費消息服務内部有一個消費線程池;
- 消費線程池的消費線程從消費任務隊列中擷取消費請求,執行消費監聽器 listener.consumeMessage ;
- 消費完成後,若消費成功,則更新偏移量 updateOffset,先更新到記憶體 offsetTable,定時上報到 Broker ;若消費失敗,則将失敗消費發送到 Broker 。
- Broker 端接收到請求後, 調用消費進度管理器的 commitOffset 方法修改記憶體的消費進度,定時刷盤到 consumerOffset.json。
RocketMQ 4.X 的消費邏輯有兩個非常明顯的特點:
- 用戶端代碼邏輯較重。假如要支援一種新的程式設計語言,那麼用戶端就必須實作完整的負載均衡邏輯,此外還需要實作拉消息、位點管理、消費失敗後将消息發回 Broker 重試等邏輯。這給多語言用戶端的支援造成很大的阻礙。
- 保證幂等非常重要。當用戶端更新或者下線時,或者 Broker 當機,都要進行負載均衡操作,可能造成消息堆積,同時有一定幾率造成重複消費。
如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!