消息隊列常見問題和解決方案
說明:此文是筆者對中華石衫老師對消息隊列講解的一篇總結包括筆者自己的一些了解
一、為什麼使用消息隊列?
消息隊列使用的場景和中間件有很多,但解決的核心問題主要是:異步、解耦、消峰填谷。
二、消息隊列的優缺點
異步、解耦、消峰填谷這是消息隊列最大的優點,除了這些消息隊列還可以會解決一些我們特殊業務場景的問題。但是缺點主要在于系統的可用性、複雜性、一緻性問題,引入消息隊列後,需要考慮MQ的可用性,萬一MQ崩潰了豈不是要爆炸?而且複雜性明顯提高了,需要考慮一些消息隊列的常見問題和解決方案,還有就是一緻性問題,一條消息由多個消費者消費,萬一有一個消費者消費失敗了,就會導緻資料不一緻。
三、消息隊列選型
目前常見和使用廣泛的MQ有ActiveMQ、RabbitMQ、RocketMQ、Kakfa,其特性如下:

個人總結:
ActiveMQ早期用的比較多,但是現在貌似用的都不是很多了,網上也沒有大規模吞吐量的使用案例分析,社群也貌似不是很活躍了,如果是新項目不建議采用ActiveMQ。
RabbitMQ現在使用的較為多一些,社群活躍度也很高,功能也很強大,官方還提供了管理的web界面,性能也很好,但是RabbitMQ性能好的主要原因是因為使用erlang語言開發的,erlang語言貌似天生性能好,但對于我們java開發者來說,源碼基本看不懂,更别提深入的研究了,不過spring推出了rabbit的支援,貌似還比較好用,比自己去封裝實作并且去處理一些問題的要好多了。
RocketMQ現在開始用的人也比較多,很多人對于RocketMQ的看法是內建了Kafka和RabbitMQ的有點,是阿裡開源的産品,貌似現在是捐贈給了Apache,其源碼是java寫的,功能十分強大并且是經過阿裡大規模應用的,能經過阿裡實踐使用的一般來說可靠性和可用性都是相當高的,但是也存在一些小問題,現在RocketMQ雖然使用的人好像越來越多了,但是文檔資料還是比較少,含金量不怎麼高,并且阿裡開源的有不維護的風險,就像dubbo中間也用2年沒維護,有實力的團隊應該沒有什麼問題,小公司小團隊需要考慮一下使用RocketMQ。
Kafka就不多說了,Kafka可以說是業内标準,基本上大資料領域的實時計算、日志、資料處理都是用kafka,開源社群異常活躍,而且像現在阿裡雲、騰訊雲都推出了Kafka的雲服務,是以說Kafka就不說了,絕對沒問題,放心大膽的用吧。
最後給一個個人選型意見(不一定對啊),如果是小公司小團隊最好采用Kafka和RabbitMQ,有實力的團隊可以去搞一搞RocketMQ。
四、如何保證消息隊列的高可用性
由于筆者隻使用和實踐過RabbitMQ和Kafka,RocketMQ和ActiveMQ了解的不深,是以分析一下RabbitMQ和Kafka的高可用。
(一)RabbitMQ
RabbitMQ有三種模式:單機模式,普通叢集模式,鏡像叢集模式
(1)單機模式
單機模式平常使用在開發或者本地測試場景,一般就是測試是不是能夠正确的處理消息,生産上基本沒人去用單機模式,風險很大。
(2)普通叢集模式
普通叢集模式就是啟動多個RabbitMQ執行個體。在你建立的queue,隻會放在一個rabbtimq執行個體上,但是每個執行個體都同步queue的中繼資料。在消費的時候完了,上如果連接配接到了另外一個執行個體,那麼那個執行個體會從queue所在執行個體上拉取資料過來。
這種方式确實很麻煩,也不怎麼好,沒做到所謂的分布式,就是個普通叢集。因為這導緻你要麼消費者每次随機連接配接一個執行個體然後拉取資料,要麼固定連接配接那個queue所在執行個體消費資料,前者有資料拉取的開銷,後者導緻單執行個體性能瓶頸。
而且如果那個放queue的執行個體當機了,會導緻接下來其他執行個體就無法從那個執行個體拉取,如果你開啟了消息持久化,讓RabbitMQ落地存儲消息的話,消息不一定會丢,得等這個執行個體恢複了,然後才可以繼續從這個queue拉取資料。
這方案主要是提高吞吐量的,就是說讓叢集中多個節點來服務某個queue的讀寫操作。
(3)鏡像叢集模式
鏡像叢集模式是所謂的RabbitMQ的高可用模式,跟普通叢集模式不一樣的是,你建立的queue,無論中繼資料還是queue裡的消息都會存在于多個執行個體上,然後每次你寫消息到queue的時候,都會自動把消息到多個執行個體的queue裡進行消息同步。
優點在于你任何一個執行個體當機了,沒事兒,别的執行個體都可以用。缺點在于性能開銷太大和擴充性很低,同步所有執行個體,這會導緻網絡帶寬和壓力很重,而且擴充性很低,每增加一個執行個體都會去包含已有的queue的所有資料,并沒有辦法線性擴充queue。
開啟鏡像叢集模式可以去RabbitMQ的管理控制台去增加一個政策,指定要求資料同步到所有節點的,也可以要求就同步到指定數量的節點,然後你再次建立queue的時候,應用這個政策,就會自動将資料同步到其他的節點上去了。
(二)Kafka
Kafka天生就是一個分布式的消息隊列,它可以由多個broker組成,每個broker是一個節點;你建立一個topic,這個topic可以劃分為多個partition,每個partition可以存在于不同的broker上,每個partition就放一部分資料。
kafka 0.8以前,是沒有HA機制的,就是任何一個broker當機了,那個broker上的partition就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。
kafka 0.8以後,提供了HA機制,就是replica副本機制。kafka會均勻的将一個partition的所有replica分布在不同的機器上,來提高容錯性。每個partition的資料都會同步到吉他機器上,形成自己的多個replica副本。然後所有replica會選舉一個leader出來,那麼生産和消費都去leader,其他replica就是follower,leader會同步資料給follower。當leader挂了會自動去找replica,然後會再選舉一個leader出來,這樣就具有高可用性了。
寫資料的時候,生産者就寫leader,然後leader将資料落地寫本地磁盤,接着其他follower自己主動從leader來pull資料。一旦所有follower同步好資料了,就會發送ack給leader,leader收到所有follower的ack之後,就會傳回寫成功的消息給生産者。(當然,這隻是其中一種模式,還可以适當調整這個行為)
消費的時候,隻會從leader去讀,但是隻有一個消息已經被所有follower都同步成功傳回ack的時候,這個消息才會被消費者讀到。
五、如何保證消息消費時的幂等性
其實消息重複消費的主要原因在于回饋機制(RabbitMQ是ack,Kafka是offset),在某些場景中我們采用的回饋機制不同,原因也不同,例如消費者消費完消息後回複ack, 但是剛消費完還沒來得及送出系統就重新開機了,這時候上來就pull消息的時候由于沒有送出ack或者offset,消費的還是上條消息。
那麼如何怎麼來保證消息消費的幂等性呢?實際上我們隻要保證多條相同的資料過來的時候隻處理一條或者說多條處理和處理一條造成的結果相同即可,但是具體怎麼做要根據業務需求來定,例如入庫消息,先查一下消息是否已經入庫啊或者說搞個唯一限制啊什麼的,還有一些是天生保證幂等性就根本不用去管,例如redis就是天然幂等性。
還有一個問題,消費者消費消息的時候在某些場景下要放過消費不了的消息,遇到消費不了的消息通過日志記錄一下或者搞個什麼措施以後再來處理,但是一定要放過消息,因為在某些場景下例如spring-rabbitmq的預設回饋政策是出現異常就沒有送出ack,導緻了一直在重發那條消費異常的消息,而且一直還消費不了,這就尴尬了,後果你會懂的。
六、如何保證消息的可靠性傳輸?
由于筆者隻使用和實踐過RabbitMQ和Kafka,RocketMQ和ActiveMQ了解的不深,是以分析一下RabbitMQ和Kafka的消息可靠性傳輸的問題。、
(一)RabbitMQ
(1)生産者弄丢了資料
生産者将資料發送到RabbitMQ的時候,可能資料就在半路給搞丢了,因為網絡啥的問題,都有可能。此時可以選擇用RabbitMQ提供的事務功能,就是生産者發送資料之前開啟RabbitMQ事務(channel.txSelect),然後發送消息,如果消息沒有成功被RabbitMQ接收到,那麼生産者會收到異常報錯,此時就可以復原事務(channel.txRollback),然後重試發送消息;如果收到了消息,那麼可以送出事務(channel.txCommit)。但是問題是,RabbitMQ事務機制一搞,基本上吞吐量會下來,因為太耗性能。
是以一般來說,如果你要確定說寫RabbitMQ的消息别丢,可以開啟confirm模式,在生産者那裡設定開啟confirm模式之後,你每次寫的消息都會配置設定一個唯一的id,然後如果寫入了RabbitMQ中,RabbitMQ會給你回傳一個ack消息,告訴你說這個消息ok了。如果RabbitMQ沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在記憶體裡維護每個消息id的狀态,如果超過一定時間還沒接收到這個消息的回調,那麼你可以重發。
事務機制和cnofirm機制最大的不同在于,事務機制是同步的,你送出一個事務之後會阻塞在那兒,但是confirm機制是異步的,你發送個消息之後就可以發送下一個消息,然後那個消息RabbitMQ接收了之後會異步回調你一個接口通知你這個消息接收到了。
是以一般在生産者這塊避免資料丢失,都是用confirm機制的。
(2)RabbitMQ弄丢了資料
就是RabbitMQ自己弄丢了資料,這個你必須開啟RabbitMQ的持久化,就是消息寫入之後會持久化到磁盤,哪怕是RabbitMQ自己挂了,恢複之後會自動讀取之前存儲的資料,一般資料不會丢。除非極其罕見的是,RabbitMQ還沒持久化,自己就挂了,可能導緻少量資料會丢失的,但是這個機率較小。
設定持久化有兩個步驟,第一個是建立queue的時候将其設定為持久化的,這樣就可以保證RabbitMQ持久化queue的中繼資料,但是不會持久化queue裡的資料;第二個是發送消息的時候将消息的deliveryMode設定為2,就是将消息設定為持久化的,此時RabbitMQ就會将消息持久化到磁盤上去。必須要同時設定這兩個持久化才行,RabbitMQ哪怕是挂了,再次重新開機,也會從磁盤上重新開機恢複queue,恢複這個queue裡的資料。
而且持久化可以跟生産者那邊的confirm機制配合起來,隻有消息被持久化到磁盤之後,才會通知生産者ack了,是以哪怕是在持久化到磁盤之前,RabbitMQ挂了,資料丢了,生産者收不到ack,你也是可以自己重發的。
哪怕是你給RabbitMQ開啟了持久化機制,也有一種可能,就是這個消息寫到了RabbitMQ中,但是還沒來得及持久化到磁盤上,結果不巧,此時RabbitMQ挂了,就會導緻記憶體裡的一點點資料會丢失。
(3)消費端弄丢了資料
RabbitMQ如果丢失了資料,主要是因為你消費的時候,剛消費到,還沒處理,結果程序挂了,比如重新開機了,那麼就尴尬了,RabbitMQ認為你都消費了,這資料就丢了。
這個時候得用RabbitMQ提供的ack機制,簡單來說,就是你關閉RabbitMQ自動ack,可以通過一個api來調用就行,然後每次你自己代碼裡確定處理完的時候,再程式裡ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那RabbitMQ就認為你還沒處理完,這個時候RabbitMQ會把這個消費配置設定給别的consumer去處理,消息是不會丢的。
(二)Kafka
(1)消費端弄丢了資料
唯一可能導緻消費者弄丢資料的情況,就是說,你那個消費到了這個消息,然後消費者那邊自動送出了offset,讓kafka以為你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你自己就挂了,此時這條消息就丢咯。
大家都知道kafka會自動送出offset,那麼隻要關閉自動送出offset,在處理完之後自己手動送出offset,就可以保證資料不會丢。但是此時确實還是會重複消費,比如你剛處理完,還沒送出offset,結果自己挂了,此時肯定會重複消費一次,自己保證幂等性就好了。
生産環境碰到的一個問題,就是說我們的kafka消費者消費到了資料之後是寫到一個記憶體的queue裡先緩沖一下,結果有的時候,你剛把消息寫入記憶體queue,然後消費者會自動送出offset。
然後此時我們重新開機了系統,就會導緻記憶體queue裡還沒來得及處理的資料就丢失了
(2)kafka弄丢了資料
這塊比較常見的一個場景,就是kafka某個broker當機,然後重新選舉partiton的leader時。大家想想,要是此時其他的follower剛好還有些資料沒有同步,結果此時leader挂了,然後選舉某個follower成leader之後,他不就少了一些資料?這就丢了一些資料啊。
生産環境也遇到過,我們也是,之前kafka的leader機器當機了,将follower切換為leader之後,就會發現說這個資料就丢了。
是以此時一般是要求起碼設定如下4個參數:
- 給這個topic設定replication.factor參數:這個值必須大于1,要求每個partition必須有至少2個副本。
- 在kafka服務端設定min.insync.replicas參數:這個值必須大于1,這個是要求一個leader至少感覺到有至少一個follower還跟自己保持聯系,沒掉隊,這樣才能確定leader挂了還有一個follower吧。
- 在producer端設定acks=all:這個是要求每條資料,必須是寫入所有replica之後,才能認為是寫成功了。
- 在producer端設定retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裡了。
(3)生産者會不會弄丢資料
如果按照上述的思路設定了ack=all,一定不會丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之後,才認為本次寫成功了。如果沒滿足這個條件,生産者會自動不斷的重試,重試無限次。
六、如何保證消息的順序性
因為在某些情況下我們扔進MQ中的消息是要嚴格保證順序的,尤其涉及到訂單什麼的業務需求,消費的時候也是要嚴格保證順序,不然會出大問題的。
先看看順序會錯亂的倆場景
- rabbitmq:一個queue,多個consumer,這不明顯亂了
-
kafka:一個topic,一個partition,一個consumer,内部多線程,這不也明顯亂了
如何來保證消息的順序性呢?
- rabbitmq:拆分多個queue,每個queue一個consumer,就是多一些queue而已,确實是麻煩點;或者就一個queue但是對應一個consumer,然後這個consumer内部用記憶體隊列做排隊,然後分發給底層不同的worker來處理。
- kafka:一個topic,一個partition,一個consumer,内部單線程消費,寫N個記憶體queue,然後N個線程分别消費一個記憶體queue即可。
七、如何解決消息隊列的延時以及過期失效問題?消息隊列滿了以後該怎麼處理?有幾百萬消息持續積壓幾小時怎麼解決?
(一)、大量消息在mq裡積壓了幾個小時了還沒解決
幾千萬條資料在MQ裡積壓了七八個小時,從下午4點多,積壓到了晚上很晚,10點多,11點多
這個是我們真實遇到過的一個場景,确實是線上故障了,這個時候要不然就是修複consumer的問題,讓他恢複消費速度,然後傻傻的等待幾個小時消費完畢。這個肯定不能在面試的時候說吧。
一個消費者一秒是1000條,一秒3個消費者是3000條,一分鐘是18萬條,1000多萬條,是以如果你積壓了幾百萬到上千萬的資料,即使消費者恢複了,也需要大概1小時的時間才能恢複過來。
一般這個時候,隻能操作臨時緊急擴容了,具體操作步驟和思路如下:
- 先修複consumer的問題,確定其恢複消費速度,然後将現有cnosumer都停掉。
- 建立一個topic,partition是原來的10倍,臨時建立好原先10倍或者20倍的queue數量。
- 然後寫一個臨時的分發資料的consumer程式,這個程式部署上去消費積壓的資料,消費之後不做耗時的處理,直接均勻輪詢寫入臨時建立好的10倍數量的queue。
- 接着臨時征用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的資料。
- 這種做法相當于是臨時将queue資源和consumer資源擴大10倍,以正常的10倍速度來消費資料。
- 等快速消費完積壓資料之後,得恢複原先部署架構,重新用原先的consumer機器來消費消息。
(二)、消息隊列過期失效問題
假設你用的是rabbitmq,rabbitmq是可以設定過期時間的,就是TTL,如果消息在queue中積壓超過一定的時間就會被rabbitmq給清理掉,這個資料就沒了。那這就是第二個坑了。這就不是說資料會大量積壓在mq裡,而是大量的資料會直接搞丢。
這個情況下,就不是說要增加consumer消費積壓的消息,因為實際上沒啥積壓,而是丢了大量的消息。我們可以采取一個方案,就是批量重導,這個我們之前線上也有類似的場景幹過。就是大量積壓的時候,我們當時就直接丢棄資料了,然後等過了高峰期以後,比如大家一起喝咖啡熬夜到晚上12點以後,使用者都睡覺了。
這個時候我們就開始寫程式,将丢失的那批資料,寫個臨時程式,一點一點的查出來,然後重新灌入mq裡面去,把白天丢的資料給他補回來。也隻能是這樣了。
假設1萬個訂單積壓在mq裡面,沒有處理,其中1000個訂單都丢了,你隻能手動寫程式把那1000個訂單給查出來,手動發到mq裡去再補一次。
(三)、消息隊列滿了怎麼搞?
如果走的方式是消息積壓在mq裡,那麼如果你很長時間都沒處理掉,此時導緻mq都快寫滿了,咋辦?這個還有别的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程式,接入資料來消費,消費一個丢棄一個,都不要了,快速消費掉所有的消息。然後走第二個方案,到了晚上再補資料吧。