天天看點

RocketMQ的事務消息原理及源碼解讀

目錄

1:實作思想

2:事務消息發送流程

3:消息送出,復原

4:回查事務狀态

5:總結

1:實作思想

RocketMQ事務消息的實作原理是基于兩階段送出(可以去了解一下XA)和定時事務狀态回查來決定消息最終是送出還是復原。一般地,應用程式在事務内完成相應的DB後,需要同步來調用mq相關的接口來發送消息,發送狀态為prepare的消息(筆者稱之為預消息),消息發送成功後,Rocketmq伺服器會回調RocketMQ消息發送者的事件監聽程式,記錄消息的本地事務狀态(相當于打一個标),該标與本地業務操作同屬一個事務,確定消息發送與本地事務的原子性。

mq收到prepare的消息,會先備份消息的原主題和原消息消費隊列(消息的載體),然後将消息存儲為主題為 RMQ_SYS_TRANS_HALF_TOPIC(筆者稱之為半主題)的消息消費隊列中。

RocketMQ消息伺服器開啟一個定時任務,消費RMQ_SYS_TRANS_HALF_TOPIC消息隊列的中消息,向消息發送端發起事務狀态回查,應用程式根據儲存的事務狀态來回饋消息伺服器事務的狀态(commit,rollback,unkown),如果是送出或者是復原,則服務送出或者復原消息,如果是未知的消息,等待下一次回查,(為什麼會存在未知的消息,一次輪詢過去,而這個消息剛剛出來,還沒有被消費),RocketMQ允許一條消息的回查間隔(輪詢周期)與回查次數(超過多少次復原),如果達到了n(最大回查次數)次,将會復原消息

2:事務消息發送流程

1:入口很重要:發送事務消息的開始和普通的消息有點不同,事務消息的producer是TransactionMQProducer,如果看過筆者之前寫過的話,應該知道是DefaultMQProducer,其實TransactionMQProducer是DefaultMQProducer的增強,在TransactionMQProducer的内部有一個transactionlistener,主要實作了本地事務狀态執行,和本地事務事務回查兩個接口

RocketMQ的事務消息原理及源碼解讀

2:事務消息發送依賴的是TransactionMQProducer

@Override
//通俗易懂,在事務中發送消息
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
//這裡必須要依賴事務監聽器,要是這個都沒有話,相當于不可能存在事務狀态回查和本地事務執行
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }
//設定系統的半topic
        msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
//最後還是調用的預設的mqProducer
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }
           

3:設定消息的主題,和生産者組

public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        //設定消息為事務消息,标記頭
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        //設定消息的生産者組
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
           

4:執行本地事務

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    //從消息中擷取事務id,為後續事務回查提供依據
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        //執行本地事務
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    //擷取事務狀态
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
                    //事務狀态不是 送出狀态,列印日志,等待下一次輪詢
                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
           

5:check消息,發送消息

//是否設定了事務消息
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    //如果是事務消息的話,會給消息一個 PRE 的标志,後續會添加一個事務消息的專用處理器
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
           
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        //看看是不是事務消息
        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }

        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
           

6:由于上述的消息的主題已經被改變成一個系統的主題,并且放入到對應的消息隊列中,那麼在消費的時候肯定是一個專門的線程去消費這個消息。

3:消息送出,復原

1:在事務消息結束(沒有真正的結束,隻是一階段的結束)事務消息的送出或者是復原均由sendResult這個字段辨別來确定,對應的處理器為org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest,因為涉及到傳輸效率,在RocketMq中大量的使用Netty來做為io架構,是以涉及到資料的加解碼(就是各種handler啦)

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
                //加解碼
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.info("Transaction request:{}", requestHeader);
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }
           

2:确定是否送出或者是復原事務

perationResult result = new OperationResult();
        //送出事務
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                //傳回成功
                if (res.getCode() == ResponseCode.SUCCESS) {
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    //獲得事務消息的偏移量,然後要将此消息放入到commitlog中進行檔案的存儲,最終是要存盤的
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    //存儲時間戳,一般用于消息的時間查找
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
            //復原事務
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
           

4:回查事務狀态

1:可以很絕對的認為,在RocketMQ中的事務消息隻會存在兩種狀态(分别是 commit,rollback,)至于unkown的話隻能做為一個中間的狀态,也就是說,随着時間的推移,這種狀态是一定會不存在的,要是真的不行,那就搬出來微積分--極限思想,世間萬物最終歸于平靜,其實大數定律也是如此(扯得有點哲學的味道了,偏題了,偏題了)。

2:定時任務去掃描HALF_TOPIC,回查消息的事務狀态

protected void onWaitEnd() {
        //逾時時間
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        //最大的配置檢測時間
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        //目前時間
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }
           

3:拉取HALF_TOPIC主題下的所有消息隊列,然後依次處理

public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                MessageQueue opQueue = getOpQueue(messageQueue);
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
           

4:事務回查實作

@Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
           

5:總結

在RocketMQ中事務消息底層包裝的是普通消息,在發送的時候更換其TOPIC,和對應的消息隊列,然後有一個線程來消費這個消息隊列上面的消息,分為兩階段送出,第一階段是本地執行,第二階段才是真正的送出,不過第二階段的送出也是由條件的,而是通過事務狀态回查的方式的來是實作的,最後會更新事務消息的ID,已達到将原來的消息更換成普通的消息進行發送,接着就會存入磁盤中持久化。

繼續閱讀