延遲隊列是為了存放那些延遲執行的消息,待消息過期之後消費端從隊列裡拿出來執行。
DLX + TTL 方式存在的時序問題

對于延遲隊列不管是 AMQP 協定或者 RabbitMQ 本身是不支援的,之前有介紹過如何使用 RabbitMQ 死信隊列(DLX) + TTL 的方式來模拟實作延遲隊列,這也是通常的一種做法,可參見我的另一篇文章《利用 RabbitMQ 死信隊列和 TTL 實作定時任務》。
今天我想說的是這種方式會存在一個時序問題,看下圖:
左側隊列 queue1 分别兩條消息 msg1、msg2 過期時間都為 1s,輸出順序為 msg1、msg2 是沒問題的。
右側隊列 queue2 分别兩條消息 msg1、msg2 注意問題來了,msg2 的消息過期時間為 1s 而 msg1 的消息過期為 2s,你可能想誰先過期就誰先消費呗,顯然不是這樣的,因為這是在同一個隊列,必須前一個消費,第二個才能消費,是以就出現了時序問題。
如果你的消息過期時間是有規律的,例如,有的 1s、有的 2s,那麼我們可以以時間為次元設計為兩個隊列,如下所示:
上面我們将 1S 過期的消息拆分為隊列 queue_1s,2S 過期的消息拆分為隊列 queue_2s,事情得到進一步解決。如果此時消息的過期時間不确定或者消息過期時間次元過多,在消費端我們就要去監聽多個消息隊列且對于消息過期時間不确定的也是很難去設計的。
針對消息無序的不妨看下以下解決方案。
Delayed Message 插件

