天天看點

為了保證資料一緻性發送消息時先寫庫還是先發消息?

作者:架構師成長曆程
為了保證資料一緻性發送消息時先寫庫還是先發消息?

随着分布式服務架構的流行與普及,原來在單體應用中執行的多個邏輯操作,現在被拆分成了多個服務之間的遠端調用。雖然服務化為我們的系統帶來了水準伸縮的能力,然而随之而來挑戰就是分布式事務問題,多個服務之間使用自己單獨維護的資料庫,它們彼此之間不在同一個事務中,假如A執行成功了,B執行卻失敗了,而A的事務此時已經送出,無法復原,那麼最終就會導緻兩邊資料不一緻性的問題;盡管很早之前就有基于兩階段送出的XA分布式事務,但是這類方案因為需要資源的全局鎖定,導緻性能極差;是以後面就逐漸衍生出了消息最終一緻性、TCC等柔性事務的分布式事務方案,本文主要分析的是基于消息的最終一緻性方案。

普通消息的處理流程

為了保證資料一緻性發送消息時先寫庫還是先發消息?
  1. 消息生成者發送消息
  2. MQ收到消息,将消息進行持久化,在存儲中新增一條記錄
  3. 傳回ACK給生産者
  4. MQ push 消息給對應的消費者,然後等待消費者傳回ACK
  5. 如果消息消費者在指定時間内成功傳回ack,那麼MQ認為消息消費成功,在存儲中删除消息,即執行第6步;如果MQ在指定時間内沒有收到ACK,則認為消息消費失敗,會嘗試重新push消息,重複執行4、5、6步驟
  6. MQ删除消息

先問一個問題:RocketMQ是如何保證消息與資料庫事務的一緻性?

第一時間可能會想到RocketMQ的事務消息

我們以日常開發中的案例來進行分析:下單送積分。使用者在下單後,訂單系統儲存訂單資料,然後發送消息到MQ,積分系統訂閱這個消息,然後給使用者加積分。這就引出了一個問題,從生産者訂單系統角度看,到底是先寫庫還是先發消息 呢?那我們接下來就分别看下這兩種情況。

1. 先寫庫後發消息

我們先通過一段僞代碼來分析下:

public void createOrder(final Order order) throws Exception {
    //模拟spirng的tx模闆
    transactionTemplate.execute(new TransactionCallback<Boolean>() {
        @Override
        public Boolean doInTransaction(TransactionStatus status) {
            //本地資料插入
            orderMapper.save(order);
            orderDetailMapper.save(order.getOrderDetail());

            //模拟 mq 發送消息
            SendResult send = producer.send(orderMessage);;
            if (send.getSendStatus() == SendStatus.SEND_OK) {
                status.setRollbackOnly();
            }

            return Boolean.TRUE;
        }
    });

}
           

我們來分析下它的過程:

首先,執行本地資料庫事務,插入資料,注意此時還沒有commit, 緊接着發送消息到MQ, 這中間可能由于網絡波動等原因,導緻生産者遲遲沒有收到broker的響應結果,比如5s内都沒有傳回SendResult給生産者,這也就意味着這5s内本地資料庫事務是無法commit的,如果在高并發的場景下,資料庫連接配接資源很快就會被耗盡,後續的請求則無法處理,最終系統将會崩潰。

既然我們知道了先寫庫後發消息有這樣的問題,那麼如果是先發消息後寫庫呢?

為了保證資料一緻性發送消息時先寫庫還是先發消息?

2.先發消息後寫庫

我們還是先看下代碼:

java複制代碼public void createOrder(Order order) {
    try {
        //先發送消息
        SendResult send = producer.send(orderMessage);
        if (send.getSendStatus() == SendStatus.SEND_OK) {
            orderMapper.save(order);
            orderDetailMapper.save(order.getOrderDetail());

            //送出事務
            connection.commit();
        }
    } catch (Exception e) {
        //復原
        connection.rollback();
    }
}    
           

