天天看點

消息隊列設計精要 何時需要消息隊列 如何設計一個消息隊列 總結

消息隊列已經逐漸成為企業IT系統内部通信的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終一緻性等一系列功能,成為異步RPC的主要手段之一。

當今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿裡巴巴自主開發的Notify、MetaQ、RocketMQ等。

本文不會一一介紹這些消息隊列的所有特性,而是探讨一下自主開發設計一個消息隊列時,你需要思考和設計的重要方面。過程中我們會參考這些成熟消息隊列的很多重要思想。

本文首先會闡述什麼時候你需要一個消息隊列,然後以Push模型為主,從零開始分析設計一個消息隊列時需要考慮到的問題,如RPC、高可用、順序和重複消息、可靠投遞、消費關系解析等。

也會分析以Kafka為代表的pull模型所具備的優點。最後是一些進階主題,如用批量/異步提高性能、pull模型的系統設計理念、存儲子系統的設計、流量控制的設計、公平排程的實作等。其中最後四個方面會放在下篇講解。

何時需要消息隊列

當你需要使用消息隊列時,首先需要考慮它的必要性。可以使用mq的場景有很多,最常用的幾種,是做業務解耦/最終一緻性/廣播/錯峰流控等。反之,如果需要強一緻性,關注業務邏輯的處理結果,則RPC顯得更為合适。

解耦

解耦是消息隊列要解決的最本質問題。所謂解耦,簡單點講就是一個事務,隻關心核心的流程。而需要依賴其他系統但不那麼重要的事情,有通知即可,無需等待結果。換句話說,基于消息的模型,關心的是“通知”,而非“處理”。

比如在美團旅遊,我們有一個産品中心,産品中心上遊對接的是主站、移動背景、旅遊供應鍊等各個資料源;下遊對接的是篩選系統、API系統等展示系統。當上遊的資料發生變更的時候,如果不使用消息系統,勢必要調用我們的接口來更新資料,就特别依賴産品中心接口的穩定性和處理能力。但其實,作為旅遊的産品中心,也許隻有對于旅遊自建供應鍊,産品中心更新成功才是他們關心的事情。而對于團購等外部系統,産品中心更新成功也好、失敗也罷,并不是他們的職責所在。他們隻需要保證在資訊變更的時候通知到我們就好了。

而我們的下遊,可能有更新索引、重新整理緩存等一系列需求。對于産品中心來說,這也不是我們的職責所在。說白了,如果他們定時來拉取資料,也能保證資料的更新,隻是實時性沒有那麼強。但使用接口方式去更新他們的資料,顯然對于産品中心來說太過于“重量級”了,隻需要釋出一個産品ID變更的通知,由下遊系統來處理,可能更為合理。

再舉一個例子,對于我們的訂單系統,訂單最終支付成功之後可能需要給使用者發送短信積分什麼的,但其實這已經不是我們系統的核心流程了。如果外部系統速度偏慢(比如短信網關速度不好),那麼主流程的時間會加長很多,使用者肯定不希望點選支付過好幾分鐘才看到結果。那麼我們隻需要通知短信系統“我們支付成功了”,不一定非要等待它處理完成。

最終一緻性

最終一緻性指的是兩個系統的狀态保持一緻,要麼都成功,要麼都失敗。當然有個時間限制,理論上越快越好,但實際上在各種異常的情況下,可能會有一定延遲達到最終一緻狀态,但最後兩個系統的狀态是一樣的。

業界有一些為“最終一緻性”而生的消息隊列,如Notify(阿裡)、QMQ(去哪兒)等,其設計初衷,就是為了交易系統中的高可靠通知。

以一個銀行的轉賬過程來了解最終一緻性,轉賬的需求很簡單,如果A系統扣錢成功,則B系統加錢一定成功。反之則一起復原,像什麼都沒發生一樣。

然而,這個過程中存在很多可能的意外:

  1. A扣錢成功,調用B加錢接口失敗。
  2. A扣錢成功,調用B加錢接口雖然成功,但擷取最終結果時網絡異常引起逾時。
  3. A扣錢成功,B加錢失敗,A想復原扣的錢,但A機器down機。

