天天看點

RocketMQ延遲消息的代碼實戰及原理分析

RocketMQ是一款開源的分布式消息系統,基于高可用分布式叢集技術,提供低延時的、高可靠、萬億級容量、靈活可伸縮的消息釋出與訂閱服務。

RocketMQ簡介

它前身是MetaQ,是阿裡基于Kafka的設計使用Java進行自主研發的。在2012年,阿裡将其開源, 在2016年,阿裡将其捐獻給Apache軟體基金會(Apache Software Foundation,簡稱為ASF),正式成為孵化項目。2017 年,Apache軟體基金會宣布RocketMQ已孵化成為 Apache頂級項目(Top Level Project,簡稱為TLP ),是國内首個網際網路中間件在 Apache上的頂級項目。

延遲消息

生産者把消息發送到消息隊列中以後,并不期望被立即消費,而是等待指定時間後才可以被消費者消費,這類消息通常被稱為延遲消息。

在RocketMQ中,支援延遲消息,但是不支援任意時間精度的延遲消息,隻支援特定級别的延遲消息。如果要支援任意時間精度,不能避免在Broker層面做消息排序,再涉及到持久化的考量,那麼消息排序就不可避免産生巨大的性能開銷。

消息延遲級别分别為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級别。在發送消息時,設定消息延遲級别即可,設定消息延遲級别時有以下3種情況:

  1. 設定消息延遲級别等于0時,則該消息為非延遲消息。
  2. 設定消息延遲級别大于等于1并且小于等于18時,消息延遲特定時間,如:設定消息延遲級别等于1,則延遲1s;設定消息延遲級别等于2,則延遲5s,以此類推。
  3. 設定消息延遲級别大于18時,則該消息延遲級别為18,如:設定消息延遲級别等于20,則延遲2h。

文章持續更新,微信搜尋「

萬貓學社

第一時間閱讀,關注後回複「

電子書

」,免費擷取12本Java必讀技術書籍。

延遲消息示例

首先,寫一個消費者,用于消費延遲消息:

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");

        // 執行個體化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");

        // 設定NameServer的位址
        consumer.setNamesrvAddr("localhost:9876");

        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe("OneMoreTopic", "*");
        // 注冊回調實作類來處理從broker拉取回來的消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s %s Receive New Messages:%n"
                    , sdf.format(new Date())
                    , Thread.currentThread().getName());
            for (MessageExt msg : msgs) {
                System.out.printf("\tMsg Id: %s%n", msg.getMsgId());
                System.out.printf("\tBody: %s%n", new String(msg.getBody()));
            }
            // 标記該消息已經被成功消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 啟動消費者執行個體
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
           

再寫一個延遲消息的生産者,用于發送延遲消息:

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");

        // 執行個體化消息生産者Producer
        DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");
        // 設定NameServer的位址
        producer.setNamesrvAddr("localhost:9876");
        // 啟動Producer執行個體
        producer.start();

        Message msg = new Message("OneMoreTopic"
                , "DelayMessage", "This is a delay message.".getBytes());

        //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        //設定消息延遲級别為3,也就是延遲10s。
        msg.setDelayTimeLevel(3);

        // 發送消息到一個Broker
        SendResult sendResult = producer.send(msg);
        // 通過sendResult傳回消息是否成功送達
        System.out.printf("%s Send Status: %s, Msg Id: %s %n"
                , sdf.format(new Date())
                , sendResult.getSendStatus()
                , sendResult.getMsgId());

        // 如果不再發送消息,關閉Producer執行個體。
        producer.shutdown();
    }
}
           

運作生産者以後,就會發送一條延遲消息:

10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000
           

10秒鐘後,消費者收到的這條延遲消息:

10:37:25.026 ConsumeMessageThread_1 Receive New Messages:
	Msg Id: C0A8006D5AB018B4AAC216E0DB690000
	Body: This is a delay message.
           

延遲消息的原理分析

以下分析的RocketMQ源碼的版本号是4.7.1,版本不同源碼略有差别。

CommitLog

在org.apache.rocketmq.store.CommitLog中,針對延遲消息做了一些處理:

// 延遲級别大于0,就是延時消息
if (msg.getDelayTimeLevel() > 0) {
    // 判斷目前延遲級别,如果大于最大延遲級别,
    // 就設定目前延遲級别為最大延遲級别。
    if (msg.getDelayTimeLevel() > this.defaultMessageStore
            .getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore
                .getScheduleMessageService().getMaxDelayLevel());
    }

    // 擷取延遲消息的主題,
    // 其中RMQ_SYS_SCHEDULE_TOPIC的值為SCHEDULE_TOPIC_XXXX
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    // 根據延遲級别擷取延遲消息的隊列Id,
    // 隊列Id其實就是延遲級别減1
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // 備份真正的主題和隊列Id
    MessageAccessor.putProperty(msg
            , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg
            , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    // 設定延時消息的主題和隊列Id
    msg.setTopic(topic);
    msg.setQueueId(queueId);
}
           

