天天看點

彌補延時消息的不足,RocketMQ 基于時間輪算法實作了定時消息

作者:架構師之道

在 RocketMQ 4.x 版本,使用延時消息來實作消息的定時消費。延時消息可以一定程度上實作定時發送,但是有一些局限。

RocketMQ 新版本基于時間輪算法引入了定時消息,目前,精确到秒級的定時消息實作的 pr 已經送出到社群,今天來介紹一下。

1 延時消息

1.1 簡介

RocketMQ 的延時消息是指 Producer 發送消息後,Consumer 不會立即消費,而是需要等待固定的時間才能消費。在一些場景下,延時消息是很有用的,比如電商場景下關閉 30 分鐘内未支付的訂單。

使用延時消息非常簡單,隻需要給消息的 delayTimeLevel 屬性指派就可以。參考下面代碼:

Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//第 3 個級别,10s
message.setDelayTimeLevel(3);
producer.send(message);
           

延時消息有 18 個級别,如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
           

1.2 實作原理

延時消息的實作原理如下圖:

彌補延時消息的不足,RocketMQ 基于時間輪算法實作了定時消息

Producer 把消息發送到 Broker 後,Broker 判斷到是延時消息,首先會把消息投遞到延時隊列(Topic = SCHEDULE_TOPIC_XXXX,queueId = delayTimeLevel - 1)。定時任務線程池會有 18 個線程來對延時隊列進行排程,每個線程排程一個延時級别,排程任務把延時消息再投遞到原始隊列,這樣 Consumer 就可以拉取到了。

1.3 存在不足

延時消息存在着一些不足:

1.延時級别隻有 18 個,并不能滿足所有場景;

2.如果通過修改 messageDelayLevel 配置來自定義延時級别,并不靈活,比如一個在大規模的平台上,延時級别成百上千,而且随時可能增加新的延時時間;

3.延時時間不準确,背景的定時線程可能會因為處理消息量大導緻延時誤差大。

2 定時消息

為了彌補延時消息的不足,RocketMQ 5.0 引入了定時消息。

2.1 時間輪算法

為了解決定時任務隊列周遊任務導緻的性能開銷,RocketMQ 定時消息引入了秒級的時間輪算法。如下圖:

彌補延時消息的不足,RocketMQ 基于時間輪算法實作了定時消息

圖中是一個 60s 的時間輪,時間輪上會有一個指向目前時間的指針定時地移動到下一個時間(秒級)。

時間輪算法的優勢是不用去周遊所有的任務,每一個時間節點上的任務用連結清單串起來,當時間輪上的指針移動到目前的時間時,這個時間節點上的全部任務都執行。

雖然上面隻是一個 60s 的時間輪,但是對于所有的時間延時,都是支援的。可以在每個時間節點增加一個 round 字段,記錄時間輪轉動的圈數,比如對于延時 130s 的任務,round 就是 2,放在第 10 個時間刻度的連結清單中。這樣當時間輪轉到一個節點,執行節點上的任務時,首先判斷 round 是否等于 0,如果等于 0,則把這個任務從任務連結清單中移出交給異步線程執行,否則将 round 減 1 繼續檢查後面的任務。

2.2 使用方式

基于時間輪算法的思想,RocketMQ 實作了精準的定時消息。使用 RocketMQ 定時消息時,用戶端定義消息的示例代碼如下:

org.apache.rocketmq.common.message.Message messageExt = this.sendMessageActivity.buildMessage(null,
 Lists.newArrayList(
  Message.newBuilder()
   .setTopic(Resource.newBuilder()
    .setName(TOPIC)
    .build())
   .setSystemProperties(SystemProperties.newBuilder()
    .setMessageId(msgId)
    .setQueueId(0)
    .setMessageType(MessageType.DELAY)
    .setDeliveryTimestamp(Timestamps.fromMillis(deliveryTime))
    //定義消息投遞時間
    .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
    .setBornHost(StringUtils.defaultString(RemotingUtil.getLocalAddress(), "127.0.0.1:1234"))
    .build())
   .setBody(ByteString.copyFromUtf8("123"))
   .build()
 ),
Resource.newBuilder().setName(TOPIC).build()).get(0);
           

2.3 實作原理

2.3.1 消息投遞

上面的代碼構中,Producer 建立消息時給消息傳了一個系統屬性 deliveryTimestamp,這個屬性指定了消息投遞的時間,并且封裝到消息的 TIMER_DELIVER_MS 屬性,代碼如下:

protected void fillDelayMessageProperty(apache.rocketmq.v2.Message message, org.apache.rocketmq.common.message.Message messageWithHeader) {
 if (message.getSystemProperties().hasDeliveryTimestamp()) {
  Timestamp deliveryTimestamp = message.getSystemProperties().getDeliveryTimestamp();
  //delayTime 這個延時時間預設不能超過 1 天,可以配置
  long deliveryTimestampMs = Timestamps.toMillis(deliveryTimestamp);
  validateDelayTime(deliveryTimestampMs);
        //...
  String timestampString = String.valueOf(deliveryTimestampMs);
  //MessageConst.PROPERTY_TIMER_DELIVER_MS="TIMER_DELIVER_MS"
  MessageAccessor.putProperty(messageWithHeader, MessageConst.PROPERTY_TIMER_DELIVER_MS, timestampString);
 }
}
           

