天天看點

Apache Pulsar 延遲消息投遞解析

一、什麼是延遲消息投遞

延遲消息投遞在MQ應用場景中十分普遍,它是指消息在發送到 MQ 服務端後并不會立馬投遞,而是根據消息中的屬性延遲固定時間後才投遞給消費者,一般分為定時消息和延遲消息兩種:

  • 定時消息:Producer 将消息發送到 MQ 服務端,但并不期望這條消息立馬投遞,而是推遲到在目前時間點之後的某一個時間投遞到 Consumer 進行消費。
  • 延遲消息:Producer 将消息發送到 MQ 服務端,但并不期望這條消息立馬投遞,而是延遲一定時間後才投遞到 Consumer 進行消費。

目前在業界,騰訊雲的 CMQ 和阿裡雲的 RocketMQ 也都支援延遲消息投遞:

  • CMQ:将消息延遲期間定義為”飛行狀态“,可通過設定 DelaySeconds 配置延遲範圍,取值範圍為 0 - 3600 秒,即消息最長不可見時長為 1 小時。
  • RocketMQ:開源版本延遲消息臨時存儲在一個内部主題中,支援特定的 level,例如定時 5s,10s,1m 等,商業版本支援任意時間精度。

開源的 NSQ、RabbitMQ、ActiveMQ 和 Pulsar 也都内置了延遲消息的處理能力。雖然每個 MQ 項目的使用和實作方式不同,但核心實作思路都一樣:Producer 将一個延遲消息發送到某個 Topic 中,Broker 将延遲消息放到臨時存儲進行暫存,延遲跟蹤服務(Delayed Tracker Service)會檢查消息是否到期,将到期的消息進行投遞。

二、延遲消息投遞的使用場景

延遲消息投遞是要暫緩對目前消息的處理,在未來的某個時間點再觸發投遞,實際的應用場景非常多,比如異常檢測重試、訂單逾時取消、預約提醒等。

  • 服務請求異常,需要将異常請求放到單獨的隊列,隔 5 分鐘後進行重試;
  • 使用者購買商品,但一直處于未支付狀态,需要定期提醒使用者支付,逾時則關閉訂單;
  • 面試或者會議預約,在面試或者會議開始前半小時,發送通知再次提醒;

TDMQ 最近就有個使用 Pulsar 延遲消息的 Case:業務要對兩套系統的日志消息進行關聯,其中一套系統由于查詢 Hbase 可能會逾時或失敗,需要将失敗的關聯任務在叢集空閑的時候再次排程。

三、如何使用Pulsar延遲消息投遞

Pulsar 最早是在 2.4.0 引入了延遲消息投遞的特性,在 Pulsar 中使用延遲消息,可以精确指定延遲投遞的時間,有 deliverAfter 和 deliverAt 兩種方式。其中 deliverAt 可以指定具體的時間戳;deliverAfter 可以指定在目前多長時間後執行。兩種方式的本質是一樣的,Client 會計算出時間戳送到 Broker。

1. deliverAfter發送

producer.newMessage()        .deliverAfter(long time, TimeUnit unit)        .send();      

2. deliverAt發送

producer.newMessage()        .deliverAt(long timestamp)        .send();      

在 Pulsar 中,可以支援跨度很大的延時消息,比方說一個月、半年;同時在一個 Topic 裡,既支援延時消息,也支援非延時消息。下圖展示了 Pulsar 中延遲消息的具體過程:

producer 發送的 m1/m3/m4/m5 有不同的延遲時間,m2 是不需要延遲投遞的正常消息,consumer 消費時會根據不同的延遲時間進行 ack。

四、Pulsar延遲消息投遞實作原理

從上面的使用方式可以看出,Pulsar 支援的是秒級精度的延遲消息投遞,不同于開源 RocketMQ 支援固定時間 level 的延遲。

Pulsar 實作延遲消息投遞的方式比較簡單,所有延遲投遞的消息會被 Delayed Message Tracker 記錄對應的 index。index 是由 timestamp | LedgerID | EntryID 三部分組成,其中 LedgerID | EntryID 用于定位該消息,timestamp 除了記錄需要投遞的時間,還用于 delayed index 優先級隊列排序。

Delayed Message Tracker 在堆外記憶體維護着一個 delayed index 優先級隊列,根據延遲時間進行堆排序,延遲時間最短的會放在頭上,時間越長越靠後。consumer 在消費時,會先去 Delayed Message Tracker 檢查,是否有到期需要投遞的消息,如果有到期的消息,則從 Tracker 中拿出對應的 index,找到對應的消息進行消費;如果沒有到期的消息,則直接消費正常的消息。

如果叢集出現 Broker 當機或者 topic 的 ownership 轉移,Pulsar 會重建 delayed index 隊列,來保證延遲投遞的消息能夠正常工作。

五、Pulsar延遲消息投遞面臨的挑戰

從 Pulsar 的延遲消息投遞實作原理可以看出,該方法簡單高效,對 Pulsar 核心侵入性較小,可以支援到任意時間的延遲消息。但同時發現,Pulsar 的實作方案無法支援大規模使用延遲消息,主要有以下兩個原因:

1. delayed index隊列受到記憶體限制

一條延遲消息的 delayed index 由三個 long 組成,對于小規模的延遲消息來說,記憶體開銷并不大。但由于 index 隊列是 subscription 級别,對于 topic 的同一個 partition 來說,有多少個 subscription 就需要維護多少個 index 隊列;同時,由于延遲消息越多、延遲的時間越長,index 隊列記憶體占用也會更多。

2. delayed index隊列重建時間開銷

上面有提到,如果叢集出現 Broker 當機或者 topic 的 ownership 轉移,Pulsar 會重建 delayed index 隊列。對于跨度時間長的大規模延遲消息,重建時間可能會到小時級别。為了減小 delayed index 隊列重建時間,雖然可以給 topic 分更多的 partition 提高重建的并發度,但沒有徹底解決重建時間開銷問題。

六、Pulsar延遲消息投遞未來工作

Pulsar 目前的延遲消息投遞方案簡單高效,但處理大規模延遲消息時仍然存在風險。關于延遲消息投遞,社群和資料平台部 MQ 團隊下一步将聚焦在支援大規模延遲消息。目前讨論的方案是在 delayed index 隊列加入時間分區,Broker 隻加載目前較近的時間片 delayed index 到記憶體,其餘時間片分區持久化磁盤,示例圖如下圖所示:

Apache Pulsar 延遲消息投遞解析

上圖中,我們按 5 分鐘的間隔對 delayed index 隊列進行分區,m5 和 m1 放在了 time partition 1,由于延遲時間最近,放在了記憶體;m4 和 m3 在 time partition 2,延遲時間比較靠後,index 存儲在了磁盤。該方案不僅可以減少 delayed index 隊列重建時間開銷,還可以降低對記憶體的依賴。

結語

本文為大家介紹了延遲消息投遞的相關概念和使用場景,并詳細拓展了 Apache Pulsar 的實作原理。Pulsar 目前方案簡單高效,支援秒級精度的延遲消息投遞,但在處理大規模延遲消息時還有一些局限。