天天看點

基于Redis實作特殊的消息隊列

原創 默達 淘系技術  4月29日

基于Redis實作特殊的消息隊列
說到消息隊列,首先映入腦海的就是Kafka等,消息隊列在各個領域都發揮了很大的作用。但是,在一些場景下,傳統的消息隊列Kafka無法滿足需求,比如以下場景:

  • 消息重複機率比較高時,需要對重複消息進行合并處理避免浪費有限的資源,減少消費延遲;
  • 需要根據業務自定義優先級進行消息處理,高優先級的消息比低優先級的消息先處理;
  • 消息需要定時消費的場景,消息隻有在設定的消費時間到了之後立馬被消費。

本文将介紹一種基于Redis實作的消息隊列(Redis message queue, RMQ),RMQ可以作為傳統消息隊列的互補選擇,在傳統消息隊列沒有涉及的場景中使用RMQ。

功能介紹

RMQ設計為一個二方庫,可以幫助使用者基于Redis快速實作消息隊列的功能,RMQ消息隊列具有消息合并、區分優先級、支援定時消息等特性。RMQ消息隊列可以用于異步解耦、削峰填谷,支援億級資料堆積。RMQ消息隊列目前支援三種類型的消息,分别是RangeMergeMessage(區間重複合并消息)、PriorityMessage(優先級消息)、FixedTimeMessage(任意定時消息)。

▐  區間重複合并消息

RangeMergeMessage支援區間重複消息合并,發送消息時需要設定時間區間,消息延遲該時間區間長度後被消費,在該時間區間内如果發送重複的消息,重複消息将會被合并。如果消息在Redis服務端發生堆積,重複到來的消息依然會被合并處理。

該類型消息适用于消息重複率較高且希望重複消息合并處理的場景,對重複消息進行合并可以減少下遊消費系統的壓力,減少不必要的資源消耗,将有限的資源最大化的利用,提升消費效率。

▐  優先級消息

PriorityMessage支援給消息設定任意等級的優先級,優先級高的消息會被優先消費,相同優先級的消息被随機消費。如果消息在Redis服務端發生堆積,重複的消息将被合并處理,合并後消息的優先級等于最後存儲的消息的優先級。該類型消息适用于希望重複消息合并處理且需要設定優先級的場景,下遊消費者資源有限時,合并重複消息且優先處理優先級高的消息将可以合理利用有限的資源。

▐  任意定時消息

FixedTimeMessage支援給消息設定任意消費時間,隻有消費時間到了之後消息才被消費,消費時間可精确到秒。消息到期後沒有及時被消費時,消費者将按照時間由遠及近進行消費。如果消息在Redis服務端發生堆積,重複的消息将被合并處理,合并後消息的消費時間等于最後存儲的消息的消費時間。

該類型消息适用于希望重複消息合并處理且需要定時消費的場景,定時消息應用場景非常豐富,比如定時打标去标、活動結束後清理動作、訂單逾時關閉等。

▐  并發消費控制

使用傳統消息中間件進行叢集消費的時候,為了避免并發處理同一進制資料導緻不一緻問題,通常需要對中繼資料加分布式鎖,頻繁的鎖沖突會導緻消費效率低下。加分布式鎖的最終目的其實就是保障屬于同一進制資料的消息被串行消費。加分布式鎖并不是最好的方案,最好的方案應該是從根上解決并發問題,讓屬于同一進制資料的消息串行消費。

RMQ消息隊列具有并發消費控制能力,屬于同一進制資料的消息隻會被配置設定給全局唯一一個線程進行消費,是以屬于同一進制資料的消息将被串行消費。使用方如果需要該能力,除了需要提供Redis,還需要提供ZooKeeper。

▐  重試次數控制

RMQ消息隊列支援失敗重試消費16次,業務傳回消費失敗後,消息會被復原并等待重試消費,重試16次後消息進入死信隊列,消息不再被消費,除非人工幹預。

技術原理

▐  總體架構

基于Redis實作特殊的消息隊列

RMQ消息隊列由三部分組成,分别為ZooKeeper、RMQ二方庫、Redis。ZooKeeper負責維護叢集worker的資訊,将topic的所有slot配置設定給全局的woker。Redis負責存儲消息,采用Sorted Set結構存儲,Store Queue是消息存放的隊列,Prepare Queue是采用二階段消費方式正在消費的消息存放隊列,Dead Queue是死信隊列。RMQ二方庫由RmqClient、Consumer、Producer三部分組成。RmqClient負責RMQ的啟動工作,包括上報TopicDef、Worker給ZooKeeper,配置設定Slot給Worker,掃描業務定義的MessageListener Bean。Producer負責根據不用消息類型将消息按照指定的方式存儲到Redis。Consumer負責根據不用消息類型按照指定方式從Redis彈出消息并調用業務的MessageListener。

