文章目錄
- 前言
- RocketMQ
-
- 組成
- 提高性能地方
- 過期機制
- 高可用
- NameSpace
- 刷盤
- 消息儲存
- 消息發送
- 消息消費
- 事務消息
前言
本部落格是觀看中通架構師丁威老師《rocketmq技術内幕》總結,以及網上一些技術部落格,僅用于個人學習以及分享個人知識盲區。
RocketMQ
組成
producer,consumer,broker,namespace
提高性能地方
Message Queue 是broker上分片的最小機關,也能提高消費性能
過期機制
commitlog不管有沒有消費,72小時之後會删除
高可用
producer:重試,broker不可用暫時跳過
broker:同步、異步刷盤
consumer:ack,至少一次消費
NameSpace
為broker提供注冊服務,讓producer可以拿到對應的ip然後請求儲存資料。
10s檢測存活
broker當機,用戶端去拉取,30s後才能感應
如果broker當機會咋樣?
A:namespace叢集是不會進行互相同步的,就是各自有各自的資料。broker當機不會立刻删除,而是會超過120s心跳之後再删除。
那如果不删除producer怎麼保證消息正常發送?
A:producer會重試去發送消息,然後也會避開之後失敗的broker。
刷盤
同步刷盤
MapperFile flush
異步刷盤
先追加ByteBuffer,但是沒有刷盤,過一段時間再刷盤
消息儲存
為了性能,Message Queue 很多檔案
Index File加快消息的檢索
offset關系:key topic#msgId
commitlog:儲存所有消息
consume queue :供消費者消費,會儲存commitlog offset|size|tag hashcode
通過offset去檢索commitlog
MQ寫入過程
- 寫入commitlog
- 建立消息全局唯一id(需要擷取寫鎖)
- IP+端口号+消息偏移量
- msgId–>内容
- 改之前的偏移量
- 釋放鎖
- 異步寫入consumer queue,以及index file。(通過)
提高性能所做操作
記憶體映射檔案,提高IO
異步寫入consumer queue不同步的問題
比如說commitlog寫入成功,但是在異步寫入consume queue的時候,當機了。這時會根據abort檔案去判斷,啟動的時候如果存在則是異常當機,不存在則是正常關機,會有個關閉回調鈎子去删除這個檔案。
如果abort檔案存在,則需要檢測是否同步正常。
消息發送
順序消息
全局順序:隻能設定止盈一個主題隊列,犧牲性能
局部順序:把它放在同一個消息隊列裡頭
send(List<MessageQueue> queue){
orderId%queue.size = ?;
return ?;
}
消息發送流程
校驗消息 校驗消息 namespace會根據broker去排序 消息發送
消息消費
具體原理
rocketmq采用長輪詢去拉取消息,5s一次,避免消費端消息堆積、壓力啥的。
依據:消息隊列最大偏移量>待拉取偏移量
無法實時監聽消息到達?
到達的時候會去喚醒線程出發檢查
消費模式
叢集:監聽一個topic的消費組,隻有一個能消費
廣播:都能消費一次
拉取流程
叢集:會向broker加鎖(隊列)比如說忽然新增消費者,broker進行queue重新配置設定,由于queue目前鎖沒有釋放,是以之前配置設定的隊列不用當心會被其他新的消費者消費,會延遲等到下一次負載

消費之後會将本地offersetSrote跟broker offerset進行同步
消費流程
消費成功 consume_success ack 消費失敗 consume_later 發送成功,消費失敗的話,重新消費 發送失敗,會延遲5S繼續消費
接收到consume_later
會建立跟之前一樣的消息,唯一的msgid,放到commitlog,異步到consume queue
消息重複消費場景
消息消費之後,ack因為網絡問題沒有送出成功,需要做消息幂等性問題
多次消費失敗
如果多次消費失敗會進入DLQ,然後進行人工處理
消費進度,offerset儲存
我們可以看到不管是消息儲存,或者消費等等都是依據offerset去處理的,在叢集模式下是儲存在broker,為了叢集一緻。在廣播模式是儲存在消費端各自儲存。
tag過濾
consume queue
|- 8 Byte -|-4 Byte-|- 8 Byte -|
commitlog offerset size tag hashcode
事務消息
采用兩段式,以及定時掃描消息狀态表來復原或者送出(主要防範逾時問題)