天天看點

還不知道事務消息嗎?這篇文章帶你全面掃盲!

雲栖号資訊:【 點選檢視更多行業資訊

在這裡您可以找到不同行業的第一手的上雲資訊,還在等什麼,快來!

在分布式系統中,為了保證資料一緻性是必須使用分布式事務。分布式事務實作方式就很多種,今天主要介紹一下使用 RocketMQ 事務消息,實作分布事務。

為什麼需要事務消息?

很多同學可能不知道事務消息是什麼,沒關系,舉一個真實業務場景,先來帶你了解一下普通的消息存在問題。

還不知道事務消息嗎?這篇文章帶你全面掃盲!

上面業務場景中,當使用者支付成功,将會更新支付訂單,然後發送 MQ 消息。手續費系統将會通過拉取消息,計算手續費然後儲存到另外一個手續費資料庫中。

由于計算手續費這個步驟可以離線計算,是以這裡采用 MQ 解耦支付與計算手續費的流程。

流程主要涉及三個步驟:

  • 更新訂單資料
  • 發送消息給 MQ
  • 手續費系統拉取消息

上面提到的步驟,任何一個都會失敗,如果我們沒有處理,就會使兩邊資料不一緻,将會造成下面兩種情況:

  • 訂單資料更新了,手續費資料沒有生成
  • 手續費資料生成,訂單資料卻沒有更新

這可是涉及到真正的錢,一旦少計算,就會造成資損,真的賠不起!

對于最後一步來講,比較簡單。如果消費消息失敗,隻要沒有送出消息确認,MQ 服務端将會自動重試。

最大的問題在于我們無法保證更新操作與發送消息一緻性。無論我們采用先更新訂單資料,再發送消息,還是先發送消息,再更新訂單資料,都在存在一個成功,一個失敗的可能。

如下所示,采用先發送消息,然後再更新資料庫的方式。

還不知道事務消息嗎?這篇文章帶你全面掃盲!

上面流程消息發送成功之後,再進行本地事務的送出。這個流程看起來很完美,但是想象一下,如果在送出事務時資料庫執行失敗,導緻事務復原了。

然而此時消息已經發送出去,無法撤回。這就導緻手續費系統緊接會消費消息,計算手續費并更新到資料庫中。這就造成支付資料未更新,手續費系統卻生成的不一緻的情況。

那如果我們流程反一下,是不是就好了呢?

還不知道事務消息嗎?這篇文章帶你全面掃盲!

我們使用下面的僞碼表示:

還不知道事務消息嗎?這篇文章帶你全面掃盲!

這裡如果事務送出成功,但是 mq 消息發送失敗,就會導緻支付資料更新但是手續費資料未生成的的不一緻情況。

這裡有的同學可能會想到,将發送 mq 消息步驟移動到事務中,消息發送失敗,復原事務,不就完美了嗎?

僞碼如下:

還不知道事務消息嗎?這篇文章帶你全面掃盲!

上面代碼看起來确實沒什麼問題,消息發送失敗,復原事務。

但是實際上第二步有可能存在消息已經發送到 MQ 服務端,但是由于網絡問題未及時收到 MQ 的響應消息,進而導緻消息發送端認為消息消息發送失敗。

這就會導緻訂單事務復原了,但是手續費系統卻能消費消息,兩邊資料庫又不一緻了。

熟悉 MQ 的同學,可能會想到,消息發送失敗,可以重試啊。

是的,我們可以增加重試次數,重新發送消息。但是這裡我們需要注意,由于消息發送耦合在事務中,過多的重試會拉長資料庫事務執行時間,事務處理時間過長,導緻事務中鎖的持有時間變長,影響整體的資料庫吞吐量。

實際業務中,不太建議将消息發送耦合在資料庫事務中。

事務消息

事務消息是 RocketMQ 提供的事務功能,可以實作分布式事務,進而保證上面事務操作與消息發送要麼都成功,要麼都失敗。

使用事務消息,整體流程如下:

還不知道事務消息嗎?這篇文章帶你全面掃盲!

首先我們将會發送一個半(half) 消息到 MQ 中,通知其開啟一個事務。這裡半消息并不是說消息内容不完整,實際上它包含所有完整的消息内容。

這個半消息與普通的消息唯一的差別在于,在事物送出之前,這個消息對消費者來說是不可見的,消費者不會消費這個消息。

一旦半消息發送成功,我們就可以執行資料庫事務。然後根據事務的執行結果再決定送出或復原事務消息。

如果事務送出成功,将會發送确認消息至 MQ,手續費系統就可以成功消費到這條消息。

如果事務被復原,将會發送復原通知至 MQ,然後 MQ 将會删除這條消息。對于手續費系統來說,都不會知道這條消息的存在。

這就解決了要麼都成功,要麼都失敗的一緻性要求。

實際上面的流程還是存在問題,如果我們送出/復原事務消息失敗怎麼辦?

對于這個問題,RocketMQ 給出一種事務反查的機制。我們需要需要注冊一個回調接口,用于反查本地事務狀态。

RocketMQ 若未收到送出或復原的請求,将會定期去反查回調接口,然後可以根據反查結果決定復原還是送出事務。

RocketMQ 事務消息流程整體如下:

還不知道事務消息嗎?這篇文章帶你全面掃盲!

事務消息示例代碼如下:

public class TransactionMQProducerExample {

public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
    TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");
    // 不定義将會使用預設的
    ExecutorService executorService =
            new ThreadPoolExecutor(2, 5, 100,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), 
                                   new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
    producer.setExecutorService(executorService);
    TransactionListener transactionListener = new TransactionListenerImpl();
    producer.setTransactionListener(transactionListener);
    // 改成自己的位址
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    Order order = new Order("66666", "books");

