消息隊列的常見問題
消息的丢失可能會出現在三個地方:
生産者将資料發送到RabbitMQ的時候,可能資料就在半路給搞丢了,因為網絡啥的問題,都有可能。怎麼解決?
事務:生産者發送資料之前開啟RabbitMQ事務(channel.txSelect),然後發送消息,如果消息沒有成功被RabbitMQ接收到,那麼生産者會收到異常報錯,此時就可以復原事務(channel.txRollback),然後重試發送消息;如果收到了消息,可以送出事務(channel.txCommit)。但是問題是,RabbitMQ事務機制一搞,基本上吞吐量會下來,因為太耗性能。
confirm模式:在生産者那裡設定開啟confirm模式之後,你每次寫的消息都會配置設定一個唯一的id,然後如果寫入了RabbitMQ中,RabbitMQ會給你回傳一個ack消息,告訴你說這個消息ok了。如果RabbitMQ沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在記憶體裡維護每個消息id的狀态,如果超過一定時間還沒接收到這個消息的回調,那麼你可以重發。
是以一般在生産者這塊避免資料丢失,都是用confirm機制的。
就是RabbitMQ自己弄丢了資料,這個你必須開啟RabbitMQ的持久化,就是消息寫入之後會持久化到磁盤,哪怕是RabbitMQ自己挂了,恢複之後會自動讀取之前存儲的資料,一般資料不會丢。
設定持久化有兩個步驟:
第一個是建立queue和交換器的時候将其設定為持久化,這樣就可以保證RabbitMQ持久化相關的中繼資料,但是不會持久化queue裡的資料;
第二個是發送消息的時候将消息的deliveryMode設定為2,就是将消息設定為持久化的,此時RabbitMQ就會将消息持久化到磁盤上去
必須要同時設定這兩個持久化才行
持久化可以和生産者的confirm結合,當持久化成功後,再ack生産者。如果持久化之前RabbitMQ挂了,生産者沒收到ack,會重發。
哪怕是你給RabbitMQ開啟了持久化機制,也有一種可能,就是這個消息寫到了RabbitMQ中,但是還沒來得及持久化到磁盤上,結果不巧,此時RabbitMQ挂了,就會導緻記憶體裡的一點點資料會丢失。
RabbitMQ如果沒有丢失了資料消費者丢失了資料,主要是因為你消費的時候,剛消費到,還沒處理,結果程序挂了,比如重新開機了,那麼就尴尬了,RabbitMQ認為你都消費了,這資料就丢了。
這個時候得用RabbitMQ提供的ack機制,簡單來說,就是你關閉RabbitMQ自動ack,可以通過一個api來調用就行,然後每次你自己代碼裡確定處理完的時候,再在程式裡用aip調用ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那RabbitMQ就認為你還沒處理完,這個時候RabbitMQ會把這個消費配置設定給别的consumer去處理,消息是不會丢的。
唯一可能導緻消費者弄丢資料的情況,就是說,你那個消費到了這個消息,然後消費者那邊自動送出了offset,讓kafka以為你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你自己就挂了,此時這條消息就丢咯。
大家都知道kafka會自動送出offset,那麼隻要關閉自動送出offset,在處理完之後自己手動送出offset,就可以保證資料不會丢。但是此時确實還是會重複消費,比如你剛處理完,還沒送出offset,結果自己挂了,此時肯定會重複消費一次,自己保證幂等性就好了。
生産環境碰到的一個問題,就是說我們的kafka消費者消費到了資料之後是寫到一個記憶體的queue裡先緩沖一下,結果有的時候,你剛把消息寫入記憶體queue,然後消費者會自動送出offset。
然後此時我們重新開機了系統,就會導緻記憶體queue裡還沒來得及處理的資料就丢失了
這塊比較常見的一個場景,就是kafka某個broker當機,然後重新選舉partiton的leader時。大家想想,要是此時其他的follower剛好還有些資料沒有同步,結果此時leader挂了,然後選舉某個follower成leader之後,他不就少了一些資料?這就丢了一些資料啊。
生産環境也遇到過,我們也是,之前kafka的leader機器當機了,将follower切換為leader之後,就會發現說這個資料就丢了。
是以此時一般是要求起碼設定如下4個參數:
給這個topic設定replication.factor參數:這個值必須大于1,要求每個partition必須有至少2個副本。
在kafka服務端設定min.insync.replicas參數:這個值必須大于1,這個是要求一個leader至少感覺到有至少一個follower還跟自己保持聯系,沒掉隊,這樣才能確定leader挂了還有一個follower吧。
在producer端設定acks=all:這個是要求每條資料,必須是寫入所有replica之後,才能認為是寫成功了。
在producer端設定retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裡了。
如果按照上述的思路設定了ack=all,一定不會丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之後,才認為本次寫成功了。如果沒滿足這個條件,生産者會自動不斷的重試,重試無限次。
從根本上說,異步消息是不應該有順序依賴的。在MQ上估計是沒法解決。要實作嚴格的順序消息,簡單且可行的辦法就是:保證生産者- MQServer -消費者是一對一對一的關系。
如果有順序依賴的消息,要保證消息有一個hashKey,類似于資料庫表分區的的分區key列。保證對同一個key的消息發送到相同的隊列。A使用者産生的消息(包括建立消息和删除消息)都按A的hashKey分發到同一個隊列。隻需要把強相關的兩條消息基于相同的路由就行了,也就是說經過m1和m2的在路由表裡的路由是一樣的,那自然m1會優先于m2去投遞。而且一個queue隻對應一個consumer。
分為兩大類情況:1、生産者消息重複發送; 2.MQ向消費者投遞時重複投遞
終極解決辦法:幂等性
例如,一個 SQL 操作:
這個操作多次執行,id 等于 1 的記錄中的 count 字段的值都為 10 (每一次的count都不是上一次傳回的count,都是原始的count),這個操作就是幂等的,我們不用擔心這個操作被重複。
再來看另外一個 SQL 操作:
這樣的 SQL 操作就不是幂等的(每一次的count都是上一次傳回的count,都不是原始的count),一旦重複,結果就會産生變化。
是以應對消息重複的辦法是使消息接收端的處理是一個幂等操作。這樣的做法降低了消息中間件的整體複雜性,不過也給使用消息中間件的消息接收端應用帶來了一定的限制和門檻。
1. MVCC:
多版本并發控制,樂觀鎖的一種實作,在生産者發送消息時進行資料更新時需要帶上資料的版本号,消費者去更新時需要去比較持有資料的版本号,版本号不一緻的操作無法成功。
例如部落格點贊次數自動+1的接口:
每一個version隻有一次執行成功的機會,一旦失敗了生産者必須重新擷取資料的最新版本号再次發起更新。
2. 去重表:
利用資料庫表單的特性來實作幂等,常用的一個思路是在表上建構唯一性索引,保證某一類資料一旦執行完畢,後續同樣的請求不再重複處理了(利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那麼就不再處理這條消息。)
以電商平台為例子,電商平台上的訂單 id 就是最适合的 token。當使用者下單時,會經曆多個環節,比如生成訂單,減庫存,減優惠券等等。每一個環節執行時都先檢測一下該訂單id是否已經執行過這一步驟,對未執行的請求,執行操作并緩存結果,而對已經執行過此 id 的請求,則直接傳回之前的執行結果,不做任何操作。這樣可以在最大程度上避免操作的重複執行問題,緩存起來的執行結果也能用于事務的控制等。
因為在某些情況下我們扔進MQ中的消息是要嚴格保證順序的,尤其涉及到訂單什麼的業務需求,消費的時候也是要嚴格保證順序,不然會出大問題的。
先看看順序會錯亂的倆場景
rabbitmq:一個queue,多個consumer,這不明顯亂了
kafka:一個topic,一個partition,一個consumer,内部多線程,這不也明顯亂了
如何來保證消息的順序性呢?
rabbitmq:拆分多個queue,每個queue一個consumer,就是多一些queue而已,确實是麻煩點;或者就一個queue但是對應一個consumer,然後這個consumer内部用記憶體隊列做排隊,然後分發給底層不同的worker來處理。
kafka:一個topic,一個partition,一個consumer,内部單線程消費,寫N個記憶體queue,然後N個線程分别消費一個記憶體queue即可。
幾千萬條資料在MQ裡積壓了七八個小時,從下午4點多,積壓到了晚上很晚,10點多,11點多
這個是我們真實遇到過的一個場景,确實是線上故障了,這個時候要不然就是修複consumer的問題,讓他恢複消費速度,然後傻傻的等待幾個小時消費完畢。這個肯定不能在面試的時候說吧。
一個消費者一秒是1000條,一秒3個消費者是3000條,一分鐘是18萬條,1000多萬條,是以如果你積壓了幾百萬到上千萬的資料,即使消費者恢複了,也需要大概1小時的時間才能恢複過來。
一般這個時候,隻能操作臨時緊急擴容了,具體操作步驟和思路如下:
假設你用的是rabbitmq,rabbitmq是可以設定過期時間的,就是TTL,如果消息在queue中積壓超過一定的時間就會被rabbitmq給清理掉,這個資料就沒了。那這就是第二個坑了。這就不是說資料會大量積壓在mq裡,而是大量的資料會直接搞丢。
這個情況下,就不是說要增加consumer消費積壓的消息,因為實際上沒啥積壓,而是丢了大量的消息。我們可以采取一個方案,就是批量重導,這個我們之前線上也有類似的場景幹過。就是大量積壓的時候,我們當時就直接丢棄資料了,然後等過了高峰期以後,比如大家一起喝咖啡熬夜到晚上12點以後,使用者都睡覺了。這個時候我們就開始寫程式,将丢失的那批資料,寫個臨時程式,一點一點的查出來,然後重新灌入mq裡面去,把白天丢的資料給他補回來。也隻能是這樣了。
假設1萬個訂單積壓在mq裡面,沒有處理,其中1000個訂單都丢了,你隻能手動寫程式把那1000個訂單給查出來,手動發到mq裡去再補一次。
如果走的方式是消息積壓在mq裡,那麼如果你很長時間都沒處理掉,此時導緻mq都快寫滿了,咋辦?這個還有别的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程式,接入資料來消費,消費一個丢棄一個,都不要了,快速消費掉所有的消息。然後走第二個方案,到了晚上再補資料吧。
由于筆者隻使用和實踐過RabbitMQ和Kafka,RocketMQ和ActiveMQ了解的不深,是以分析一下RabbitMQ和Kafka的高可用。
RabbitMQ有三種模式:單機模式,普通叢集模式,鏡像叢集模式
(1)單機模式
單機模式平常使用在開發或者本地測試場景,一般就是測試是不是能夠正确的處理消息,生産上基本沒人去用單機模式,風險很大。
(2)普通叢集模式
普通叢集模式就是啟動多個RabbitMQ執行個體。在你建立的queue,隻會放在一個rabbtimq執行個體上,但是每個執行個體都同步queue的中繼資料。在消費的時候完了,上如果連接配接到了另外一個執行個體,那麼那個執行個體會從queue所在執行個體上拉取資料過來。
這種方式确實很麻煩,也不怎麼好,沒做到所謂的分布式,就是個普通叢集。因為這導緻你要麼消費者每次随機連接配接一個執行個體然後拉取資料,要麼固定連接配接那個queue所在執行個體消費資料,前者有資料拉取的開銷,後者導緻單執行個體性能瓶頸。
而且如果那個放queue的執行個體當機了,會導緻接下來其他執行個體就無法從那個執行個體拉取,如果你開啟了消息持久化,讓RabbitMQ落地存儲消息的話,消息不一定會丢,得等這個執行個體恢複了,然後才可以繼續從這個queue拉取資料。
這方案主要是提高吞吐量的,就是說讓叢集中多個節點來服務某個queue的讀寫操作。
(3)鏡像叢集模式
鏡像叢集模式是所謂的RabbitMQ的高可用模式,跟普通叢集模式不一樣的是,你建立的queue,無論中繼資料還是queue裡的消息都會存在于多個執行個體上,然後每次你寫消息到queue的時候,都會自動把消息到多個執行個體的queue裡進行消息同步。
優點在于你任何一個執行個體當機了,沒事兒,别的執行個體都可以用。缺點在于性能開銷太大和擴充性很低,同步所有執行個體,這會導緻網絡帶寬和壓力很重,而且擴充性很低,每增加一個執行個體都會去包含已有的queue的所有資料,并沒有辦法線性擴充queue。
開啟鏡像叢集模式可以去RabbitMQ的管理控制台去增加一個政策,指定要求資料同步到所有節點的,也可以要求就同步到指定數量的節點,然後你再次建立queue的時候,應用這個政策,就會自動将資料同步到其他的節點上去了。
Kafka天生就是一個分布式的消息隊列,它可以由多個broker組成,每個broker是一個節點;你建立一個topic,這個topic可以劃分為多個partition,每個partition可以存在于不同的broker上,每個partition就放一部分資料。
kafka 0.8以前,是沒有HA機制的,就是任何一個broker當機了,那個broker上的partition就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。
kafka 0.8以後,提供了HA機制,就是replica副本機制。kafka會均勻的将一個partition的所有replica分布在不同的機器上,來提高容錯性。每個partition的資料都會同步到吉他機器上,形成自己的多個replica副本。然後所有replica會選舉一個leader出來,那麼生産和消費都去leader,其他replica就是follower,leader會同步資料給follower。當leader挂了會自動去找replica,然後會再選舉一個leader出來,這樣就具有高可用性了。
寫資料的時候,生産者就寫leader,然後leader将資料落地寫本地磁盤,接着其他follower自己主動從leader來pull資料。一旦所有follower同步好資料了,就會發送ack給leader,leader收到所有follower的ack之後,就會傳回寫成功的消息給生産者。(當然,這隻是其中一種模式,還可以适當調整這個行為)
消費的時候,隻會從leader去讀,但是隻有一個消息已經被所有follower都同步成功傳回ack的時候,這個消息才會被消費者讀到。