可見,想把這件看似簡單的事真正做成,真的不那麼容易。所有跨VM的一緻性問題,從技術的角度講通用的解決方案是:

  1. 強一緻性,分布式事務,但落地太難且成本太高,後文會具體提到。
  2. 最終一緻性,主要是用“記錄”和“補償”的方式。在做所有的不确定的事情之前,先把事情記錄下來,然後去做不确定的事情,結果可能是:成功、失敗或是不确定,“不确定”(例如逾時等)可以等價為失敗。成功就可以把記錄的東西清理掉了,對于失敗和不确定,可以依靠定時任務等方式把所有失敗的事情重新搞一遍,直到成功為止。

    回到剛才的例子,系統在A扣錢成功的情況下,把要給B“通知”這件事記錄在庫裡(為了保證最高的可靠性可以把通知B系統加錢和扣錢成功這兩件事維護在一個本地事務裡),通知成功則删除這條記錄,通知失敗或不确定則依靠定時任務補償性地通知我們,直到我們把狀态更新成正确的為止。

    整個這個模型依然可以基于RPC來做,但可以抽象成一個統一的模型,基于消息隊列來做一個“企業總線”。

    具體來說,本地事務維護業務變化和通知消息,一起落地(失敗則一起復原),然後RPC到達broker,在broker成功落地後,RPC傳回成功,本地消息可以删除。否則本地消息一直靠定時任務輪詢不斷重發,這樣就保證了消息可靠落地broker。

    broker往consumer發送消息的過程類似,一直發送消息,直到consumer發送消費成功确認。

    我們先不理會重複消息的問題,通過兩次消息落地加補償,下遊是一定可以收到消息的。然後依賴狀态機版本号等方式做判重,更新自己的業務,就實作了最終一緻性。

最終一緻性不是消息隊列的必備特性,但确實可以依靠消息隊列來做最終一緻性的事情。另外,所有不保證100%不丢消息的消息隊列,理論上無法實作最終一緻性。好吧,應該說理論上的100%,排除系統嚴重故障和bug。

像Kafka一類的設計,在設計層面上就有丢消息的可能(比如定時刷盤,如果掉電就會丢消息)。哪怕隻丢千分之一的消息,業務也必須用其他的手段來保證結果正确。

廣播

消息隊列的基本功能之一是進行廣播。如果沒有消息隊列,每當一個新的業務方接入,我們都要聯調一次新接口。有了消息隊列,我們隻需要關心消息是否送達了隊列,至于誰希望訂閱,是下遊的事情,無疑極大地減少了開發和聯調的工作量。

比如本文開始提到的産品中心釋出産品變更的消息,以及景點庫很多去重更新的消息,可能“關心”方有很多個,但産品中心和景點庫隻需要釋出變更消息即可,誰關心誰接入。

錯峰與流控

試想上下遊對于事情的處理能力是不同的。比如,Web前端每秒承受上千萬的請求,并不是什麼神奇的事情,隻需要加多一點機器,再搭建一些LVS負載均衡裝置和Nginx等即可。但資料庫的處理能力卻十分有限,即使使用SSD加分庫分表,單機的處理能力仍然在萬級。由于成本的考慮,我們不能奢求資料庫的機器數量追上前端。

這種問題同樣存在于系統和系統之間,如短信系統可能由于短闆效應,速度卡在網關上(每秒幾百次請求),跟前端的并發量不是一個數量級。但使用者晚上個半分鐘左右收到短信,一般是不會有太大問題的。如果沒有消息隊列,兩個系統之間通過協商、滑動視窗等複雜的方案也不是說不能實作。但系統複雜性指數級增長,勢必在上遊或者下遊做存儲,并且要處理定時、擁塞等一系列問題。而且每當有處理能力有差距的時候,都需要單獨開發一套邏輯來維護這套邏輯。是以,利用中間系統轉儲兩個系統的通信内容,并在下遊系統有能力處理這些消息的時候,再處理這些消息,是一套相對較通用的方式。

總而言之,消息隊列不是萬能的。對于需要強事務保證而且延遲敏感的,RPC是優于消息隊列的。

對于一些無關痛癢,或者對于别人非常重要但是對于自己不是那麼關心的事情,可以利用消息隊列去做。

支援最終一緻性的消息隊列,能夠用來處理延遲不那麼敏感的“分布式事務”場景,而且相對于笨重的分布式事務,可能是更優的處理方式。

當上下遊系統處理能力存在差距的時候,利用消息隊列做一個通用的“漏鬥”。在下遊有能力處理的時候,再進行分發。

如果下遊有很多系統關心你的系統發出的通知的時候,果斷地使用消息隊列吧。

如何設計一個消息隊列

綜述

我們現在明确了消息隊列的使用場景,下一步就是如何設計實作一個消息隊列了。

消息隊列設計精要 何時需要消息隊列 如何設計一個消息隊列 總結

基于消息的系統模型,不一定需要broker(消息隊列服務端)。市面上的的Akka(actor模型)、ZeroMQ等,其實都是基于消息的系統設計範式,但是沒有broker。

我們之是以要設計一個消息隊列,并且配備broker,無外乎要做兩件事情:

  1. 消息的轉儲,在更合适的時間點投遞,或者通過一系列手段輔助消息最終能送達消費機。
  2. 規範一種範式和通用的模式,以滿足解耦、最終一緻性、錯峰等需求。

    掰開了揉碎了看,最簡單的消息隊列可以做成一個消息轉發器,把一次RPC做成兩次RPC。發送者把消息投遞到服務端(以下簡稱broker),服務端再将消息轉發一手到接收端,就是這麼簡單。