Broker 收到這個消息後,如果判斷到 TIMER_DELIVER_MS 這個屬性有值,就會把這個消息投遞到 Topic 是 rmq_sys_wheel_timer 的隊列中,queueId 是 0,同時會儲存原始消息的 Topic、queueId、投遞時間(TIMER_OUT_MS)。

TimerMessageStore 中有個定時任務 TimerEnqueueGetService 會從 rmq_sys_wheel_timer 這個 Topic 中讀取消息,然後封裝 TimerRequest 請求并放到隊列 enqueuePutQueue。

2.3.2 綁定時間輪

RocketMQ 使用 TimerLog 來儲存消息的原始資料綁定到時間輪上。首先看一下 TimerLog 儲存的資料結構,如下圖:

彌補延時消息的不足,RocketMQ 基于時間輪算法實作了定時消息

參考下面代碼:

//TimerMessageStore類
ByteBuffer tmpBuffer = timerLogBuffer;
tmpBuffer.clear();
tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
tmpBuffer.putLong(slot.lastPos); //prev pos
tmpBuffer.putInt(magic); //magic
tmpBuffer.putLong(tmpWriteTimeMs); //currWriteTime
tmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTime
tmpBuffer.putLong(offsetPy); //offset
tmpBuffer.putInt(sizePy); //size
tmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topic
tmpBuffer.putLong(0); //reserved value, just set to 0 now
long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
if (-1 != ret) {
 // If it's a delete message, then slot's total num -1
 // TODO: check if the delete msg is in the same slot with "the msg to be deleted".
 timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
  isDelete ? slot.num - 1 : slot.num + 1, slot.magic);

}
           

TimerEnqueuePutService 這個定時任務從上面的 enqueuePutQueue(2.3.1節) 取出 TimerRequest 然後封裝成 TimerLog。

那時間輪是怎麼跟 TimerLog 關聯起來的呢?RocketMQ 使用 TimerWheel 來描述時間輪,TimerWheel 中每一個時間節點是一個 Slot,Slot 儲存了這個延時時間的 TimerLog 資訊。資料結構如下圖:

彌補延時消息的不足,RocketMQ 基于時間輪算法實作了定時消息

參考下面代碼:

//類 TimerWheel
public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) {
 localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
 localBuffer.get().putLong(timeMs / precisionMs);
 localBuffer.get().putLong(firstPos);
 localBuffer.get().putLong(lastPos);
 localBuffer.get().putInt(num);
 localBuffer.get().putInt(magic);
}
           

這樣時間輪跟 TimerLog 就關聯起來了,見下圖:

彌補延時消息的不足,RocketMQ 基于時間輪算法實作了定時消息

如果時間輪的一個時間節點(Slot)上有一條新的消息到來,那隻要建立一個 TimerLog,然後把它的指針指向該時間節點的最後一個 TimerLog,然後把 Slot 的 lastPos 屬性指向建立的這個 TimerLog,如下圖:

彌補延時消息的不足,RocketMQ 基于時間輪算法實作了定時消息

從源碼上看,RocketMQ 定義了一個 7 天的以秒為機關的時間輪。

2.3.3 時間輪轉動

轉動時間輪時,TimerDequeueGetService 這個定時任務從目前時間節點(Slot)對應的 TimerLog 中取出資料,封裝成 TimerRequest 放入 dequeueGetQueue 隊列。

2.3.4 CommitLog 中讀取消息

定時任務 TimerDequeueGetMessageService 從隊列 dequeueGetQueue 中拉取 TimerRequest 請求,然後根據 TimerRequest 中的參數去 CommitLog(MessageExt) 中查找消息,查出後把消息封裝到 TimerRequest 中,然後把 TimerRequest 寫入 dequeuePutQueue 這個隊列。

2.3.5 寫入原隊列

定時任務 TimerDequeuePutMessageService 從 dequeuePutQueue 隊列中擷取消息,把消息轉換成原始消息,投入到原始隊列中,這樣消費者就可以拉取到了。

3 總結

RocketMQ 4.x 版本隻支援延時消息,有一些局限性。而 RocketMQ 新版本引入了定時消息,彌補了延時消息的不足。定時消息的處理流程如下圖:

彌補延時消息的不足,RocketMQ 基于時間輪算法實作了定時消息

可以看到,RocketMQ 的定時消息的實作還是有一定複雜度的,這裡用到 5 個定時任務和 3 個隊列來實作。

最後,對于定時時間的定義,用戶端、Broker 和時間輪的預設最大延時時間定義是不同的,使用的時候需要注意。

來源:https://mp.weixin.qq.com/s/I91QRel-7CraP7zCRh0ISw

作者:朱晉君

繼續閱讀