這裡将使用的是一個 RabbitMQ 延遲消息插件 rabbitmq-delayed-message-exchange[1],目前維護在 RabbitMQ 插件社群,我們可以聲明 x-delayed-message 類型的 Exchange,消息發送時指定消息頭 x-delay 以毫秒為機關将消息進行延遲投遞。
實作原理
上面使用 DLX + TTL 的模式,消息首先會路由到一個正常的隊列,根據設定的 TTL 進入死信隊列,與之不同的是通過 x-delayed-message 聲明的交換機,它的消息在釋出之後不會立即進入隊列,先将消息儲存至 Mnesia(一個分布式資料庫管理系統,适合于電信和其它需要持續運作和具備軟實時特性的 Erlang 應用。目前資料介紹的不是很多)。
這個插件将會嘗試确認消息是否過期,首先要確定消息的延遲範圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設定的範圍為 (2^32)-1 毫秒),如果消息過期通過 x-delayed-type 類型标記的交換機投遞至目标隊列,整個消息的投遞過程也就完成了。
插件安裝
根據你的 RabbitMQ 版本來安裝相應插件版本,RabbitMQ community-plugins[2] 上面有版本對應資訊可參考。
注意:需要 RabbitMQ 3.5.3 和更高版本。
<code># 注意要下載下傳至你的 RabbitMQ 伺服器的 plugins 目錄下,例如:/usr/local/rabbitmq/plugins</code>
<code>wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip</code>
<code># 解壓</code>
<code>unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip</code>
<code># 解壓之後得到如下檔案</code>
<code>rabbitmq_delayed_message_exchange-20171215-3.6.x.ez</code>
啟用插件
使用 rabbitmq-plugins enable 指令啟用插件,啟動成功會看到如下提示:
<code>$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange</code>
<code>The following plugins have been enabled:</code>
<code> rabbitmq_delayed_message_exchange</code>
<code>Applying plugin configuration to rabbit@xxxxxxxx... started 1 plugin.</code>
管理控制台聲明 x-delayed-message 交換機
在開始代碼之前先打開 RabbitMQ 的管理 UI 界面,聲明一個 x-delayed-message 類型的交換機,否則你會遇到下面的錯誤:
這個問題困擾我了一會兒,詳情可見 GitHub Issues rabbitmq-delayed-message-exchange/issues/19[3],正确操作如下圖所示:
Nodejs 代碼實踐
上面準備工作完成了,開始我們的代碼實踐吧,官方沒有提供 Nodejs 示例,隻提供了 Java 示例,對于一個寫過 Spring Boot 項目的 Nodeer 這不是問題(此處,兄得你有點飄了啊 /:xx)其實如果有時間能多了解點些,你會發現還是有益的。
建構生産者:
幾個注意點:
交換機類型一定要設定為 x-delayed-message
設定 x-delayed-type 為 direct,當然也可以是 topic 等
發送消息時設定消息頭 headers 的 x-delay 屬性,即延遲時間,如果不設定消息将會立即投遞
<code>const amqp = require('amqplib');</code>
<code>async function producer(msg, expiration) {</code>
<code> try {</code>
<code> const connection = await amqp.connect('amqp://localhost:5672');</code>
<code> const exchange = 'my-delayed-exchange';</code>
<code> const exchangeType = 'x-delayed-message'; // x-delayed-message 交換機的類型</code>
<code> const routingKey = 'my-delayed-routingKey';</code>
<code> const ch = await connection.createChannel();</code>
<code> await ch.assertExchange(exchange, exchangeType, {</code>
<code> durable: true,</code>
<code> 'x-delayed-type': 'direct'</code>
<code> });</code>
<code> console.log('producer msg:', msg);</code>
<code> await ch.publish(exchange, routingKey, Buffer.from(msg), {</code>
<code> headers: {</code>
<code> 'x-delay': expiration, // 一定要設定,否則無效</code>
<code> }</code>
<code> ch.close();</code>
<code> } catch(err) {</code>
<code> console.log(err)</code>
<code> }</code>
<code>}</code>
<code>producer('msg0 1S Expire', 1000) // 1S</code>
<code>producer('msg1 30S Expire', 1000 * 30) // 30S</code>
<code>producer('msg2 10S Expire', 1000 * 10) // 10S</code>
<code>producer('msg3 5S Expire', 1000 * 5) // 5S</code>
建構消費端:
消費端改變不大,交換機聲明處同生産者保持一樣,設定交換機類型(x-delayed-message)和 x-delayed-type。
<code>async function consumer() {</code>
<code> const exchange = 'my-delayed-exchange';</code>
<code> const exchangeType = 'x-delayed-message';</code>
<code> const routingKey = 'my-delayed-routingKey';</code>
<code> const queueName = 'my-delayed-queue';</code>
<code> await ch.assertQueue(queueName);</code>
<code> await ch.bindQueue(queueName, exchange, routingKey);</code>
<code> await ch.consume(queueName, msg => {</code>
<code> console.log('consumer msg:', msg.content.toString());</code>
<code> }, { noAck: true });</code>
<code> console.log('Consumer Error: ', err);</code>
<code>consumer()</code>
以上示例源碼位址:https://github.com/Q-Angelo/project-training/tree/master/rabbitmq/rabbitmq-delayed-message-node
最後,讓我們對以上程式做個測試,左側視窗展示了生産端資訊,右側視窗展示了消費端資訊,這次實作了同一個隊列裡不同過期時間的消息,可以按照我們預先設定的 TTL 時間順序性消費,我們的目的達到了。
局限性
Delayed Message 插件實作 RabbitMQ 延遲隊列這種方式也不完全是一個銀彈,它将延遲消息存在于 Mnesia 表中,并且在目前節點上具有單個磁盤副本,它們将在節點重新開機之後幸存。
目前該插件的目前設計并不真正适合包含大量延遲消息(例如數十萬或數百萬)的場景,詳情參見 #/issues/72[4] 另外該插件的一個可變性來源是依賴于 Erlang 計時器,在系統中使用了一定數量的長時間計時器之後,它們開始争用排程程式資源,并且時間漂移不斷累積。
插件的禁用要慎重,以下方式可以實作将插件禁用,但是注意如果此時還有延遲消息未消費,那麼禁掉此插件後所有的未消費的延遲消息将丢失。
如果你采用了 Delayed Message 插件這種方式來實作,對于消息可用性要求較高的,在發現消息之前可以先落入 DB 打标記,消費之後将消息标記為已消費,中間可以加入定時任務做檢測,這可以進一步保證你的消息的可靠性。
總結

經過一番實踐測試、學習之後發現,DLX + TTL 和 Delayed Message 插件這兩種 RabbitMQ 延遲消息解決方案都有一定的局限性。
如果你的消息 TTL 是相同的,使用 DLX + TTL 的這種方式是沒問題的,對于我來說目前還是優選。
如果你的消息 TTL 過期值是可變的,可以嘗試下使用 Delayed Message 插件,對于某些應用而言它可能很好用,對于那些可能會達到高容量延遲消息應用而言,則不是很好。
相關連結:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
https://www.rabbitmq.com/community-plugins.html
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/19
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72
文章來源:Nodejs技術棧,點選檢視原文。