天天看點

分布式消息隊列RocketMQ--事務消息--解決分布式事務的最佳實踐 錯誤的方案0 方案1–業務方自己實作 方案2 – RocketMQ 事務消息 人工介入

說到分布式事務,就會談到那個經典的”賬号轉賬”問題:2個賬号,分布處于2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性?

一般的思路都是通過消息中間件來實作“最終一緻性”:A系統扣錢,然後發條消息給中間件,B系統接收此消息,進行加錢。

但這裡面有個問題:A是先update DB,後發送消息呢? 還是先發送消息,後update DB?

假設先update DB成功,發送消息網絡失敗,重發又失敗,怎麼辦? 

假設先發送消息成功,update DB失敗。消息已經發出去了,又不能撤回,怎麼辦?

是以,這裡下個結論: 隻要發送消息和update DB這2個操作不是原子的,無論誰先誰後,都是有問題的。

那這個問題怎麼解決呢??

錯誤的方案0

有人可能想到了,我可以把“發送消息”這個網絡調用和update DB放在同1個事務裡面,如果發送消息失敗,update DB自動復原。這樣不就保證2個操作的原子性了嗎?

這個方案看似正确,其實是錯誤的,原因有2:

(1)網絡的2将軍問題:發送消息失敗,發送方并不知道是消息中間件真的沒有收到消息呢?還是消息已經收到了,隻是傳回response的時候失敗了?

如果是已經收到消息了,而發送端認為沒有收到,執行update db的復原操作。則會導緻A賬号的錢沒有扣,B賬号的錢卻加了。

(2)把網絡調用放在DB事務裡面,可能會因為網絡的延時,導緻DB長事務。嚴重的,會block整個DB。這個風險很大。

基于以上分析,我們知道,這個方案其實是錯誤的!

方案1–業務方自己實作

假設消息中間件沒有提供“事務消息”功能,比如你用的是Kafka。那如何解決這個問題呢?

解決方案如下: 

(1)Producer端準備1張消息表,把update DB和insert message這2個操作,放在一個DB事務裡面。

(2)準備一個背景程式,源源不斷的把消息表中的message傳送給消息中間件。失敗了,不斷重試重傳。允許消息重複,但消息不會丢,順序也不會打亂。

(3)Consumer端準備一個判重表。處理過的消息,記在判重表裡面。實作業務的幂等。但這裡又涉及一個原子性問題:如果保證消息消費 + insert message到判重表這2個操作的原子性?

消費成功,但insert判重表失敗,怎麼辦?關于這個,在Kafka的源碼分析系列,第1篇, exactly once問題的時候,有過讨論。

通過上面3步,我們基本就解決了這裡update db和發送網絡消息這2個操作的原子性問題。

但這個方案的一個缺點就是:需要設計DB消息表,同時還需要一個背景任務,不斷掃描本地消息。導緻消息的處理和業務邏輯耦合額外增加業務方的負擔。

方案2 – RocketMQ 事務消息

為了能解決該問題,同時又不和業務耦合,RocketMQ提出了“事務消息”的概念。

具體來說,就是把消息的發送分成了2個階段:Prepare階段和确認階段。

具體來說,上面的2個步驟,被分解成3個步驟: 

(1) 發送Prepared消息 

(2) update DB 

(3) 根據update DB結果成功或失敗,Confirm或者取消Prepared消息。

可能有人會問了,前2步執行成功了,最後1步失敗了怎麼辦?這裡就涉及到了RocketMQ的關鍵點:RocketMQ會定期(預設是1分鐘)掃描所有的Prepared消息,詢問發送方,到底是要确認這條消息發出去?還是取消此條消息?

具體代碼實作如下:

也就是定義了一個checkListener,RocketMQ會回調此Listener,進而實作上面所說的方案。

// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實作的政策來決斷事務
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構造事務消息的生産者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設定事務決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務的處理邏輯,相當于示例中檢查Bob賬戶并扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構造MSG,省略構造參數
Message msg = new Message(......);
// 發送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
public TransactionSendResult sendMessageInTransaction(.....)  {
    // 邏輯代碼,非實際代碼
    // 1.發送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息發送成功,處理與消息關聯的本地事務單元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.結束事務
    this.endTransaction(sendResult, localTransactionState, localException);
}           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

總結:對比方案2和方案1,RocketMQ最大的改變,其實就是把“掃描消息表”這個事情,不讓業務方做,而是消息中間件幫着做了。

至于消息表,其實還是沒有省掉。因為消息中間件要詢問發送方,事物是否執行成功,還是需要一個“變相的本地消息表”,記錄事物執行狀态。

人工介入

可能有人又要說了,無論方案1,還是方案2,發送端把消息成功放入了隊列,但消費端消費失敗怎麼辦?

消費失敗了,重試,還一直失敗怎麼辦?是不是要自動復原整個流程?

答案是人工介入。從工程實踐角度講,這種整個流程自動復原的代價是非常巨大的,不但實作複雜,還會引入新的問題。比如自動復原失敗,又怎麼處理?

對應這種極低機率的case,采取人工處理,會比實作一個高複雜的自動化復原系統,更加可靠,也更加簡單。

繼續閱讀