一般來講,設計消息隊列的整體思路是先build一個整體的資料流,例如producer發送給broker,broker發送給consumer,consumer回複消費确認,broker删除/備份消息等。

利用RPC将資料流串起來。然後考慮RPC的高可用性,盡量做到無狀态,友善水準擴充。

之後考慮如何承載消息堆積,然後在合适的時機投遞消息,而處理堆積的最佳方式,就是存儲,存儲的選型需要綜合考慮性能/可靠性和開發維護成本等諸多因素。

為了實作廣播功能,我們必須要維護消費關系,可以利用zk/config server等儲存消費關系。

在完成了上述幾個功能後,消息隊列基本就實作了。然後我們可以考慮一些進階特性,如可靠投遞,事務特性,性能優化等。

下面我們會以設計消息隊列時重點考慮的子產品為主線,穿插灌輸一些消息隊列的特性實作方法,來具體分析設計實作一個消息隊列時的方方面面。

實作隊列基本功能

RPC通信協定

剛才講到,所謂消息隊列,無外乎兩次RPC加一次轉儲,當然需要消費端最終做消費确認的情況是三次RPC。既然是RPC,就必然牽扯出一系列話題,什麼負載均衡啊、服務發現啊、通信協定啊、序列化協定啊,等等。在這一塊,我的強烈建議是不要重複造輪子。利用公司現有的RPC架構:Thrift也好,Dubbo也好,或者是其他自定義的架構也好。因為消息隊列的RPC,和普通的RPC沒有本質差別。當然了,自主利用Memchached或者Redis協定重新寫一套RPC架構并非不可(如MetaQ使用了自己封裝的Gecko NIO架構,卡夫卡也用了類似的協定)。但實作成本和難度無疑倍增。排除對效率的極端要求,都可以使用現成的RPC架構。

簡單來講,服務端提供兩個RPC服務,一個用來接收消息,一個用來确認消息收到。并且做到不管哪個server收到消息和确認消息,結果一緻即可。當然這中間可能還涉及跨IDC的服務的問題。這裡和RPC的原則是一緻的,盡量優先選擇本機房投遞。你可能會問,如果producer和consumer本身就在兩個機房了,怎麼辦?首先,broker必須保證感覺的到所有consumer的存在。其次,producer盡量選擇就近的機房就好了。

高可用

其實所有的高可用,是依賴于RPC和存儲的高可用來做的。先來看RPC的高可用,美團的基于MTThrift的RPC架構,阿裡的Dubbo等,其本身就具有服務自動發現,負載均衡等功能。而消息隊列的高可用,隻要保證broker接受消息和确認消息的接口是幂等的,并且consumer的幾台機器處理消息是幂等的,這樣就把消息隊列的可用性,轉交給RPC架構來處理了。

那麼怎麼保證幂等呢?最簡單的方式莫過于共享存儲。broker多機器共享一個DB或者一個分布式檔案/kv系統,則處理消息自然是幂等的。就算有單點故障,其他節點可以立刻頂上。另外failover可以依賴定時任務的補償,這是消息隊列本身天然就可以支援的功能。存儲系統本身的可用性我們不需要操太多心,放心大膽的交給DBA們吧!

對于不共享存儲的隊列,如Kafka使用分區加主備模式,就略微麻煩一些。需要保證每一個分區内的高可用性,也就是每一個分區至少要有一個主備且需要做資料的同步,關于這塊HA的細節,可以參考下篇pull模型消息系統設計。

服務端承載消息堆積的能力

消息到達服務端如果不經過任何處理就到接收者了,broker就失去了它的意義。為了滿足我們錯峰/流控/最終可達等一系列需求,把消息存儲下來,然後選擇時機投遞就顯得是順理成章的了。

隻是這個存儲可以做成很多方式。比如存儲在記憶體裡,存儲在分布式KV裡,存儲在磁盤裡,存儲在資料庫裡等等。但歸結起來,主要有持久化和非持久化兩種。

持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),并且理論上能承載更大限度的消息堆積(外存的空間遠大于記憶體)。

但并不是每種消息都需要持久化存儲。很多消息對于投遞性能的要求大于可靠性的要求,且數量極大(如日志)。這時候,消息不落地直接暫存記憶體,嘗試幾次failover,最終投遞出去也未嘗不可。

市面上的消息隊列普遍兩種形式都支援。當然具體的場景還要具體結合公司的業務來看。

存儲子系統的選擇

我們來看看如果需要資料落地的情況下各種存儲子系統的選擇。理論上,從速度來看,檔案系統>分布式KV(持久化)>分布式檔案系統>資料庫,而可靠性卻截然相反。還是要從支援的業務場景出發作出最合理的選擇,如果你們的消息隊列是用來支援支付/交易等對可靠性要求非常高,但對性能和量的要求沒有這麼高,而且沒有時間精力專門做檔案存儲系統的研究,DB是最好的選擇。