▐  消息存儲

基于Redis實作特殊的消息隊列
  • Topic的設計

Topic的定義有三部分組成,topic表示主題名稱,slotAmount表示消息存儲劃分的槽數量,topicType表示消息的類型。主題名稱是一個Topic的唯一标示,相同主題名稱Topic的slotAmount和topicType一定是一樣的。

消息存儲采用Redis的Sorted Set結構,為了支援大量消息的堆積,需要把消息分散存儲到很多個槽中,slotAmount表示該Topic消息存儲共使用的槽數量,槽數量一定需要是2的n次幂。在消息存儲的時候,采用對指定資料或者消息體哈希求餘得到槽位置。

  • StoreQueue的設計

上圖中topic劃分了8個槽位,編号0-7。如果發送方指定了消息的slotBasis,則計算slotBasis的CRC32值,CRC32值對槽數量進行取模得到槽序号,SlotKey設計為#{topic}_#{index}(也即Redis的鍵),其中#{}表示占位符。

發送方需要保證相同内容的消息的slotBasis相同,如果沒有指定slotBasis則采用消息内容計算SlotKey,這樣内容相同的消息體就會落在同一個Sorted Set裡面,是以内容相同的消息會進行合并。

Redis的Sorted Set中的資料按照分數排序,實作不同類型的消息的關鍵就在于如何利用分數、如何添加消息到Sorted Set、如何從Sorted Set中彈出消息。優先級消息将優先級作為分數,消費時每次彈出分數最大的消息。任意定時消息将時間戳作為分數,消費時每次彈出分數大于目前時間戳的一個消息。

區間重複合并消息将時間戳作為分數,添加消息時将(目前時間戳+時間區間)作為分數,消費時每次彈出分數大于目前時間戳的一個消息。

  • PrepareQueue的設計

為了保障RMQ消息隊列的可用性,做到每條消息至少消費一次,消費者不是直接pop有序集合中的元素,而是将元素從StoreQueue移動到PrepareQueue并傳回消息給消費者,等消費成功後再從PrepareQueue從删除,或者消費失敗後從PreapreQueue重新移動到StoreQueue,這便是根據二階段送出的思想實作的二階段消費。