這樣也是有問題的:

  1. 首先他也存在先寫庫後發消息的問題,一旦MQ由于網絡等原因長時間沒有傳回SendResult給生産者,将會導緻本地事務無法被送出或復原,高并發下資源将會被快速耗盡。
  2. 其次,生産者将消息發送出去并快速響應了,但是執行本地資料庫事務時出現了錯誤,比如上述代碼中的orderMapper.save(order)執行出錯了,這也就意味着消息已經發送出去,消費者可以消費了,但是此時本地事務失敗了,為了彌補錯誤,此時可能需要“復原”之前發送的消息,但是此時這條消息可能已經被消費了,就算沒有被消費,每次我都在發送消息後判斷是否出現了異常,如果出現了異常在發送條"復原"的消息,這無疑是增加了開發的複雜度,也顯得備援。

那麼有沒有什麼更好的方式,既可以不阻塞本地資料庫事務,還能保證最終一緻性呢?

這就是接下來我們要說的RocketMQ的事務消息,它可以保證本地事務與MQ消息的最終一緻性。

事務消息我們之前有分析過它的源碼和流程,這裡我們簡單看下

為了保證資料一緻性發送消息時先寫庫還是先發消息?

知道了事務消息的大緻流程後,接下來我們還是通過僞代碼來看下它的實作過程。

3.事務消息

  1. 發送事務消息
發送的topic是 “tx_order_topic”,消費者訂閱的也是這個,但是在發送到broker時,他會在内部将我們的topic做一次修改,這樣對消費者就不可見了。
@Slf4j
@Controller
public class OrderCreateController {

    //rocketmq 發送消息的模闆
    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @ResponseBody
    @GetMapping("/order/{buyer}")
    public String createOrder(@PathVariable String buyer) {

        //@Accessors(chain = true)
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setPhone("18883858508").setAddress("上海外灘xxxxx").setOrderDetailId(UUID.randomUUID().toString());

        Order order = new Order();
        order.setOrderId(UUID.randomUUID().toString()).setBuyer(buyer).setOrderDetail(orderDetail);

        Message<Order> message = MessageBuilder.withPayload(order).build();

        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx_order_topic", message, null);
        if (SendStatus.SEND_OK == result.getSendStatus()) {
            log.info("發送消息成功, result: {}", result);
        }
        //回查訂單表
        return "order create success";
    }
}
           
rocketMQTemplate.sendMessageInTransaction(...)要等本地事務執行完畢,才會傳回 TransactionSendResult
  1. 執行本地事務
java複制代碼@Slf4j
@RocketMQTransactionListener
public class CreateOrderCheckerListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private OrderDetailMapper orderDetailMapper;

    @Autowired
    private TransactionTemplate transactionTemplate;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("message: {}, args: {}", msg, arg);

        String orderMsg = new String((byte[]) msg.getPayload());
        final Order order = JSON.parseObject(orderMsg, Order.class);
        log.info("order info : {}", order);
        try {
            //放到同一個本地事務中
            this.transactionTemplate.executeWithoutResult(status -> {
                this.orderMapper.saveOrder(order);
               // int x = 1 / 0;
                this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
            });

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("儲存訂單失敗", e);
            //觸發回查
            return RocketMQLocalTransactionState.UNKNOWN;
            //如果是ROLLBACK,則復原消息,rocketmq将廢棄這條消息
        }
    }

    //先忽略回查的邏輯
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {}
}
           

如果本地事務執行成功(訂單正常入庫),producer将給Broker發送一個COMMIT的辨別,此時broker會将之前被替換了的topic給替換回去,這樣消費者就可以消費了。

java複制代碼@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "qiuguan_test_consumer_group", topic = "tx_order_topic")
public class RewardsPoints implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order message) {
        log.info("積分系統根據訂單增加積分 : {}", message);
    }
}
           