但是DB受制于IOPS,如果要求單broker 5位數以上的QPS性能,基于檔案的存儲是比較好的解決方案。整體上可以采用資料檔案+索引檔案的方式處理,具體這塊的設計比較複雜,可以參考下篇的存儲子系統設計。

分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其程式設計接口較友好,性能也比較可觀,如果在可靠性要求不是那麼高的場景,也不失為一個不錯的選擇。

消費關系解析

現在我們的消息隊列初步具備了轉儲消息的能力。下面一個重要的事情就是解析發送接收關系,進行正确的消息投遞了。

市面上的消息隊列定義了一堆讓人暈頭轉向的名詞,如JMS 規範中的Topic/Queue,Kafka裡面的Topic/Partition/ConsumerGroup,RabbitMQ裡面的Exchange等等。抛開現象看本質,無外乎是單點傳播與廣播的差別。所謂單點傳播,就是點到點;而廣播,是一點對多點。當然,對于網際網路的大部分應用來說,組間廣播、組内單點傳播是最常見的情形。

消息需要通知到多個業務叢集,而一個業務叢集内有很多台機器,隻要一台機器消費這個消息就可以了。

當然這不是絕對的,很多時候組内的廣播也是有适用場景的,如本地緩存的更新等等。另外,消費關系除了組内組間,可能會有多級樹狀關系。這種情況太過于複雜,一般不列入考慮範圍。是以,一般比較通用的設計是支援組間廣播,不同的組注冊不同的訂閱。組内的不同機器,如果注冊一個相同的ID,則單點傳播;如果注冊不同的ID(如IP位址+端口),則廣播。

至于廣播關系的維護,一般由于消息隊列本身都是叢集,是以都維護在公共存儲上,如config server、zookeeper等。維護廣播關系所要做的事情基本是一緻的:

  1. 發送關系的維護。
  2. 發送關系變更時的通知。

隊列進階特性設計

上面都是些消息隊列基本功能的實作,下面來看一些關于消息隊列特性相關的内容,不管可靠投遞/消息丢失與重複以及事務乃至于性能,不是每個消息隊列都會照顧到,是以要依照業務的需求,來仔細衡量各種特性實作的成本,利弊,最終做出最為合理的設計。

可靠投遞(最終一緻性)

這是個激動人心的話題,完全不丢消息,究竟可不可能?答案是,完全可能,前提是消息可能會重複,并且,在異常情況下,要接受消息的延遲。

方案說簡單也簡單,就是每當要發生不可靠的事情(RPC等)之前,先将消息落地,然後發送。當失敗或者不知道成功失敗(比如逾時)時,消息狀态是待發送,定時任務不停輪詢所有待發送消息,最終一定可以送達。

具體來說:

  1. producer往broker發送消息之前,需要做一次落地。
  2. 請求到server後,server確定資料落地後再告訴用戶端發送成功。
  3. 支援廣播的消息隊列需要對每個待發送的endpoint,持久化一個發送狀态,直到所有endpoint狀态都OK才可删除消息。

對于各種不确定(逾時、down機、消息沒有送達、送達後資料沒落地、資料落地了回複沒收到),其實對于發送方來說,都是一件事情,就是消息沒有送達。

重推消息所面臨的問題就是消息重複。重複和丢失就像兩個噩夢,你必須要面對一個。好在消息重複還有處理的機會,消息丢失再想找回就難了。

Anyway,作為一個成熟的消息隊列,應該盡量在各個環節減少重複投遞的可能性,不能因為重複有解決方案就放縱的亂投遞。

最後說一句,不是所有的系統都要求最終一緻性或者可靠投遞,比如一個論壇系統、一個招聘系統。一個重複的履歷或話題被釋出,可能比丢失了一個釋出顯得更讓使用者無法接受。不斷重複一句話,任何基礎元件要服務于業務場景。

消費确認

當broker把消息投遞給消費者後,消費者可以立即響應我收到了這個消息。但收到了這個消息隻是第一步,我能不能處理這個消息卻不一定。或許因為消費能力的問題,系統的負荷已經不能處理這個消息;或者是剛才狀态機裡面提到的消息不是我想要接收的消息,主動要求重發。

把消息的送達和消息的處理分開,這樣才真正的實作了消息隊列的本質-解耦。是以,允許消費者主動進行消費确認是必要的。當然,對于沒有特殊邏輯的消息,預設Auto Ack也是可以的,但一定要允許消費方主動ack。

對于正确消費ack的,沒什麼特殊的。但是對于reject和error,需要特别說明。reject這件事情,往往業務方是無法感覺到的,系統的流量和健康狀況的評估,以及處理能力的評估是一件非常複雜的事情。舉個極端的例子,收到一個消息開始build索引,可能這個消息要處理半個小時,但消息量卻是非常的小。是以reject這塊建議做成滑動視窗/線程池類似的模型來控制,

