雲栖号資訊:【 點選檢視更多行業資訊】
在這裡您可以找到不同行業的第一手的上雲資訊,還在等什麼,快來!
在分布式系統中,為了保證資料一緻性是必須使用分布式事務。分布式事務實作方式就很多種,今天主要介紹一下使用 RocketMQ 事務消息,實作分布事務。
為什麼需要事務消息?
很多同學可能不知道事務消息是什麼,沒關系,舉一個真實業務場景,先來帶你了解一下普通的消息存在問題。

上面業務場景中,當使用者支付成功,将會更新支付訂單,然後發送 MQ 消息。手續費系統将會通過拉取消息,計算手續費然後儲存到另外一個手續費資料庫中。
由于計算手續費這個步驟可以離線計算,是以這裡采用 MQ 解耦支付與計算手續費的流程。
流程主要涉及三個步驟:
- 更新訂單資料
- 發送消息給 MQ
- 手續費系統拉取消息
上面提到的步驟,任何一個都會失敗,如果我們沒有處理,就會使兩邊資料不一緻,将會造成下面兩種情況:
- 訂單資料更新了,手續費資料沒有生成
- 手續費資料生成,訂單資料卻沒有更新
這可是涉及到真正的錢,一旦少計算,就會造成資損,真的賠不起!
對于最後一步來講,比較簡單。如果消費消息失敗,隻要沒有送出消息确認,MQ 服務端将會自動重試。
最大的問題在于我們無法保證更新操作與發送消息一緻性。無論我們采用先更新訂單資料,再發送消息,還是先發送消息,再更新訂單資料,都在存在一個成功,一個失敗的可能。
如下所示,采用先發送消息,然後再更新資料庫的方式。
上面流程消息發送成功之後,再進行本地事務的送出。這個流程看起來很完美,但是想象一下,如果在送出事務時資料庫執行失敗,導緻事務復原了。
然而此時消息已經發送出去,無法撤回。這就導緻手續費系統緊接會消費消息,計算手續費并更新到資料庫中。這就造成支付資料未更新,手續費系統卻生成的不一緻的情況。
那如果我們流程反一下,是不是就好了呢?
我們使用下面的僞碼表示:
這裡如果事務送出成功,但是 mq 消息發送失敗,就會導緻支付資料更新但是手續費資料未生成的的不一緻情況。
這裡有的同學可能會想到,将發送 mq 消息步驟移動到事務中,消息發送失敗,復原事務,不就完美了嗎?
僞碼如下:
上面代碼看起來确實沒什麼問題,消息發送失敗,復原事務。
但是實際上第二步有可能存在消息已經發送到 MQ 服務端,但是由于網絡問題未及時收到 MQ 的響應消息,進而導緻消息發送端認為消息消息發送失敗。
這就會導緻訂單事務復原了,但是手續費系統卻能消費消息,兩邊資料庫又不一緻了。
熟悉 MQ 的同學,可能會想到,消息發送失敗,可以重試啊。
是的,我們可以增加重試次數,重新發送消息。但是這裡我們需要注意,由于消息發送耦合在事務中,過多的重試會拉長資料庫事務執行時間,事務處理時間過長,導緻事務中鎖的持有時間變長,影響整體的資料庫吞吐量。
實際業務中,不太建議将消息發送耦合在資料庫事務中。
事務消息
事務消息是 RocketMQ 提供的事務功能,可以實作分布式事務,進而保證上面事務操作與消息發送要麼都成功,要麼都失敗。
使用事務消息,整體流程如下:
首先我們将會發送一個半(half) 消息到 MQ 中,通知其開啟一個事務。這裡半消息并不是說消息内容不完整,實際上它包含所有完整的消息内容。
這個半消息與普通的消息唯一的差別在于,在事物送出之前,這個消息對消費者來說是不可見的,消費者不會消費這個消息。
一旦半消息發送成功,我們就可以執行資料庫事務。然後根據事務的執行結果再決定送出或復原事務消息。
如果事務送出成功,将會發送确認消息至 MQ,手續費系統就可以成功消費到這條消息。
如果事務被復原,将會發送復原通知至 MQ,然後 MQ 将會删除這條消息。對于手續費系統來說,都不會知道這條消息的存在。
這就解決了要麼都成功,要麼都失敗的一緻性要求。
實際上面的流程還是存在問題,如果我們送出/復原事務消息失敗怎麼辦?
對于這個問題,RocketMQ 給出一種事務反查的機制。我們需要需要注冊一個回調接口,用于反查本地事務狀态。
RocketMQ 若未收到送出或復原的請求,将會定期去反查回調接口,然後可以根據反查結果決定復原還是送出事務。
RocketMQ 事務消息流程整體如下:
事務消息示例代碼如下:
public class TransactionMQProducerExample {
public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");
// 不定義将會使用預設的
ExecutorService executorService =
new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
// 改成自己的位址
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Order order = new Order("66666", "books");
Message msg =
new Message("transaction_tp",
JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發送半消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println(sendResult.getSendStatus());
producer.shutdown();
}
public static class TransactionListenerImpl implements TransactionListener {
/**
* 半消息發送成功将會自動執行該邏輯
*
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執行本地事務
Order order = null;
try {
order = JSON.parseObject(new String(msg.getBody(),
RemotingHelper.DEFAULT_CHARSET), Order.class);
boolean isSuccess = updateOrder(order);
if (isSuccess) {
// 本地事務執行成功,送出半消息
System.out.println("本地事務執行成功,送出事務事務消息");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事務執行成功,復原半消息
System.out.println("本地事務執行失敗,復原事務消息");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
System.out.println("本地事務執行異常");
}
// 異常情況傳回未知狀态
return LocalTransactionState.UNKNOW;
}
/**
* 更新訂單
* 這裡模拟資料庫更新,傳回事務執行成功
*
* @param order
* @return
*/
private boolean updateOrder(Order order) throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
return true;
}
/***
* 若送出/復原事務消息失敗,rocketmq 自動反查事務狀态
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
try {
Order order = JSON.parseObject(new String(msg.getBody(),
RemotingHelper.DEFAULT_CHARSET), Order.class);
boolean isSuccess = queryOrder(order.getOrderId());
if (isSuccess) {
// 本地事務執行成功,送出半消息
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事務執行成功,復原半消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
System.out.println("查詢失敗");
}
// 異常情況傳回未知狀态
return LocalTransactionState.UNKNOW;
}
/**
* 查詢訂單狀态
* 模拟傳回查詢成功
*
* @param orderId
* @return
*/
private boolean queryOrder(String orderId) throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
return true;
}
}
@Data
public static class Order {
private String orderId;
private String goods;
public Order(String orderId, String goods) {
this.orderId = orderId;
this.goods = goods;
}
}
}
上面代碼中:
1.我們需要為生産者指定一個唯一的 ProducerGroup
2.需要繼承 TransactionListener 注解回調接口,其中 executeLocalTransaction 方法執行本地事務,checkLocalTranscation 用來執行檢查本地事務。
3.傳回事務狀态有三種:
- LocalTransactionState.UNKNOW 中間狀态,RocketMQ 将會反查
- LocalTransactionState.COMMIT_MESSAGE 送出事務,消息這後續将會消費這條消息
- LocalTransactionState.ROLLBACK_MESSAGE,復原事務,RocketMQ 将會删除這條消息
事務消息使用注意點
事務消息最大反查次數
由于單個消息反查次數過多,将會導緻半消息隊列堆積,影響性能。 RocketMQ 預設将單個消息的檢查次數限制為 15 次。
我們可以通過修改 broker 配置檔案,增加如下配置:
當檢查次數超過最大次數後,RocketMQ 将會丢棄消息并且列印錯誤日志。
若想自定義丢棄消息行為,需要修改 RocketMQ broker 端代碼,繼承 AbstractTransactionalMessageCheckListener 重寫 resolveDiscardMsg 方法,加入自定義邏輯。
同步的雙重寫入機制
為了確定事務消息不丢失,并且保證事務完整性,需要将事務消息複制到叢集其他節點,建議使用同步雙重寫入機制。
事務反查時間設定
我們可以設定以下參數,設定 MQ 服務端多久之後開始反查事務消息(自事務消息儲存成功之後開始計算)。
或者我們可以在 broker.conf 設定以下參數:
發送端主動設定配置參數優先級大于 broker 端配置。
另外 RocketMQ 還有一個配置用于控制事務性消息檢查間隔:
如果自定義配置如上,事務消息檢查間隔為 5 秒,事務消息設定檢查時間為 60 s。
這就代表 broker 每隔 5s 檢查一次事務消息,如果此時事務消息到 MQ 服務端時間還未超過 60s,此時将不會反查,直到時間大于 60s。
彩蛋
查找事務消息資料的時候,發現 RocketMQ 文檔存在相關錯誤。
如上兩處實際是錯誤的,應該修改為:AbstractTransactionalMessageCheckListener 與 transactionTimeout。
順手修改了一下,送出 PR 。哈哈,也為開源項目貢獻了一份力量。
最後說一句
以前總以為參加開源項目很難,直到最近接連參與兩次開源項目修改,才發現其實并沒有想象中那麼難。由于版本變更,開源項目文檔有些是存在錯誤的,如果我們看到了,順手修複一下,這也是為開源項目貢獻一份力。
才疏學淺,難免會有纰漏,如果你發現了錯誤的地方,還請你留言給我指出來,我對其加以修改。
【雲栖号線上課堂】每天都有産品技術專家分享!
課程位址:
https://yqh.aliyun.com/zhibo立即加入社群,與專家面對面,及時了解課程最新動态!
【雲栖号線上課堂 社群】
https://c.tb.cn/F3.Z8gvnK
原文釋出時間:2020-03-30
本文作者:樓下小黑哥
本文來自:“掘金”,了解相關資訊可以關注“
掘金”