如果本地執行過程中發生了異常,比如網絡抖動等,沒有正常入庫,此時給Broker發送一個UNKNOW的辨別,broker收到UNKNOW辨別後,預設按照每分鐘一次的頻率發起回查。

  1. 消息回查
java複制代碼@Slf4j
@RocketMQTransactionListener
public class CreateOrderCheckerListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private OrderDetailMapper orderDetailMapper;

    @Autowired
    private TransactionTemplate transactionTemplate;

    //執行本地事務邏輯
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {}


    //回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("執行本地事務回查:{}", LocalDateTime.now());
        String orderMsg = new String((byte[]) msg.getPayload());
        final Order order = JSON.parseObject(orderMsg, Order.class);
        log.info("回查order: {}", order);

        //回查次數
        //int checkTimes = msg.getHeaders().get("TRANSACTION_CHECK_TIMES", Integer.class);

        Order o = this.orderMapper.getOrder(order.getOrderId());
        if (o == null) {
            try {
                this.transactionTemplate.executeWithoutResult(status -> {
                    this.orderMapper.saveOrder(order);
                    this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
                });
            } catch (Exception e) {
                log.error("儲存訂單失敗", e);
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }

        return RocketMQLocalTransactionState.COMMIT;
    }
}
           

在回查的時候我們可以檢查資料庫是否插入了訂單,如果沒有,此時我們可以再次嘗試入庫,如果入庫成功,則響應給Broker一個COMMIT辨別,此時該消息就可以被消費者消費了,如果依然入庫失敗,可以等待再次回查,或者復原。如果是復原,則Broker将丢棄該消費,消費者也将無法消費。

接下來我們分析下使用RocketMQ的事務消息有哪些問題:

  1. 生産者發送事務消息失敗
為了保證資料一緻性發送消息時先寫庫還是先發消息?
這種情況就直接抛出異常即可,本地事務也不會執行,更不會存在資料不一緻的問題。
  1. 生産者發送消息成功,但是本地事務執行失敗