消費能力不比對的時候,直接拒絕,過一段時間重發,減少業務的負擔。

但業務出錯這件事情是隻有業務方自己知道的,就像上文提到的狀态機等等。這時應該允許業務方主動ack error,并可以與broker約定下次投遞的時間。

重複消息和順序消息

上文談到重複消息是不可能100%避免的,除非可以允許丢失,那麼,順序消息能否100%滿足呢? 答案是可以,但條件更為苛刻:

  1. 允許消息丢失。
  2. 從發送方到服務方到接受者都是單點單線程。

是以絕對的順序消息基本上是不能實作的,當然在METAQ/Kafka等pull模型的消息隊列中,單線程生産/消費,排除消息丢失,也是一種順序消息的解決方案。

一般來講,一個主流消息隊列的設計範式裡,應該是不丢消息的前提下,盡量減少重複消息,不保證消息的投遞順序。

談到重複消息,主要是兩個話題:

  1. 如何鑒别消息重複,并幂等的處理重複消息。
  2. 一個消息隊列如何盡量減少重複消息的投遞。

先來看看第一個話題,每一個消息應該有它的唯一身份。不管是業務方自定義的,還是根據IP/PID/時間戳生成的MessageId,如果有地方記錄這個MessageId,消息到來是能夠進行比對就

能完成重複的鑒定。資料庫的唯一鍵/bloom filter/分布式KV中的key,都是不錯的選擇。由于消息不能被永久存儲,是以理論上都存在消息從持久化存儲移除的瞬間上遊還在投遞的可能(上遊因種種原因投遞失敗,不停重試,都到了下遊清理消息的時間)。這種事情都是異常情況下才會發生的,畢竟是小衆情況。兩分鐘消息都還沒送達,多送一次又能怎樣呢?幂等的處理消息是一門藝術,因為種種原因重複消息或者錯亂的消息還是來到了,說兩種通用的解決方案:

  1. 版本号。
  2. 狀态機。
版本号

舉個簡單的例子,一個産品的狀态有上線/下線狀态。如果消息1是下線,消息2是上線。不巧消息1判重失敗,被投遞了兩次,且第二次發生在2之後,如果不做重複性判斷,顯然最終狀态是錯誤的。

但是,如果每個消息自帶一個版本号。上遊發送的時候,标記消息1版本号是1,消息2版本号是2。如果再發送下線消息,則版本号标記為3。下遊對于每次消息的處理,同時維護一個版本号。

每次隻接受比目前版本号大的消息。初始版本為0,當消息1到達時,将版本号更新為1。消息2到來時,因為版本号>1.可以接收,同時更新版本号為2.當另一條下線消息到來時,如果版本号是3.則是真實的下線消息。如果是1,則是重複投遞的消息。

如果業務方隻關心消息重複不重複,那麼問題就已經解決了。但很多時候另一個頭疼的問題來了,就是消息順序如果和想象的順序不一緻。比如應該的順序是12,到來的順序是21。則最後會發生狀态錯誤。

參考TCP/IP協定,如果想讓亂序的消息最後能夠正确的被組織,那麼就應該隻接收比目前版本号大一的消息。并且在一個session周期内要一直儲存各個消息的版本号。

如果到來的順序是21,則先把2存起來,待1到來後,先處理1,再處理2,這樣重複性和順序性要求就都達到了。

狀态機

基于版本号來處理重複和順序消息聽起來是個不錯的主意,但凡事總有瑕疵。使用版本号的最大問題是:

  1. 對發送方必須要求消息帶業務版本号。
  2. 下遊必須存儲消息的版本号,對于要嚴格保證順序的。

還不能隻存儲最新的版本号的消息,要把亂序到來的消息都存儲起來。而且必須要對此做出處理。試想一個永不過期的"session",比如一個物品的狀态,會不停流轉于上下線。那麼中間環節的所有存儲

就必須保留,直到在某個版本号之前的版本一個不丢的到來,成本太高。

就剛才的場景看,如果消息沒有版本号,該怎麼解決呢?業務方隻需要自己維護一個狀态機,定義各種狀态的流轉關系。例如,"下線"狀态隻允許接收"上線"消息,“上線”狀态隻能接收“下線消息”,如果上線收到上線消息,或者下線收到下線消息,在消息不丢失和上遊業務正确的前提下。要麼是消息發重了,要麼是順序到達反了。這時消費者隻需要把“我不能處理這個消息”告訴投遞者,要求投遞者過一段時間重發即可。而且重發一定要有次數限制,比如5次,避免死循環,就解決了。

舉例子說明,假設産品本身狀态是下線,1是上線消息,2是下線消息,3是上線消息,正常情況下,消息應該的到來順序是123,但實際情況下收到的消息狀态變成了3123。

