自己記錄下rocketmq事物,每次面試都要回頭翻閱,這裡自己再次總結下
ps:這裡是網上的一張圖,其實還是很不全的,broker存儲還是挺重要的,沒标出來
- 發送消息到broker
- broker儲存消息成功,傳回給client端。消息已經成功儲存了
- client端執行本地事物
- 再次發送消息給broker,client端事物處理的結果。broker根據client發來的結果進行消息的處理
- 未收到client端的消息,broker這邊有一個線程,會定時去撈取待處理的消息,進行回查
- 确認client事物的結果狀态
- 把client端事物結果發送給broker,borker進行處理
這裡我把 2,4擴充下
broker如何儲存消息,以緻于 consumer 端沒有發現消息呢
這裡就在于 發送事物消息的時候,client端把topic給換掉了,換成half消息的topic。這樣consumer沒查詢到自己定于的topic的消息,自然不會去消費
還有一點,我覺得挺重要的。消息都是寫入到commit log裡(消息是不會進行修改的,rocketmq是順序寫入的不會去做修改操作),那如果去commit log裡去查詢消息呢,消息到了broker會生成 一個 ConsumeQueue
簡單介紹下ConsumeQueue
Consumequeue類對應的是每個topic和queuId下面的所有檔案。Consumequeue類檔案的存儲路徑預設為$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每個檔案由30W條資料組成,每條資料的結構如下所示:
commitLog offset (8) | Size(4) | Message Tag HashCode(8) |
消息的起始實體偏移量physical offset(long 8位元組)+消息大小size(int 4位元組)+tagsCode(long 8位元組),每條資料的大小為20個位元組,進而每個檔案的預設大小為600萬個位元組。
就是通過偏移量和位元組大小 在 commit log裡面尋找到那條記錄。
第二點的擴充
- 事物消息在clinet端把消息的topic更換為RMQ_SYS_TRANS_HALF_TOPIC,把原來的topic的名字存在msg的屬性當中
- 事物在broker存儲msg,并生成 topic為RMQ_SYS_TRANS_HALF_TOPIC的Consumequeue,用于尋找 commit log 裡面生成的半消息
第四點的擴充
- client處理完事物,再次發送給broker的
- broker 根據Consumequeue 查找出那條 half消息
- 根據處理結果,成功 就在commitlog裡面生成一條原先topic的消息,并生成相應的 Consumequeue,再生成一條 topic為 RMQ_SYS_TRANS_OP_HALF_TOPIC的op消息和 op消息的 Consumequeue。失敗了就隻生成 op消息。(ps:為什麼需要op消息呢,因為消息需要一個狀态來表示是否處理成功)
- 注意這裡是生成消息,不是在原來的消息體上進行修改
- 發送消息
- 發送消息成功傳回producer
- producer把本地執行的結果commit/rollback給broker。
- broker收到producer的消息,成功則生成nomal和op。失敗則隻生成op
- half消息會有一個timer,定時去producer去擷取事物的狀态。
以上是RocketMQ事務消息實作的示意圖:
- 通過寫Half消息的方式來實作一階段消息對使用者不可見
- 通過Op消息來标記事務消息的狀态
- 通過讀取Half消息來生成一條新的Normal消息來完成二階段Commit之後消息對Consumer可見
- 通過Half消息來執行回查