可以看到,每一個延遲消息的主題都被暫時更改為SCHEDULE_TOPIC_XXXX,并且根據延遲級别延遲消息變更了新的隊列Id。接下來,處理延遲消息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。

ScheduleMessageService

ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore進行初始化的,初始化包括構造對象和調用

load

方法。最後,再執行ScheduleMessageService的

start

方法:

public void start() {
    // 使用AtomicBoolean確定start方法僅有效執行一次
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        // 周遊所有延遲級别
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            // key為延遲級别
            Integer level = entry.getKey();
            // value為延遲級别對應的毫秒數
            Long timeDelay = entry.getValue();
            // 根據延遲級别獲得對應隊列的偏移量
            Long offset = this.offsetTable.get(level);
            // 如果偏移量為null,則設定為0
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                // 為每個延遲級别建立定時任務,
                // 第一次啟動任務延遲為FIRST_DELAY_TIME,也就是1秒
                this.timer.schedule(
                        new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        // 延遲10秒後每隔flushDelayOffsetInterval執行一次任務,
        // 其中,flushDelayOffsetInterval預設配置也為10秒
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    // 持久化每個隊列消費的偏移量
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore
        	.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}
           

周遊所有延遲級别,根據延遲級别獲得對應隊列的偏移量,如果偏移量不存在,則設定為0。然後為每個延遲級别建立定時任務,第一次啟動任務延遲為1秒,第二次及以後的啟動任務延遲才是延遲級别相應的延遲時間。

然後,又建立了一個定時任務,用于持久化每個隊列消費的偏移量。持久化的頻率由flushDelayOffsetInterval屬性進行配置,預設為10秒。

定時任務

ScheduleMessageService的

start

方法執行之後,每個延遲級别都建立自己的定時任務,這裡的定時任務的具體實作就在DeliverDelayedMessageTimerTask類之中,它核心代碼是executeOnTimeup方法之中,我們來看一下主要部分:

// 根據主題和隊列Id擷取消息隊列
ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
                TopicValidator.RMQ_SYS_SCHEDULE_TOPIC
                , delayLevel2QueueId(delayLevel));
           

如果沒有擷取到對應的消息隊列,則在DELAY_FOR_A_WHILE(預設為100)毫秒後再執行任務。如果擷取到了,就繼續執行下面操作:

// 根據消費偏移量從消息隊列中擷取所有有效消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
           

如果沒有擷取到有效消息,則在DELAY_FOR_A_WHILE(預設為100)毫秒後再執行任務。如果擷取到了,就繼續執行下面操作:

// 周遊所有消息
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    // 擷取消息的實體偏移量
    long offsetPy = bufferCQ.getByteBuffer().getLong();
    // 擷取消息的實體長度
    int sizePy = bufferCQ.getByteBuffer().getInt();
    long tagsCode = bufferCQ.getByteBuffer().getLong();
    
    // 省略部分代碼...

    long now = System.currentTimeMillis();
    // 計算消息應該被消費的時間
    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
	// 計算下一條消息的偏移量
    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)

	long countdown = deliverTimestamp - now;
    // 省略部分代碼...
}
           

如果目前消息不到消費的時間,則在

countdown

毫秒後再執行任務。如果到消費的時間,就繼續執行下面操作:

// 根據消息的實體偏移量和大小擷取消息
MessageExt msgExt =
    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
            offsetPy, sizePy);
           

如果擷取到消息,則繼續執行下面操作:

// 重新建構新的消息,包括:
// 1.清除消息的延遲級别
// 2.恢複真正的消息主題和隊列Id
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
    log.error("[BUG] the real topic of schedule msg is {},"
                    + " discard the msg. msg={}",
            msgInner.getTopic(), msgInner);
    continue;
}
// 重新把消息發送到真正的消息隊列上
PutMessageResult putMessageResult =
        ScheduleMessageService.this.writeMessageStore
                .putMessage(msgInner);
           

清除了消息的延遲級别,并且恢複了真正的消息主題和隊列Id,重新把消息發送到真正的消息隊列上以後,消費者就可以立即消費了。

總結

經過以上對源碼的分析,可以總結出延遲消息的實作步驟:

  1. 如果消息的延遲級别大于0,則表示該消息為延遲消息,修改該消息的主題為SCHEDULE_TOPIC_XXXX,隊列Id為延遲級别減1。
  2. 消息進入SCHEDULE_TOPIC_XXXX的隊列中。
  3. 定時任務根據上次拉取的偏移量不斷從隊列中取出所有消息。
  4. 根據消息的實體偏移量和大小再次擷取消息。
  5. 根據消息屬性重新建立消息,清除延遲級别,恢複原主題和隊列Id。
  6. 重新發送消息到原主題的隊列中,供消費者進行消費。

微信公衆号:萬貓學社

微信掃描二維碼

關注後回複「 電子書」

擷取12本Java必讀技術書籍

作者:萬貓學社

出處:http://www.cnblogs.com/heihaozi/

版權聲明:本文遵循 CC 4.0 BY-NC-SA 版權協定,轉載請附上原文出處連結和本聲明。

微信掃描二維碼,關注

,回複「

繼續閱讀