天天看點

rocketmq事物

自己記錄下rocketmq事物,每次面試都要回頭翻閱,這裡自己再次總結下
rocketmq事物

 ps:這裡是網上的一張圖,其實還是很不全的,broker存儲還是挺重要的,沒标出來

  1. 發送消息到broker
  2. broker儲存消息成功,傳回給client端。消息已經成功儲存了
  3. client端執行本地事物
  4. 再次發送消息給broker,client端事物處理的結果。broker根據client發來的結果進行消息的處理
  5. 未收到client端的消息,broker這邊有一個線程,會定時去撈取待處理的消息,進行回查
  6. 确認client事物的結果狀态
  7. 把client端事物結果發送給broker,borker進行處理

這裡我把 2,4擴充下

broker如何儲存消息,以緻于 consumer 端沒有發現消息呢

這裡就在于 發送事物消息的時候,client端把topic給換掉了,換成half消息的topic。這樣consumer沒查詢到自己定于的topic的消息,自然不會去消費

還有一點,我覺得挺重要的。消息都是寫入到commit log裡(消息是不會進行修改的,rocketmq是順序寫入的不會去做修改操作),那如果去commit log裡去查詢消息呢,消息到了broker會生成 一個 ConsumeQueue 

簡單介紹下ConsumeQueue

Consumequeue類對應的是每個topic和queuId下面的所有檔案。Consumequeue類檔案的存儲路徑預設為$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每個檔案由30W條資料組成,每條資料的結構如下所示:

commitLog offset (8) Size(4) Message Tag HashCode(8)

消息的起始實體偏移量physical offset(long 8位元組)+消息大小size(int 4位元組)+tagsCode(long 8位元組),每條資料的大小為20個位元組,進而每個檔案的預設大小為600萬個位元組。

就是通過偏移量和位元組大小 在 commit log裡面尋找到那條記錄。

第二點的擴充

  1. 事物消息在clinet端把消息的topic更換為RMQ_SYS_TRANS_HALF_TOPIC,把原來的topic的名字存在msg的屬性當中
  2. 事物在broker存儲msg,并生成 topic為RMQ_SYS_TRANS_HALF_TOPIC的Consumequeue,用于尋找 commit log 裡面生成的半消息
  3. rocketmq事物

第四點的擴充

  1. client處理完事物,再次發送給broker的
  2. broker 根據Consumequeue 查找出那條 half消息
  3. 根據處理結果,成功 就在commitlog裡面生成一條原先topic的消息,并生成相應的 Consumequeue,再生成一條 topic為 RMQ_SYS_TRANS_OP_HALF_TOPIC的op消息和 op消息的 Consumequeue。失敗了就隻生成 op消息。(ps:為什麼需要op消息呢,因為消息需要一個狀态來表示是否處理成功)
  4. 注意這裡是生成消息,不是在原來的消息體上進行修改
rocketmq事物
  1. 發送消息
  2. 發送消息成功傳回producer
  3. producer把本地執行的結果commit/rollback給broker。
  4. broker收到producer的消息,成功則生成nomal和op。失敗則隻生成op
  5. half消息會有一個timer,定時去producer去擷取事物的狀态。

以上是RocketMQ事務消息實作的示意圖:

  • 通過寫Half消息的方式來實作一階段消息對使用者不可見
  • 通過Op消息來标記事務消息的狀态
  • 通過讀取Half消息來生成一條新的Normal消息來完成二階段Commit之後消息對Consumer可見
  • 通過Half消息來執行回查