若您對RocketMQ技術感興趣,請加入 RocketMQ技術交流群 本文将重點分析RocketMQ Broker如何處理事務消息送出、復原指令,根據前面的介紹,其入口EndTransactionProcessor#processRequest:
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); // @2
if (result.getResponseCode() == ResponseCode.SUCCESS) { // @3
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); // @4
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); // @5
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); // @6
RemotingCommand sendResult = sendFinalMessage(msgInner); // @7
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // @8
}
return sendResult;
}
return res;
}
}
代碼@1:如果請求為送出事務,進入事務消息送出處理流程。
代碼@2:送出消息,别被這名字誤導了,該方法主要是根據commitLogOffset從commitlog檔案中查找消息傳回OperationResult執行個體:

- private MessageExt prepareMessage :消息對象。
- private int responseCode:查找結果。
- private String responseRemark :錯誤提示。
代碼@3:如果成功查找到消息,則繼續處理,否則傳回給用戶端,消息未找到錯誤資訊。
代碼@4:驗證消息必要字段。
驗證消息的生産組與請求資訊中的生産者組是否一緻。
驗證消息的隊列偏移量(queueOffset)與請求資訊中的偏移量是否一緻。
驗證消息的commitLogOffset與請求資訊中的CommitLogOffset是否一緻。
代碼@5:調用endMessageTransaction方法,該方法主要的目的就是恢複事務消息的真實的主題、隊列,并設定事務ID。
代碼@6:設定消息的相關屬性,這一步應該直接在endMessageTransaction中實作就好,統一恢複原消息的數量,特别關注的是取消了事務相關的系統标記。
代碼@7:發送最終消息,其實作原理非常簡單,調用MessageStore将消息存儲在commitlog檔案中,此時的消息,會被轉發到原消息主題對應的消費隊列,被消費者消費。
代碼@8:删除預處理消息(prepare),其實是将消息存儲在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經被處理(送出或復原)。
上述就是事務消息送出的流程,事務復原類似,接下來大概分析一下事務消息復原的流程。
EndTransactionProcessor#processRequest
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); // @1
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // @2
}
return res;
}
}
代碼@1:復原消息,其實内部就是根據commitlogOffset查找消息。
代碼@2:将消息存儲在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表該消息已被處理,與送出事務消息不同的是,送出事務消息會将消息恢複原主題與隊列,再次存儲在commitlog檔案中。
事務消息在Broker服務端的送出復原流程就介紹到這了。其核心實作就是根據commitlogOffset找到消息,如果是送出動作,就恢複原消息的主題與隊列,再次存入commitlog檔案進而轉到消息消費隊列,供消費者消費,然後将原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;復原消息與送出事務消息不同的是,送出事務消息會将消息恢複原主題與隊列,再次存儲在commitlog檔案中。