天天看點

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

作者:網際網路進階架構師

首先說明本次源碼分析僅分析時間輪之前的延時消息設計

現在的RocketMQ已經支援基于時間輪的任意級别延時消息

延時消息基礎知識

預設RocketMQ延時消息共有18個等級

RocketMQ 5.x延時消息源碼分析(不包含時間輪)
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

延時消息的簡單使用

發送延時消息

DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }

        Message msg = new Message(TOPIC /* Topic */,
                TAG /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );

        msg.setDelayTimeLevel(2);
        producer.send(msg);           

和普通消息不同的我們新增了延時消息等級的設定

msg.setDelayTimeLevel(2);           

源碼分析

消息發送本身和普通消息發送沒有太大差别,但是新增了一個DELAY屬性

public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";           

消息發送的請求code是

public static final int SEND_BATCH_MESSAGE = 320;           

源碼入口

這裡的我們先從消息的發送開始

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

消息發送最終的請求狀态碼是

public static final int SEND_MESSAGE_V2 = 310;           

可以看到實際消息發送和普通消息沒有什麼差別,是以我們還是要去broker那邊看看有沒有什麼特殊處理

broker處理請求

消息處理方法 org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

實際的消息發送邏輯在方法

this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                        (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));           
RocketMQ 5.x延時消息源碼分析(不包含時間輪)

這裡我們進去這個方法看看

由于是分析延時消息,是以我們不關注消息發送的一些其他細節。主要關注延時消息和普通消息的處理差別 我們看方法

org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage           
RocketMQ 5.x延時消息源碼分析(不包含時間輪)

這裡可以看到現在的topic還是我們正常指定的xiao-zou-topic

但是這裡會執行多個消息的後置處理器putMessageHookList

我們看看putMessageHookList有哪些

public void registerMessageStoreHook() {
        List<PutMessageHook> putMessageHookList = messageStore.getPutMessageHookList();

        putMessageHookList.add(new PutMessageHook() {
            @Override
            public String hookName() {
                return "checkBeforePutMessage";
            }

            @Override
            public PutMessageResult executeBeforePutMessage(MessageExt msg) {
                return HookUtils.checkBeforePutMessage(BrokerController.this, msg);
            }
        });

        putMessageHookList.add(new PutMessageHook() {
            @Override
            public String hookName() {
                return "innerBatchChecker";
            }

            @Override
            public PutMessageResult executeBeforePutMessage(MessageExt msg) {
                if (msg instanceof MessageExtBrokerInner) {
                    return HookUtils.checkInnerBatch(BrokerController.this, msg);
                }
                return null;
            }
        });

        putMessageHookList.add(new PutMessageHook() {
            @Override
            public String hookName() {
                return "handleScheduleMessage";
            }

            @Override
            public PutMessageResult executeBeforePutMessage(MessageExt msg) {
                if (msg instanceof MessageExtBrokerInner) {
                    return HookUtils.handleScheduleMessage(BrokerController.this, (MessageExtBrokerInner) msg);
                }
                return null;
            }
        });

        SendMessageBackHook sendMessageBackHook = new SendMessageBackHook() {
            @Override
            public boolean executeSendMessageBack(List<MessageExt> msgList, String brokerName, String brokerAddr) {
                return HookUtils.sendMessageBack(BrokerController.this, msgList, brokerName, brokerAddr);
            }
        };

        if (messageStore != null) {
            messageStore.setSendMessageBackHook(sendMessageBackHook);
        }
    }           

這裡可以看到主要是三個

  1. checkBeforePutMessage
  2. innerBatchChecker
  3. handleScheduleMessage

通過方法名我們可以很确定的看到主要是handleScheduleMessage這個方法出處理延時消息,是以我們重點看看handleScheduleMessage

handleScheduleMessage

  • org.apache.rocketmq.broker.util.HookUtils#handleScheduleMessage
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
        final MessageExtBrokerInner msg) {
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        // 事務消息 暫時不管
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            if (!isRolledTimerMessage(msg)) {
                if (checkIfTimerMessage(msg)) {
                    if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
                        //wheel timer is not enabled, reject the message
                        return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
                    }
                    PutMessageResult transformRes = transformTimerMessage(brokerController, msg);
                    if (null != transformRes) {
                        return transformRes;
                    }
                }
            }
            // Delay Delivery 延時消息
            if (msg.getDelayTimeLevel() > 0) {
                transformDelayLevelMessage(brokerController, msg);
            }
        }
        return null;
    }           

這裡的核心邏輯還是

if (msg.getDelayTimeLevel() > 0) {
                transformDelayLevelMessage(brokerController, msg);
            }           

我們看看transformDelayLevelMessage方法

public static void transformDelayLevelMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
        // 判斷是否超過最大延時級别18 如果超過則設定為最大延時等級
        if (msg.getDelayTimeLevel() > brokerController.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(brokerController.getScheduleMessageService().getMaxDelayLevel());
        }

        // Backup real topic, queueId 備份真實的topic queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        // 設定延時消息為SCHEDULE_TOPIC_XXXX 這個固定的topic
        msg.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);    
        // 設定延時消息的queueID delayLevel - 1 
        msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
    }           

備份前的message資訊

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

備份後

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

總結就是

  1. 将原始topic替換為延遲消息固定的topic:SCHEDULE_TOPIC_XXXX
  2. 将原始queueid替換為delayLevel - 1
  3. 備份原始topic/queueid, 儲存到原始消息的properties屬性中

