天天看點

RocketMQ源碼分析之RocketMQ事務消息實作原下篇(事務送出或復原)

若您對RocketMQ技術感興趣,請加入 RocketMQ技術交流群 本文将重點分析RocketMQ Broker如何處理事務消息送出、復原指令,根據前面的介紹,其入口EndTransactionProcessor#processRequest:

OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {        // @1
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);    // @2
      if (result.getResponseCode() == ResponseCode.SUCCESS) {  // @3
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);    // @4
          if (res.getCode() == ResponseCode.SUCCESS) {
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());     // @5
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());    // @6
                RemotingCommand sendResult = sendFinalMessage(msgInner);                              // @7
                if (sendResult.getCode() == ResponseCode.SUCCESS) {             
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());    // @8
                }
                return sendResult;
           }
          return res;
     }
}           

代碼@1:如果請求為送出事務,進入事務消息送出處理流程。

代碼@2:送出消息,别被這名字誤導了,該方法主要是根據commitLogOffset從commitlog檔案中查找消息傳回OperationResult執行個體:

RocketMQ源碼分析之RocketMQ事務消息實作原下篇(事務送出或復原)
  • private MessageExt prepareMessage :消息對象。
  • private int responseCode:查找結果。
  • private String responseRemark :錯誤提示。

代碼@3:如果成功查找到消息,則繼續處理,否則傳回給用戶端,消息未找到錯誤資訊。

代碼@4:驗證消息必要字段。

驗證消息的生産組與請求資訊中的生産者組是否一緻。

驗證消息的隊列偏移量(queueOffset)與請求資訊中的偏移量是否一緻。

驗證消息的commitLogOffset與請求資訊中的CommitLogOffset是否一緻。

代碼@5:調用endMessageTransaction方法,該方法主要的目的就是恢複事務消息的真實的主題、隊列,并設定事務ID。

代碼@6:設定消息的相關屬性,這一步應該直接在endMessageTransaction中實作就好,統一恢複原消息的數量,特别關注的是取消了事務相關的系統标記。

代碼@7:發送最終消息,其實作原理非常簡單,調用MessageStore将消息存儲在commitlog檔案中,此時的消息,會被轉發到原消息主題對應的消費隊列,被消費者消費。

代碼@8:删除預處理消息(prepare),其實是将消息存儲在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經被處理(送出或復原)。

上述就是事務消息送出的流程,事務復原類似,接下來大概分析一下事務消息復原的流程。

EndTransactionProcessor#processRequest
 else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
       result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);    // @1
       if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());   // @2
            }
           return res;
       }
}           

代碼@1:復原消息,其實内部就是根據commitlogOffset查找消息。

代碼@2:将消息存儲在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表該消息已被處理,與送出事務消息不同的是,送出事務消息會将消息恢複原主題與隊列,再次存儲在commitlog檔案中。

事務消息在Broker服務端的送出復原流程就介紹到這了。其核心實作就是根據commitlogOffset找到消息,如果是送出動作,就恢複原消息的主題與隊列,再次存入commitlog檔案進而轉到消息消費隊列,供消費者消費,然後将原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;復原消息與送出事務消息不同的是,送出事務消息會将消息恢複原主題與隊列,再次存儲在commitlog檔案中。