那麼下遊收到3消息的時候,判斷狀态機流轉是下線->上線,可以接收消息。然後收到消息1,發現是上線->上線,拒絕接收,要求重發。然後收到消息2,狀态是上線->下線,于是接收這個消息。

此時無論重發的消息1或者3到來,還是可以接收。另外的重發,在一定次數拒絕後停止重發,業務正确。

中間件對于重複消息的處理

回歸到消息隊列的話題來講。上述通用的版本号/狀态機/ID判重解決方案裡,哪些是消息隊列該做的、哪些是消息隊列不該做業務方處理的呢?其實這裡沒有一個完全嚴格的定義,但回到我們的出發點,我們保證不丢失消息的情況下盡量少重複消息,消費順序不保證。那麼重複消息下和亂序消息下業務的正确,應該是由消費方保證的,我們要做的是減少消息發送的重複。

我們無法定義業務方的業務版本号/狀态機,如果API裡強制需要指定版本号,則顯得過于綁架客戶了。況且,在消費方維護這麼多狀态,就涉及到一個消費方的消息落地/多機間的同步消費狀态問題,複雜度指數級上升,而且隻能解決部分問題。

減少重複消息的關鍵步驟:

  1. broker記錄MessageId,直到投遞成功後清除,重複的ID到來不做處理,這樣隻要發送者在清除周期内能夠感覺到消息投遞成功,就基本不會在server端産生重複消息。
  2. 對于server投遞到consumer的消息,由于不确定對端是在處理過程中還是消息發送丢失的情況下,有必要記錄下投遞的IP位址。決定重發之前詢問這個IP,消息處理成功了嗎?如果詢問無果,再重發。

事務

持久性是事務的一個特性,然而隻滿足持久性卻不一定能滿足事務的特性。還是拿扣錢/加錢的例子講。滿足事務的一緻性特征,則必須要麼都不進行,要麼都能成功。

解決方案從大方向上有兩種:

  1. 兩階段送出,分布式事務。
  2. 本地事務,本地落地,補償發送。

分布式事務存在的最大問題是成本太高,兩階段送出協定,對于仲裁down機或者單點故障,幾乎是一個無解的黑洞。對于交易密集型或者I/O密集型的應用,沒有辦法承受這麼高的網絡延遲,系統複雜性。

并且成熟的分布式事務一定建構與比較靠譜的商用DB和商用中間件上,成本也太高。

那如何使用本地事務解決分布式事務的問題呢?以本地和業務在一個資料庫執行個體中建表為例子,與扣錢的業務操作同一個事務裡,将消息插入本地資料庫。如果消息入庫失敗,則業務復原;如果消息入庫成功,事務送出。

然後發送消息(注意這裡可以實時發送,不需要等定時任務檢出,以提高消息實時性)。以後的問題就是前文的最終一緻性問題所提到的了,隻要消息沒有發送成功,就一直靠定時任務重試。

這裡有一個關鍵的點,本地事務做的,是業務落地和消息落地的事務,而不是業務落地和RPC成功的事務。這裡很多人容易混淆,如果是後者,無疑是事務嵌套RPC,是大忌,會有長事務死鎖等各種風險。

而消息隻要成功落地,很大程度上就沒有丢失的風險(磁盤實體損壞除外)。而消息隻要投遞到服務端确認後本地才做删除,就完成了producer->broker的可靠投遞,并且當消息存儲異常時,業務也是可以復原的。

本地事務存在兩個最大的使用障礙:

  1. 配置較為複雜,“綁架”業務方,必須本地資料庫執行個體提供一個庫表。
  2. 對于消息延遲高敏感的業務不适用。

話說回來,不是每個業務都需要強事務的。扣錢和加錢需要事務保證,但下單和生成短信卻不需要事務,不能因為要求發短信的消息存儲投遞失敗而要求下單業務復原。是以,一個完整的消息隊列應該定義清楚自己可以投遞的消息類型,如事務型消息,本地非持久型消息,以及服務端不落地的非可靠消息等。對不同的業務場景做不同的選擇。另外事務的使用應該盡量低成本、透明化,可以依托于現有的成熟架構,如Spring的聲明式事務做擴充。業務方隻需要使用@Transactional标簽即可。

性能相關

異步/同步

首先澄清一個概念,異步,同步和oneway是三件事。異步,歸根結底你還是需要關心結果的,但可能不是當時的時間點關心,可以用輪詢或者回調等方式處理結果;同步是需要當時關心

的結果的;而oneway是發出去就不管死活的方式,這種對于某些完全對可靠性沒有要求的場景還是适用的,但不是我們重點讨論的範疇。

回歸來看,任何的RPC都是存在用戶端異步與服務端異步的,而且是可以任意組合的:用戶端同步對服務端異步,用戶端異步對服務端異步,用戶端同步對服務端同步,用戶端異步對服務端同步。