    Message msg =
            new Message("transaction_tp",
                    JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 發送半消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.println(sendResult.getSendStatus());
    producer.shutdown();
}

public static class TransactionListenerImpl implements TransactionListener {

    /**
     * 半消息發送成功将會自動執行該邏輯
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 執行本地事務
        Order order = null;
        try {
            order = JSON.parseObject(new String(msg.getBody(),
                    RemotingHelper.DEFAULT_CHARSET), Order.class);
            boolean isSuccess = updateOrder(order);
            if (isSuccess) {
                // 本地事務執行成功,送出半消息
                System.out.println("本地事務執行成功,送出事務事務消息");
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                // 本地事務執行成功,復原半消息
                System.out.println("本地事務執行失敗,復原事務消息");
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            System.out.println("本地事務執行異常");
        }
        // 異常情況傳回未知狀态
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 更新訂單
     * 這裡模拟資料庫更新,傳回事務執行成功
     *
     * @param order
     * @return
     */
    private boolean updateOrder(Order order) throws InterruptedException {
        TimeUnit.SECONDS.sleep(1);
        return true;
    }

    /***
     * 若送出/復原事務消息失敗,rocketmq 自動反查事務狀态
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        try {
            Order order = JSON.parseObject(new String(msg.getBody(),
                    RemotingHelper.DEFAULT_CHARSET), Order.class);
            boolean isSuccess = queryOrder(order.getOrderId());
            if (isSuccess) {
                // 本地事務執行成功,送出半消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                // 本地事務執行成功,復原半消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }

        } catch (Exception e) {
            System.out.println("查詢失敗");
        }
        // 異常情況傳回未知狀态
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 查詢訂單狀态
     * 模拟傳回查詢成功
     *
     * @param orderId
     * @return
     */
    private boolean queryOrder(String orderId) throws InterruptedException {
        TimeUnit.SECONDS.sleep(1);
        return true;
    }
}

@Data
public static class Order {
    private String orderId;

    private String goods;

    public Order(String orderId, String goods) {
        this.orderId = orderId;
        this.goods = goods;
    }
}           

}

上面代碼中:

1.我們需要為生産者指定一個唯一的 ProducerGroup

2.需要繼承 TransactionListener 注解回調接口,其中 executeLocalTransaction 方法執行本地事務,checkLocalTranscation 用來執行檢查本地事務。

3.傳回事務狀态有三種:

  • LocalTransactionState.UNKNOW 中間狀态,RocketMQ 将會反查
  • LocalTransactionState.COMMIT_MESSAGE 送出事務,消息這後續将會消費這條消息
  • LocalTransactionState.ROLLBACK_MESSAGE,復原事務,RocketMQ 将會删除這條消息

事務消息使用注意點

事務消息最大反查次數

由于單個消息反查次數過多,将會導緻半消息隊列堆積,影響性能。 RocketMQ 預設将單個消息的檢查次數限制為 15 次。

我們可以通過修改 broker 配置檔案,增加如下配置:

還不知道事務消息嗎?這篇文章帶你全面掃盲!

當檢查次數超過最大次數後,RocketMQ 将會丢棄消息并且列印錯誤日志。

若想自定義丢棄消息行為,需要修改 RocketMQ broker 端代碼,繼承 AbstractTransactionalMessageCheckListener 重寫 resolveDiscardMsg 方法,加入自定義邏輯。

同步的雙重寫入機制

為了確定事務消息不丢失,并且保證事務完整性,需要将事務消息複制到叢集其他節點,建議使用同步雙重寫入機制。

事務反查時間設定

我們可以設定以下參數,設定 MQ 服務端多久之後開始反查事務消息(自事務消息儲存成功之後開始計算)。

還不知道事務消息嗎?這篇文章帶你全面掃盲!

或者我們可以在 broker.conf 設定以下參數:

還不知道事務消息嗎?這篇文章帶你全面掃盲!

發送端主動設定配置參數優先級大于 broker 端配置。

另外 RocketMQ 還有一個配置用于控制事務性消息檢查間隔:

還不知道事務消息嗎?這篇文章帶你全面掃盲!

如果自定義配置如上,事務消息檢查間隔為 5 秒,事務消息設定檢查時間為 60 s。

這就代表 broker 每隔 5s 檢查一次事務消息,如果此時事務消息到 MQ 服務端時間還未超過 60s,此時将不會反查,直到時間大于 60s。

彩蛋

查找事務消息資料的時候,發現 RocketMQ 文檔存在相關錯誤。

還不知道事務消息嗎?這篇文章帶你全面掃盲!

如上兩處實際是錯誤的,應該修改為:AbstractTransactionalMessageCheckListener 與 transactionTimeout。

還不知道事務消息嗎?這篇文章帶你全面掃盲!

順手修改了一下,送出 PR 。哈哈,也為開源項目貢獻了一份力量。

最後說一句

以前總以為參加開源項目很難,直到最近接連參與兩次開源項目修改,才發現其實并沒有想象中那麼難。由于版本變更,開源項目文檔有些是存在錯誤的,如果我們看到了,順手修複一下,這也是為開源項目貢獻一份力。

才疏學淺,難免會有纰漏,如果你發現了錯誤的地方,還請你留言給我指出來,我對其加以修改。

【雲栖号線上課堂】每天都有産品技術專家分享!

課程位址:

https://yqh.aliyun.com/zhibo

立即加入社群,與專家面對面,及時了解課程最新動态!

【雲栖号線上課堂 社群】

https://c.tb.cn/F3.Z8gvnK

原文釋出時間:2020-03-30

本文作者:樓下小黑哥

本文來自:“掘金”,了解相關資訊可以關注“

掘金