kotlin複制代碼public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    log.info("message: {}, args: {}", msg, arg);
    try {
        this.transactionTemplate.executeWithoutResult(status -> {
           this.orderMapper.saveOrder(order);
           int x = 1 / 0;
           this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
        });

        return RocketMQLocalTransactionState.COMMIT;
    } catch (Exception e) {
        log.error("儲存訂單失敗", e);
        //復原消息
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
           
一旦本地事務執行失敗,則資料庫将會復原,同時給broker發送ROLLBACK辨別,broker收到該辨別後,将廢棄掉這條消息,消費者也無法消費這條消息,這樣也不會出現資料不一緻的問題。
  1. 生産者發送消息成功,本地事務也執行成功,但是在生産者将COMMIT辨別發送給broker時,發生了網絡抖動,沒有及時收到COMMIT指令。
kotlin複制代碼public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    log.info("message: {}, args: {}", msg, arg);
    try {
        this.transactionTemplate.executeWithoutResult(status -> {
           this.orderMapper.saveOrder(order);
           this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
        });
        //網絡抖動...
        return RocketMQLocalTransactionState.COMMIT;
    } catch (Exception e) {
        log.error("儲存訂單失敗", e);
        //復原消息
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
           
本地資料庫事務執行成功,訂單資料儲存到表中,broker由于網絡抖動沒有及時收到COMMIT指令,此時消息還是一條半事務消息,消費者還是無法消費,這樣本地事務與RocketMQ消息的一緻性就被破壞了。

RocketMQ為了解決這個問題,引入了消息回查機制,對于半事務消息,如果沒有及時收到COMMIT/ROLLBACK指令,它會嘗試主動與broker進行通信,調用監聽器的 checkLocalTransaction(..) 方法再次确認之前的本地事務是否成功。

java複制代碼public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    log.info("執行本地事務回查:{}", LocalDateTime.now());

    final Order order = JSON.parseObject(new String((byte[]) msg.getPayload()), Order.class);
    log.info("回查order: {}", order);

    /**
     * 由于之前本地事務已經執行成功,資料插入了表中,隻是在給broker發送COMMIT辨別時發生了網絡閃斷
     * 是以這裡回查的時候,是可以從資料庫表中查詢到訂單資料的,此時就可以給broker發送一個COMMIT辨別
     * 這樣broker就會把這對消費者不可見的消息修改為可見,此時就可以消費了。
     */
    Order o = this.orderMapper.getOrder(order.getOrderId());


    /**
     * 如果資料庫中沒有訂單資料,說明之前的插入就是失敗的,此時這裡嘗試再次插入或者直接復原就可以了
     */
    return o == null ? RocketMQLocalTransactionState.ROLLBACK : RocketMQLocalTransactionState.COMMIT;
}
           

普通消息處理存在的一緻性問題

我們以訂單建立為例,訂單系統先建立訂單(本地事務),再發送消息給下遊處理;如果訂單建立成功,然而消息沒有發送出去,那麼下遊所有系統都無法感覺到這個事件,會出現髒資料;

public void processOrder() {
    // 訂單處理(業務操作) 
    orderService.process();
    // 發送訂單處理成功消息(發送消息) 
    sendBizMsg ();
}

           

如果先發送訂單消息,再建立訂單;那麼就有可能消息發送成功,但是在訂單建立的時候卻失敗了,此時下遊系統卻認為這個訂單已經建立,也會出現髒資料。

public void processOrder() {
   // 發送訂單處理成功消息(發送消息) 
    sendBizMsg ();
    // 訂單處理(業務操作) 
    orderService.process();
}

           

一個錯誤的想法

此時可能有同學會想,我們可否将消息發送和業務處理放在同一個本地事務中來進行處理,如果業務消息發送失敗,那麼本地事務就復原,這樣是不是就能解決消息發送的一緻性問題呢?

@Transactionnal
public void processOrder() {
    try{
        // 訂單處理(業務操作) 
        orderService.process(); 
        // 發送訂單處理成功消息(發送消息) 
        sendBizMsg ();
    }catch(Exception e){
         事務復原;   
    }
}

           

消息發送的異常情況分析

可能的情況 一緻性
訂單處理成功,然後突然當機,事務未送出,消息沒有發送出去 一緻
訂單處理成功,由于網絡原因或者MQ當機,消息沒有發送出去,事務復原 一緻
訂單處理成功,消息發送成功,但是MQ由于其他原因,導緻消息存儲失敗,事務復原 一緻
訂單處理成功,消息存儲成功,但是MQ處理逾時,進而ACK确認失敗,導緻發送方本地事務復原 不一緻

從上面的情況分析,我們可以看到,使用普通的處理方式,無論如何,都無法保證業務處理與消息發送兩邊的一緻性,其根本的原因就在于:遠端調用,結果最終可能為成功、失敗、逾時;而對于逾時的情況,處理方最終的結果可能是成功,也可能是失敗,調用方是無法知曉的。 筆者就曾經在項目中出現類似的情況,調用方先在本地寫資料,然後發起RPC服務調用,但是處理方由于DB資料量比較大,導緻處理逾時,調用方在出現逾時異常後,直接復原本地事務,進而導緻調用方這邊沒資料,而處理方那邊資料卻已經寫入了,最終導緻兩邊業務資料的不一緻。為了保證兩邊資料的一緻性,我們隻能從其他地方尋找新的突破口。

事務消息

由于傳統的處理方式無法解決消息生成者本地事務處理成功與消息發送成功兩者的一緻性問題,是以事務消息就誕生了,它實作了消息生成者本地事務與消息發送的原子性,保證了消息生成者本地事務處理成功與消息發送成功的最終一緻性問題。

事務消息處理的流程

為了保證資料一緻性發送消息時先寫庫還是先發消息?
  1. 事務消息與普通消息的差別就在于消息生産環節,生産者首先預發送一條消息到MQ(這也被稱為發送half消息)
  2. MQ接收到消息後,先進行持久化,則存儲中會新增一條狀态為待發送的消息
  3. 然後傳回ACK給消息生産者,此時MQ不會觸發消息推送事件
  4. 生産者預發送消息成功後,執行本地事務
  5. 執行本地事務,執行完成後,發送執行結果給MQ
  6. MQ會根據結果删除或者更新消息狀态為可發送
  7. 如果消息狀态更新為可發送,則MQ會push消息給消費者,後面消息的消費和普通消息是一樣的

注意點:由于MQ通常都會保證消息能夠投遞成功,是以,如果業務沒有及時傳回ACK結果,那麼就有可能造成MQ的重複消息投遞問題。是以,對于消息最終一緻性的方案,消息的消費者必須要對消息的消費支援幂等,不能造成同一條消息的重複消費的情況。

事務消息異常情況分析

異常情況 一緻性 處理異常方法
消息未存儲,業務操作未執行 一緻
存儲待發送消息成功,但是ACK失敗,導緻業務未執行(可能是MQ處理逾時、網絡抖動等原因) 不一緻 MQ确認業務操作結果,處理消息(删除消息)
存儲待發送消息成功,ACK成功,業務執行(可能成功也可能失敗),但是MQ沒有收到生産者業務處理的最終結果 不一緻 MQ确認業務操作結果,處理消息(根據就業務處理結果,更新消息狀态,如果業務執行成功,則投遞消息,失敗則删除消息)
業務處理成功,并且發送結果給MQ,但是MQ更新消息失敗,導緻消息狀态依舊為待發送 不一緻 同上

支援事務消息的MQ

現在目前較為主流的MQ,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ等,隻有RocketMQ支援事務消息。據筆者了解,早年阿裡對MQ增加事務消息也是因為支付寶那邊因為業務上的需求而産生的。是以,如果我們希望強依賴一個MQ的事務消息來做到消息最終一緻性的話,在目前的情況下,技術選型上隻能去選擇RocketMQ來解決。上面我們也分析了事務消息所存在的異常情況,即MQ存儲了待發送的消息,但是MQ無法感覺到上遊處理的最終結果。對于RocketMQ而言,它的解決方案非常的簡單,就是其内部實作會有一個定時任務,去輪訓狀态為待發送的消息,然後給producer發送check請求,而producer必須實作一個check監聽器,監聽器的内容通常就是去檢查與之對應的本地事務是否成功(一般就是查詢DB),如果成功了,則MQ會将消息設定為可發送,否則就删除消息。

常見的問題

  1. 問:如果預發送消息失敗,是不是業務就不執行了?
  2. 答:是的,對于基于消息最終一緻性的方案,一般都會強依賴這步,如果這個步驟無法得到保證,那麼最終也 就不可能做到最終一緻性了。
  3. 問:為什麼要增加一個消息預發送機制,增加兩次釋出出去消息的重試機制,為什麼不在業務成功之後,發送失敗的話使用一次重試機制?
  4. 答:如果業務執行成功,再去發消息,此時如果還沒來得及發消息,業務系統就已經當機了,系統重新開機後,根本沒有記錄之前是否發送過消息,這樣就會導緻業務執行成功,消息最終沒發出去的情況。
  5. 如果consumer消費失敗,是否需要producer做復原呢?
  6. 答:這裡的事務消息,producer不會因為consumer消費失敗而做復原,采用事務消息的應用,其所追求的是高可用和最終一緻性,消息消費失敗的話,MQ自己會負責重推消息,直到消費成功。是以,事務消息是針對生産端而言的,而消費端,消費端的一緻性是通過MQ的重試機制來完成的。
  7. 如果consumer端因為業務異常而導緻復原,那麼豈不是兩邊最終無法保證一緻性?
  8. 答:基于消息的最終一緻性方案必須保證消費端在業務上的操作沒障礙,它隻允許系統異常的失敗,不允許業務上的失敗,比如在你業務上抛出個NPE之類的問題,導緻你消費端執行事務失敗,那就很難做到一緻了。

由于并非所有的MQ都支援事務消息,假如我們不選擇RocketMQ來作為系統的MQ,是否能夠做到消息的最終一緻性呢?答案是可以的。

基于本地消息的最終一緻性

為了保證資料一緻性發送消息時先寫庫還是先發消息?

基于本地消息的最終一緻性方案的最核心做法就是在執行業務操作的時候,記錄一條消息資料到DB,并且消息資料的記錄與業務資料的記錄必須在同一個事務内完成,這是該方案的前提核心保障。在記錄完成後消息資料後,後面我們就可以通過一個定時任務到DB中去輪訓狀态為待發送的消息,然後将消息投遞給MQ。這個過程中可能存在消息投遞失敗的可能,此時就依靠重試機制來保證,直到成功收到MQ的ACK确認之後,再将消息狀态更新或者消息清除;而後面消息的消費失敗的話,則依賴MQ本身的重試來完成,其最後做到兩邊系統資料的最終一緻性。

基于本地消息服務的方案雖然可以做到消息的最終一緻性,但是它有一個比較嚴重的弊端,每個業務系統在使用該方案時,都需要在對應的業務庫建立一張消息表來存儲消息。針對這個問題,我們可以将該功能單獨提取出來,做成一個消息服務來統一處理,因而就衍生出了我們下面将要讨論的方案。

獨立消息服務的最終一緻性

為了保證資料一緻性發送消息時先寫庫還是先發消息?

獨立消息服務最終一緻性與本地消息服務最終一緻性最大的差異就在于将消息的存儲單獨地做成了一個RPC的服務,這個過程其實就是模拟了事務消息的消息預發送過程,如果預發送消息失敗,那麼生産者業務就不會去執行,是以對于生産者的業務而言,它是強依賴于該消息服務的。不過好在獨立消息服務支援水準擴容,是以隻要部署多台,做成HA的叢集模式,就能夠保證其可靠性。

在消息服務中,還有一個單獨地定時任務,它會定期輪訓長時間處于待發送狀态的消息,通過一個check補償機制來确認該消息對應的業務是否成功,如果對應的業務處理成功,則将消息修改為可發送,然後将其投遞給MQ;如果業務處理失敗,則将對應的消息更新或者删除即可。是以在使用該方案時,消息生産者必須同時實作一個check服務,來供消息服務做消息的确認。對于消息的消費,該方案與上面的處理是一樣,都是通過MQ自身的重發機制來保證消息被消費。

總結:上遊事務送出之後,在基于MQ的場景下就不考慮復原了。失敗的可能是由于網絡、服務當機所導緻,文章中提到說業務上執行是無障礙的。如果下遊服務長時間沒有恢複,那麼就應該設定告警,在這裡有幾種機制來解決一些牛皮癬類型的問題,假如上遊消息始終發送失敗(這種可能性基本不存在除非代碼是假的)這種情況我們可以設定報警機制比如發生異常時可以列印日志,發送短信,發送郵件,将異常訂單儲存到資料庫,這些措施可以同時用于下遊一些異常訂單,同時也可以在發生異常的時候建立一個異常Topic的消息提示,讓人工來介入資料訂正。

不難發現,使用RocketMQ的事務消息具有以下好處:

将發送消息和本地事務分離開,如果發送消息失敗,則整個流程失敗,不會阻塞本地事務,如果本地事務執行失敗,則可以直接復原或者回查,不會影響消費者。

好了,關于RocketMQ的事務消息的實戰就介紹到這裡,歡迎大家批評指正。

繼續閱讀