對于用戶端來說,同步與異步主要是拿到一個Result,還是Future(Listenable)的差別。實作方式可以是線程池,NIO或者其他事件機制,這裡先不展開講。

服務端異步可能稍微難了解一點,這個是需要RPC協定支援的。參考servlet 3.0規範,服務端可以吐一個future給用戶端,并且在future done的時候通知用戶端。

整個過程可以參考下面的代碼:

用戶端同步服務端異步。

Future<Result> future = request(server);//server立刻傳回future
synchronized(future){
while(!future.isDone()){
   future.wait();//server處理結束後會notify這個future,并修改isdone标志
}
}
return future.get();
           

用戶端同步服務端同步。

Result result = request(server);
           

用戶端異步服務端同步(這裡用線程池的方式)。

Future<Result> future = executor.submit(new Callable(){public void call<Result>(){
    result = request(server);
}})
return future;
           

用戶端異步服務端異步。

Future<Result> future = request(server);//server立刻傳回future

return future
           

上面說了這麼多,其實是想讓大家脫離兩個誤區:

  1. RPC隻有用戶端能做異步,服務端不能。
  2. 異步隻能通過線程池。

那麼,服務端使用異步最大的好處是什麼呢?說到底,是解放了線程和I/O。試想服務端有一堆I/O等待處理,如果每個請求都需要同步響應,每條消息都需要結果立刻傳回,那麼就幾乎沒法做I/O合并

(當然接口可以設計成batch的,但可能batch發過來的仍然數量較少)。而如果用異步的方式傳回給用戶端future,就可以有機會進行I/O的合并,把幾個批次發過來的消息一起落地(這種合并對于MySQL等允許batch insert的資料庫效果尤其明顯),并且徹底釋放了線程。不至于說來多少請求開多少線程,能夠支援的并發量直線提高。

來看第二個誤區,傳回future的方式不一定隻有線程池。換句話說,可以線上程池裡面進行同步操作,也可以進行異步操作,也可以不使用線程池使用異步操作(NIO、事件)。

回到消息隊列的議題上,我們當然不希望消息的發送阻塞主流程(前面提到了,server端如果使用異步模型,則可能因消息合并帶來一定程度上的消息延遲),是以可以先使用線程池送出一個發送請求,主流程繼續往下走。

但是線程池中的請求關心結果嗎?Of course,必須等待服務端消息成功落地,才算是消息發送成功。是以這裡的模型,準确地說事用戶端半同步半異步(使用線程池不阻塞主流程,但線程池中的任務需要等待server端的傳回),server端是純異步。用戶端的線程池wait在server端吐回的future上,直到server端處理完畢,才解除阻塞繼續進行。

總結一句,同步能夠保證結果,異步能夠保證效率,要合理的結合才能做到最好的效率。

批量

談到批量就不得不提生産者消費者模型。但生産者消費者模型中最大的痛點是:消費者到底應該何時進行消費。大處着眼來看,消費動作都是事件驅動的。主要事件包括:

  1. 攢夠了一定數量。
  2. 到達了一定時間。
  3. 隊列裡有新的資料到來。

對于及時性要求高的資料,可用采用方式3來完成,比如用戶端向服務端投遞資料。隻要隊列有資料,就把隊列中的所有資料刷出,否則将自己挂起,等待新資料的到來。

在第一次把隊列資料往外刷的過程中,又積攢了一部分資料,第二次又可以形成一個批量。僞代碼如下:

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
private Runnable task = new Runnable({//這裡由于共享隊列,Runnable可以複用,故做成全局的
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在這個過程中會有新的消息到來,如果4個線程都占滿,隊列就有機會囤新的消息
   }
});
public void send(Message message){
    queue.offer(message);
    executor.submit(task)
}
           

這種方式是消息延遲和批量的一個比較好的平衡,但優先響應低延遲。延遲的最高程度由上一次發送的等待時間決定。但可能造成的問題是發送過快的話批量的大小不夠滿足性能的極緻。

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
volatile long last = System.currentMills();
Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){
   flush();
},500,500,TimeUnits.MILLS);
private Runnable task = new Runnable({//這裡由于共享隊列,Runnable可以複用,顧做成全局的。
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在這個過程中會有新的消息到來,如果4個線程都占滿,隊列就有機會屯新的消息。
   }
});
public void send(Message message){
    last = System.currentMills();
    queue.offer(message);
    flush();
}
private void flush(){
 if(queue.size>200||System.currentMills()-last>200){
       executor.submit(task)
  }
}
           

相反對于可以用适量的延遲來換取高性能的場景來說,用定時/定量二選一的方式可能會更為理想,既到達一定數量才發送,但如果數量一直達不到,也不能幹等,有一個時間上限。

具體說來,在上文的submit之前,多判斷一個時間和數量,并且Runnable内部維護一個定時器,避免沒有新任務到來時舊的任務永遠沒有機會觸發發送條件。對于server端的資料落地,使用這種方式就非常友善。