延時消息消費

我們通過檢視SCHEDULE_TOPIC_XXXX的調用發現有一個方法

  • org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
RocketMQ 5.x延時消息源碼分析(不包含時間輪)

我們檢視調用鍊

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

發現很明顯應該是使用了一個定時器去執行的,我們可以看看

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

這裡可以看到把18個延時等級的任務都加進去了

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

deliverExecutorService的核心線程數也是18個

this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));           

我們這裡看看丢進去的DeliverDelayedMessageTimerTask的任務裡面裡面的邏輯

  • org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

核心邏輯都被封裝在executeOnTimeup

我們進去看看executeOnTimeup

代碼邏輯有點長我們,慢慢看

public void executeOnTimeup() {
            // 根據延遲topic和延遲queueid 去擷取Consumequeue
            ConsumeQueueInterface cq =
                ScheduleMessageService.this.brokerController.getMessageStore().getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            // 如果 Consumequeue 為空則新增一個 DeliverDelayedMessageTimerTask 丢到 deliverExecutorService再定時執行,延時時間預設為100毫秒
            // Consumequeue存在但是沒有消費檔案 一樣延時100毫秒後繼續重複執行
            if (cq == null) {
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
                return;
            }

            ReferredIterator<CqUnit> bufferCQ = cq.iterateFrom(this.offset);
            if (bufferCQ == null) {
                long resetOffset;
                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
                        this.offset, resetOffset, cq.getQueueId());
                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
                        this.offset, resetOffset, cq.getQueueId());
                } else {
                    resetOffset = this.offset;
                }

                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
                return;
            }

            long nextOffset = this.offset;
            try {
                while (bufferCQ.hasNext() && isStarted()) {
                    CqUnit cqUnit = bufferCQ.next();
                    long offsetPy = cqUnit.getPos();
                    int sizePy = cqUnit.getSize();
                    // 這裡的 tagCode實際是投遞時間
                    long tagsCode = cqUnit.getTagsCode();

                    if (!cqUnit.isTagsCodeValid()) {
                        //can't find ext content.So re compute tags code.
                        log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                            tagsCode, offsetPy, sizePy);
                        long msgStoreTime = ScheduleMessageService.this.brokerController.getMessageStore().getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                        tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                    }

                    long now = System.currentTimeMillis();
                    // 延時消息時間校驗 如果超過deliverTimestamp now+delayLevel 時間則矯正為 now+delayLevel
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    long currOffset = cqUnit.getQueueOffset();
                    assert cqUnit.getBatchNum() == 1;
                    nextOffset = currOffset + cqUnit.getBatchNum();

                    long countdown = deliverTimestamp - now;
                    // 如果延時時間超過目前時間,證明還未到消息處理時間
                    if (countdown > 0) {
                        this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);      
                        ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
                        return;
                    }
                    // 加載出延時消息
                    MessageExt msgExt = ScheduleMessageService.this.brokerController.getMessageStore().lookMessageByOffset(offsetPy, sizePy);
                    if (msgExt == null) {
                        continue;
                    }
                    // 建構新的消息體,将原來的消息資訊設定到這裡,并将topic和queueid設定為原始的topic和queueid(前面備份過)
                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                            msgInner.getTopic(), msgInner);
                        continue;
                    }

                    boolean deliverSuc;
                    if (ScheduleMessageService.this.enableAsyncDeliver) {
                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
                    } else {
                        // 将消息寫入到 commitlog中
                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
                    }

                    if (!deliverSuc) {
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                }
            } catch (Exception e) {
                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
            } finally {
                bufferCQ.release();
            }

            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
        }           

上面的代碼我們将核心邏輯做了注釋,我們總結一下大緻流程

  1. 校驗對應的延時等級消息是否存在ConsumeQueue,如果不存在則延時100ms繼續輪訓
  2. 校驗ConsumeQueue中的索引是否存在,如果不存在則延時100ms繼續輪訓
  3. 校驗索引裡面的tagsCode 實際是投遞時間是否到了需要投遞的時間,如果是則取出延時消息
  4. 将延時消息還原為原始消息,投遞到commitLog中繼續消費

這裡我們通過debug看看消息轉換前後的參數

轉換前的延時消息

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

轉換後的原始消息

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

消息的重新投遞是通過syncDeliver方法

deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);           
private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
            int sizePy) {
            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
            PutMessageResult result = resultProcess.get();
            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
            if (sendStatus) {
                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
            }
            return sendStatus;
        }           

如果成功則更新延時消息的消費進度,注意延時消息的消費進度是存儲在

private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
        new ConcurrentHashMap<>(32);           

實際的配置檔案是delayOffset.json

RocketMQ 5.x延時消息源碼分析(不包含時間輪)

自此RocketMQ的延時消息我們就分析完了

總結

總的來說延時消息就是預設分為18個隊列,然後啟動18個線程一直去掃描這18個隊列。如果沒有消息就過100毫秒繼續重複掃描,周而複始 如果掃描到消息則将延時消息Topic:SCHEDULE_TOPIC_XXXX中的消息取出來,然後重新轉換為原始消息,包括原始消息的Topic和queueId,然後重新寫入到commitLog中

作者:小奏技術

連結:https://juejin.cn/post/7262372539522400317

來源:稀土掘金