在後面将會詳細介紹二階段消費的實作思路,這裡重點介紹下PrepareQueue的存儲設計。StoreQueue中每一個Slot對應PrepareQueue中的Slot,PrepareQueue的SlotKey設計為prepare{#{topic}#{index}}。PrepareQueue采用Sorted Set作為存儲,消息移動到PrepareQueue時刻對應的(秒級時間戳*1000+重試次數)作為分數,字元串存儲的是消息體内容。這裡分數的設計與重試次數的設計密切相關,是以在重試次數設計章節詳細介紹。

PrepareQueue的SlotKey設計中需要注意的一點,由于消息從StoreQueue移動到PrepareQueue是通過Lua腳本操作的,是以需要保證Lua腳本操作的Slot在同一個Redis節點上,如何保證PrepareQueue的SlotKey和對應的StoreQueue的SlotKey被hash到同一個Redis槽中呢。Redis的hash tag功能可以指定SlotKey中隻有某一部分參與計算hash,這一部分采用{}包括,是以PrepareQueue的SlotKey中采用{}包括了StoreQueue的SlotKey。

  • DeadQueue的設計

消息重試消費16次後,消息将進入DeadQueue。DeadQueue的SlotKey設計為prepare{#{topic}#{index}},這裡同樣采用hash tag功能保證DeadQueue的SlotKey與對應StoreQueue的SlotKey存儲在同一Redis節點。

▐  生産者

生産者的任務就是将消息添加到Redis的Sorted Set中。首先,需要計算出消息添加到Redis的SlotKey,如果發送方指定了消息的slotBasis(否則采用content代替),則計算slotBasis的CRC32值,CRC32值對槽數量進行取模得到槽序号,SlotKey設計為#{topic}_#{index},其中#{}表示占位符。然後,不同類型的消息有不同的添加方式,是以分布講述三種類型消息的添加過程。

  • 區間重複合并消息

發送該消息時需要設定timeRange,timeRange必須大于0,機關為毫秒,表示消息将延遲timeRange毫秒後被消費,期間到來的重複消息将被合并,合并後的消息依然維持原來的消費時間。

是以在存儲該類型消息的時候,采用(目前時間戳+timeRange)作為分數,添加消息采用Lua腳本執行,保證操作的原子性,Lua腳本首先采用zscore指令檢查消息是否已經存在,如果已經存在則直接傳回,如果不存在則執行zadd指令添加。

  • 優先級消息

發送該消息時需要設定priority,priority必須大于16,表示消息的優先級,數值越大表示優先級越高。是以在存儲該類型消息的時候,采用priority作為分數,采用zadd指令直接添加。

  • 任意定時消息

發送該類型消息時需要設定fixedTime,fixedTime必須大于目前時間,表示消費時間戳,目前時間大于該消費時間戳的時候,消息才會被消費。是以在存儲該類型消息的時候,采用fixedTime作為分數,采用指令zadd直接添加。

▐  消費者

  • 二階段消費方式

三種消費模式

一般消息隊列存在三種消費模式,分别是:最多消費一次、至少消費一次、隻消費一次。最多消費一次模式消息可能丢失,一般不怎麼使用。至少消費一次模式消息不會丢失,但是可能存在重複消費,比較常用。隻消費一次模式消息被精确隻消費一次,實作較困難,一般需要業務記錄幂等ID來實作。RMQ實作了至少消費一次的模式,那麼如何保證消息至少被消費一次呢?

至少消費一次模式實作的難點

從最簡單的消費模式——最多消費一次說起,消費者端隻需要從消息隊列服務中取出消息就行,即執行Redis的zpopmax指令,不倫消費者是否接收到該消息并成功消費,消息隊列服務都認為消息消費成功。最多一次消費模式導緻消息丢失的因素可能有:網絡丢包導緻消費者沒有接收到消息,消費者接收到消息但在消費的時候當機了,消費者接收到消息但消費失敗。針對消費失敗導緻消息丢失的情況比較好解決,隻需要把消費失敗的消息重新放入消息隊列服務就行,但是網絡丢包和消費系統異常導緻的消息丢失問題不好解決。

可能有人會想到,我們不把元素從有序集合中pop出來,我們先查詢優先級最高的元素,然後消費,再删除消費成功的元素,但是這樣消息服務隊列就變成了同步阻塞隊列,性能會很差。

至少消費一次模式的實作

至少消費一次的問題比較類似銀行轉賬問題,A向B賬戶轉賬100元,如何保障A賬戶扣減100同時B賬戶增加100,是以我們可以想到二階段送出的思想。第一個準備階段,A、B分别進行資源當機并持久化undo和redo日志,A、B分别告訴協調者已經準備好;第二個送出階段,協調者告訴A、B進行送出,A、B分别送出事務。RMQ基于二階段送出的思想來實作至少消費一次的模式。

RMQ存儲設計中PrepareQueue的作用就是用來當機資源并記錄事務日志,消費者端即是參與者也是協調者。第一個準備階段,消費者端通過執行Lua腳本從StoreQueue中Pop消息并存儲到PrepareQueue,同時消息傳輸到消費者端,消費者端消費該消息;第二個送出階段,消費者端根據消費結果是否成功協調消息隊列服務是送出還是復原,如果消費成功則送出事務,該消息從PrepareQueue中删除,如果消費失敗則復原事務,消費者端将該消息從PrepareQueue移動到StoreQueue,如果因為各種異常導緻PrepareQueue中消息滞留逾時,逾時後将自動執行復原操作。二階段消費的流程圖如下所示。

基于Redis實作特殊的消息隊列

實作方案的異常情況分析

我們來分析下采用二階段消費方案可能存在的異常情況,從以下分析來看二階段消費方案可以保障消息至少被消費一次。

  1. 網絡丢包導緻消費者沒有接收到消息,這時消息已經記錄到PrepareQueue,如果到了逾時時間,消息被復原放回StoreQueue,等待下次被消費,消息不丢失。
  2. 消費者接收到了消息,但是消費者還沒來得及消費完成系統就當機了,消息消費逾時到了後,消息會被重新放入StoreQueue,等待下次被消費,消息不丢失。
  3. 消費者接收到了消息并消費成功,消費者端在協調事務送出的時候當機了,消息消費逾時到了後,消息會被重新放入StoreQueue,等待下次被消費,消息被重複消費。
  4. 消費者接收到了消息但消費失敗,消費者端在協調事務送出的時候當機了,消息消費逾時到了後,消息會被重新放入StoreQueue,等待下次被消費,消息不丢失。
  5. 消費者接收到了消息并消費成功,但是由于fullgc等原因使消費時間太長,PrepareQueue中的消息由于逾時已經復原到StoreQueue,等待下次被消費,消息被重複消費。
  • 重試次數控制的實作

采用二階段消費方式,需要将消息在StoreQueue和PrepareQueue之間移動,如何實作重試次數控制呢,其關鍵在StoreQueue和PrepareQueue的分數設計。

PrepareQueue的分數需要與時間相關,正常情況下,消費者不管消費失敗還是消費成功,都會從PrepareQueue删除消息,當消費者系統發生異常或者當機的時候,消息就無法從PrepareQueue中删除,我們也不知道消費者是否消費成功,為保障消息至少被消費一次,我們需要做到逾時復原,是以分數需要與消費時間相關。當PrepareQueue中的消息發生逾時的時候,将消息從PrepareQueue移動到StoreQueue。

是以PrepareQueue的分數設計為:秒級時間戳*1000+重試次數。不同類型的消息首次存儲到StoreQueue中的分數表示的含義不盡相同,區間重複合并消息和任意定時消息存儲時的分數表示消費時間戳,優先級消息存儲時的分數表示優先級。如果消息消費失敗,消息從PrepareQueue復原到StoreQueue,所有類型的消息存儲時的分數都表示剩餘重試次數,剩餘重試次數從16次不斷降低最後為0,消息進入死信隊列。消息在StoreQueue和PrepareQueue之間移動流程如下:

基于Redis實作特殊的消息隊列
  • Pop消息

不同類型的消息在消費的時候Pop消息的方式不一樣,是以接下來分别講述三種類型消息的Pop方式。

該消息存儲的分數設計為消費時間戳,目前時間大于消息的消費時間戳時,該消息應該被消費。是以采用Redis指令ZRANGEBYSCORE彈出分數小于目前時間戳的一條消息。

該消息存儲的分數設計為優先級,優先級越高分數越大,是以采用Redis指令ZPOPMAX彈出分數最大的一條消息。

任意定時消息該消息存儲的分數設計為消費時間戳,目前時間大于消息的消費時間戳時,該消息應該被消費。是以采用Redis指令ZRANGEBYSCORE彈出分數小于目前時間戳的一條消息。

相關應用

▐  主圖價格表達項目

基于Redis實作特殊的消息隊列

在主圖價格表達中需要實作一個功能,商品價格發生變化時将商品價格列印在商品主圖上面,那麼需要在價格發生變動的時候觸發合成一張帶價格的圖檔,每一次觸發合圖時計算價格都是擷取目前最新的價格。上遊價格變化的因素很多,變化很頻繁,下遊合圖消耗GPU資源較大,處理容量較低。是以需要盡可能合并觸發合圖消息,減輕下遊處理壓力,于是使用了RMQ作為消息隊列來進行削峰填谷、消息合并。不僅如此,還可以根據商家等級劃分觸發合圖消息的等級,使KA商家能夠優先得到處理,縮短價格變化的延遲。

線上上實際環境中,叢集共130台機器,RMQ消息隊列的發送消息能力和消費消息能力均可以達到5w tps,而且這并不是峰值,理論上可以達到10w tps。

▐  線上資料圈選引擎

線上資料圈選引擎需要處理各種來源的大量動态資料,需要将一段時間區間内的消息合并處理,減少處理壓力,并且在對同一進制資料進行并發處理需要加分布式鎖,鎖沖突導緻消費效率下降。RMQ的區間重複合并消息和并發消費控制能力可以幫助解決這些問題。目前,線上資料圈選引擎已經采用了RMQ消息隊列作為核心元件,RMQ消息隊列發揮了很大的作用。

總結

本文提出了一種可實作的基于Redis的消息隊列,充分利用Sorted Set結構設計了消息合并、優先級、定時等特性,與傳統消息隊列形成互補,彌補傳統消息隊列這方面特性的缺失。為了實作高可用,本文在二階段送出的思想上進行改進設計了二階段消費方式,保障消息至少被消費一次。

未來将基于Redis的特性打造更多獨特的功能,與傳統消息中間件形成互補。在消費控制方面會增加流量自動調控能力,根據消息類型調控消費速度,減少因為某種類型消息消費瓶頸導緻整體消費性能下降。

繼續閱讀