最後啰嗦幾句,曾經有人問我,為什麼網絡請求小包合并成大包會提高性能?主要原因有兩個:

  1. 減少無謂的請求頭,如果你每個請求隻有幾位元組,而頭卻有幾十位元組,無疑效率非常低下。
  2. 減少回複的ack包個數。把請求合并後,ack包數量必然減少,确認和重發的成本就會降低。

push還是pull

上文提到的消息隊列,大多是針對push模型的設計。現在市面上有很多經典的也比較成熟的pull模型的消息隊列,如Kafka、MetaQ等。這跟JMS中傳統的push方式有很大的差別,可謂另辟蹊徑。

我們簡要分析下push和pull模型各自存在的利弊。

慢消費

慢消費無疑是push模型最大的緻命傷,穿成流水線來看,如果消費者的速度比發送者的速度慢很多,勢必造成消息在broker的堆積。假設這些消息都是有用的無法丢棄的,消息就要一直在broker端儲存。當然這還不是最緻命的,最緻命的是broker給consumer推送一堆consumer無法處理的消息,consumer不是reject就是error,然後來回踢皮球。

反觀pull模式,consumer可以按需消費,不用擔心自己處理不了的消息來騷擾自己,而broker堆積消息也會相對簡單,無需記錄每一個要發送消息的狀态,隻需要維護所有消息的隊列和偏移量就可以了。是以對于建立索引等慢消費,消息量有限且到來的速度不均勻的情況,pull模式比較合适。

消息延遲與忙等

這是pull模式最大的短闆。由于主動權在消費方,消費方無法準确地決定何時去拉取最新的消息。如果一次pull取到消息了還可以繼續去pull,如果沒有pull取到則需要等待一段時間重新pull。

但等待多久就很難判定了。你可能會說,我可以有xx動态pull取時間調整算法,但問題的本質在于,有沒有消息到來這件事情決定權不在消費方。也許1分鐘内連續來了1000條消息,然後半個小時沒有新消息産生,

可能你的算法算出下次最有可能到來的時間點是31分鐘之後,或者60分鐘之後,結果下條消息10分鐘後到了,是不是很讓人沮喪?

當然也不是說延遲就沒有解決方案了,業界較成熟的做法是從短時間開始(不會對broker有太大負擔),然後指數級增長等待。比如開始等5ms,然後10ms,然後20ms,然後40ms……直到有消息到來,然後再回到5ms。

即使這樣,依然存在延遲問題:假設40ms到80ms之間的50ms消息到來,消息就延遲了30ms,而且對于半個小時來一次的消息,這些開銷就是白白浪費的。

在阿裡的RocketMq裡,有一種優化的做法-長輪詢,來平衡推拉模型各自的缺點。基本思路是:消費者如果嘗試拉取失敗,不是直接return,而是把連接配接挂在那裡wait,服務端如果有新的消息到來,把連接配接notify起來,這也是不錯的思路。但海量的長連接配接block對系統的開銷還是不容小觑的,還是要合理的評估時間間隔,給wait加一個時間上限比較好~

順序消息

如果push模式的消息隊列,支援分區,單分區隻支援一個消費者消費,并且消費者隻有确認一個消息消費後才能push送另外一個消息,還要發送者保證全局順序唯一,聽起來也能做順序消息,但成本太高了,尤其是必須每個消息消費确認後才能發下一條消息,這對于本身堆積能力和慢消費就是瓶頸的push模式的消息隊列,簡直是一場災難。

反觀pull模式,如果想做到全局順序消息,就相對容易很多:

  1. producer對應partition,并且單線程。
  2. consumer對應partition,消費确認(或批量确認),繼續消費即可。

是以對于日志push送這種最好全局有序,但允許出現小誤差的場景,pull模式非常合适。如果你不想看到通篇亂套的日志~~

Anyway,需要順序消息的場景還是比較有限的而且成本太高,請慎重考慮。

總結

本文從為何使用消息隊列開始講起,然後主要介紹了如何從零開始設計一個消息隊列,包括RPC、事務、最終一緻性、廣播、消息确認等關鍵問題。并對消息隊列的push、pull模型做了簡要分析,最後從批量和異步角度,分析了消息隊列性能優化的思路。下篇會着重介紹一些進階話題,如存儲系統的設計、流控和錯峰的設計、公平排程等。希望通過這些,讓大家對消息隊列有個提綱挈領的整體認識,并給自主開發消息隊列提供思路。另外,本文主要是源自自己在開發消息隊列中的思考和讀源碼時的體會,比較不"官方",也難免會存在一些漏洞,歡迎大家多多交流。

後續我們還會推出消息隊列設計進階篇,内容會涵蓋以下方面:

  • pull模型消息系統設計理念
  • 存儲子系統設計
  • 流量控制
  • 公平排程