RabbitMQ
RabbitMQ 是一款開源的,Erlang 編寫的,基于 AMQP(進階消息隊列協定)協定的消息中間件。
RabbitMQ 的一些重要概念
- Broker: 簡單來說就是消息隊列伺服器實體
- Producer: 消息生産者
- Consumer: 消息消費者
重要的是消費者和生産者是消息發送和消息接受的展現,而非用戶端和服務端。
- Channel: 信道是建立在真實的 TCP 連接配接内的虛拟連接配接。AMQP 指令都是通過信道發送出去的。在用戶端的每個連接配接裡,可建立多個 channel,每個 channel 代表一個會話任務。(你也許問為啥不直接使用 TCP 連結發送 AMQP 指令呢?主要原因在于對于作業系統來說建立和銷毀 TCP 繪畫是非常昂貴的開銷。使用信道,你能夠根據應用需要,盡可能的建立并行的傳輸層,而不會被 TCP 連結限制所限制。)
AMQP 消息路由必須由三部分:交換器、隊列和綁定。
生産者把消息釋出到交換器上;
消息最終到達隊列,并被消費者接收;
綁定決定了消息如何從路由器路由到特定的隊列。
- Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列;對負載均衡來說,隊列是絕佳方案。隻需要附加一堆消費者,并讓 RabbitMQ 以循環的方式均勻配置設定發來的消息。
隊列的一些有用參數:
- exclusive——如果設定為 true 的話,隊列将程式設計私有的,此時隻有你的應用程式才能夠消費隊列消息。當你想限制一個隊列隻有一個消費者的時候很有幫助。
- auto-delete——當最後一個消費者取消訂閱的時候,隊列就會自動移除。
- Exchange: 交換器,當你想要将消息投遞到隊列時,你通過把消息投遞給交換器來完成。它指定消息按什麼規則,路由到哪個隊列。這些規則被稱為路由鍵(routing key)。隊列通過路由鍵綁定到交換器。
- Binding: 綁定,它的作用就是把 exchange 和 queue 按照路由規則綁定起來。
- Routing Key: 路由關鍵字,exchange 根據這個關鍵字進行消息投遞
- VHost: vhost 可以了解為虛拟 broker ,即 mini-RabbitMQ server。其内部均含有獨立的 queue、exchange 和 binding 等,但最最重要的是,其擁有獨立的權限系統,可以做到 vhost 範圍的使用者控制。當然,從 RabbitMQ 的全局角度,vhost 可以作為不同權限隔離的手段(一個典型的例子就是不同的應用可以跑在不同的 vhost 中)。
持久化消息
關于再 Rabbit 裡建立隊列和交換器有個不可告人的秘密:預設情況下它們無法幸免于伺服器重新開機。當重新開機 RabbitMQ 伺服器後,那些隊列和交換器就都消失了(随同裡面的消息)。能從 AMQP 伺服器崩潰中回複消息,我們稱之為持久化消息,要做到持久化必須以下三點:
- 把它的投遞模式選項設定為 2(持久)
- 發送到持久化的交換器
- 到達持久化的隊列
MQ 的特點
簡答
- 異步處理 - 相比于傳統的串行、并行方式,提高了系統吞吐量。
- 應用解耦 - 系統間通過消息通信,不用關心其他系統的處理。
- 流量削鋒 - 可以通過消息隊列長度控制請求量;可以緩解短時間内的高并發請求。
- 日志處理 - 解決大量日志傳輸。
- 消息通訊 - 消息隊列一般都内置了高效的通信機制,是以也可以用在純的消息通訊。比如實作點對點消息隊列,或者聊天室等。
詳答
主要是:解耦、異步、削峰。
解耦:A 系統發送資料到 BCD 三個系統,通過接口調用發送。如果 E 系統也要這個資料呢?那如果 C 系統現在不需要了呢?A 系統負責人幾乎崩潰…A 系統跟其它各種亂七八糟的系統嚴重耦合,A 系統産生一條比較關鍵的資料,很多系統都需要 A 系統将這個資料發送過來。如果使用 MQ,A 系統産生一條資料,發送到 MQ 裡面去,哪個系統需要資料自己去 MQ 裡面消費。如果新系統需要資料,直接從 MQ 裡消費即可;如果某個系統不需要這條資料了,就取消對 MQ 消息的消費即可。這樣下來,A 系統壓根兒不需要去考慮要給誰發送資料,不需要維護這個代碼,也不需要考慮人家是否調用成功、失敗逾時等情況。
就是一個系統或者一個子產品,調用了多個系統或者子產品,互相之間的調用很複雜,維護起來很麻煩。但是其實這個調用是不需要直接同步調用接口的,如果用 MQ 給它異步化解耦。
異步:A 系統接收一個請求,需要在自己本地寫庫,還需要在 BCD 三個系統寫庫,自己本地寫庫要 3ms,BCD 三個系統分别寫庫要 300ms、450ms、200ms。最終請求總延時是 3 + 300 + 450 + 200 = 953ms,接近 1s,使用者感覺搞個什麼東西,慢死了慢死了。使用者通過浏覽器發起請求。如果使用 MQ,那麼 A 系統連續發送 3 條消息到 MQ 隊列中,假如耗時 5ms,A 系統從接受一個請求到傳回響應給使用者,總時長是 3 + 5 = 8ms。
削峰:減少高峰時期對伺服器壓力。
Rabbit MQ 有什麼優缺點?
優點上面已經說了,就是在特殊場景下有其對應的好處,解耦、異步、削峰。
缺點有以下幾個:
系統可用性降低
本來系統運作好好的,現在你非要加入個消息隊列進去,那消息隊列挂了,你的系統不是呵呵了。是以,系統可用性會降低;
系統複雜度提高
加入了消息隊列,要多考慮很多方面的問題,比如:一緻性問題、如何保證消息不被重複消費、如何保證消息可靠性傳輸等。是以,需要考慮的東西更多,複雜性增大。
一緻性問題
A 系統處理完了直接傳回成功了,人都以為你這個請求就成功了;但是問題是,要是 BCD 三個系統那裡,BD 兩個系統寫庫成功了,結果 C 系統寫庫失敗了,咋整?你這資料就不一緻了。
是以消息隊列實際是一種非常複雜的架構,你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術方案和架構來規避掉,做好之後,你會發現,媽呀,系統複雜度提升了一個數量級,也許是複雜了 10 倍。但是關鍵時刻,用,還是得用的。
Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什麼優缺點?
ActiveMQ | RabbitMQ | RocketMQ | Kafka | ZeroMQ | |
---|---|---|---|---|---|
單機吞吐量 | 比RabbitMQ低 | 2.6w/s(消息做持久化) | 11.6w/s | 17.3w/s | 29w/s |
開發語言 | Java | Erlang | Java | Scala/Java | |
訂閱形式 | 點對點(p2p)、廣播(釋出-訂閱) | 提供了4種:direct, topic ,Headers和fanout。fanout就是廣播模式 | 基于topic/messageTag以及按照消息類型、屬性進行正則比對的釋出訂閱模式 | 基于topic以及按照topic進行正則比對的釋出訂閱模式 | 點對點(p2p) |
持久化 | 支援少量堆積 | 支援少量堆積 | 支援大量堆積 | 支援大量堆積 | 不支援 |
順序消息 | 不支援 | 不支援 | 支援 | 支援 | 不支援 |
性能穩定性 | 好 | 好 | 一般 | 較差 | 很好 |
叢集方式 | 支援簡單叢集模式,比如’主-備’,對進階叢集模式支援不好。 | 支援簡單叢集,'複制’模式,對進階叢集模式支援不好。 | 常用 多對’Master-Slave’ 模式,開源版本需手動切換Slave變成Master | 天然的‘Leader-Slave’無狀态叢集,每台伺服器既是Master也是Slave | 不支援 |
管理界面 | 一般 | 較好 | 一般 | 無 | 無 |
綜上,各種對比之後,有如下建議:
一般的業務系統要引入 MQ,最早大家都用 ActiveMQ,但是現在确實大家用的不多了,沒經過大規模吞吐量場景的驗證,社群也不是很活躍,是以大家還是算了吧,我個人不推薦用這個了;
後來大家開始用 RabbitMQ,但是确實 erlang 語言阻止了大量的 Java 工程師去深入研究和掌控它,對公司而言,幾乎處于不可控的狀态,但是确實人家是開源的,比較穩定的支援,活躍度也高;
不過現在确實越來越多的公司會去用 RocketMQ,确實很不錯,畢竟是阿裡出品,但社群可能有突然黃掉的風險(目前 RocketMQ 已捐給 Apache,但 GitHub 上的活躍度其實不算高)對自己公司技術實力有絕對自信的,推薦用 RocketMQ,否則回去老老實實用 RabbitMQ 吧,人家有活躍的開源社群,絕對不會黃。
是以中小型公司,技術實力較為一般,技術挑戰不是特别高,用 RabbitMQ 是不錯的選擇;大型公司,基礎架構研發實力較強,用 RocketMQ 是很好的選擇。
如果是大資料領域的實時計算、日志采集等場景,用 Kafka 是業内标準的,絕對沒問題,社群活躍度很高,絕對不會黃,何況幾乎是全世界這個領域的事實性規範。
如何保證RabbitMQ消息的順序性?
拆分多個 queue,每個 queue 一個 consumer,就是多一些 queue 而已,确實是麻煩點;或者就一個 queue 但是對應一個 consumer,然後這個 consumer 内部用記憶體隊列做排隊,然後分發給底層不同的 worker 來處理。
消息如何分發?
若該隊列至少有一個消費者訂閱,消息将以循環(round-robin)的方式發送給消費者。每條消息隻會分發給一個訂閱的消費者(前提是消費者能夠正常處理消息并進行确認)。通過路由可實作多消費的功能
消息怎麼路由?
生産者将消息釋出到交換器時,消息将擁有一個路由鍵(routing key),在消息建立時設定。
通過隊列路由鍵,可以把隊列綁定到交換器上。
消息到達交換器後,RabbitMQ 會将消息的路由鍵與隊列的路由鍵進行比對(針對不同的交換器有不同的路由規則);
交換器有四種類型,每一種類型實作了不同的路由算法:
headers:比對 AMQP 消息的 header 而非路由鍵。除此之外,headers 交換器與 direct 交換器完全一緻,但性能會差很多。是以幾乎不用。
direct:如果路由鍵完全比對,消息就被投遞到對應的隊列。
fanout:如果交換器收到消息,它會把消息投遞給所有綁定在此交換器上的隊列。這允許你對單條消息做出不同方式的反應。
topic:可以使來自不同源頭的消息能夠到達同一個隊列。
“*” 操作符将 “.” 視為分隔符;
“#” 操作符沒有分塊的概念,他将任意 “.” 字元均視為關鍵字的一部分。
如何確定消息正确地發送至 RabbitMQ? 如何確定消息接收方消費了消息?
發送方确認模式
将信道設定成 confirm 模式,而且隻能通過重新建立信道來關閉該設定。一旦信道進入 confirm 模式,所有在信道上釋出的消息都會被指派一個唯一的 ID(從 1 開始)。一旦消息被投遞給所有比對的隊列後,信道會發送一個發送方确認模式給生産者應用程式(包含消息的唯一 ID)這使得生産者知曉消息已經安全到達目的隊列了。如果消息和隊列是持久化的,那麼确認消息隻會在隊列将消息寫入磁盤後才會發出。
如果 RabbitMQ 發生内部錯誤進而導緻消息丢失,會發送一條 nack(not acknowledged,未确認)消息。
發送方确認模式是異步的,生産者應用程式在等待确認的同時,可以繼續發送消息。當确認消息到達生産者應用程式,生産者應用程式的回調方法就會被觸發來處理确認消息。
接收方确認機制
消費者接收每一條消息後都必須進行确認(消息接收和消息确認是兩個不同操作)。隻有消費者确認了消息,RabbitMQ 才能安全地把消息從隊列中删除。
這裡并沒有用到逾時機制,RabbitMQ 僅通過 Consumer 的連接配接中斷來确認是否需要重新發送消息。也就是說,隻要連接配接不中斷,RabbitMQ 給了 Consumer 足夠長的時間來處理消息。保證資料的最終一緻性;
下面羅列幾種特殊情況
- 如果消費者接收到消息,在确認之前斷開了連接配接或取消訂閱,RabbitMQ 會認為消息沒有被分發,然後重新分發給下一個訂閱的消費者。(可能存在消息重複消費的隐患,需要去重)
- 如果消費者接收到消息卻沒有确認消息,連接配接也未斷開,則 RabbitMQ 認為該消費者繁忙,将不會給該消費者分發更多的消息。
如何保證RabbitMQ消息的可靠傳輸?
消息不可靠的情況可能是消息丢失,劫持等原因;
丢失又分為:生産者丢失消息、消息清單丢失消息、消費者丢失消息;
生産者丢失消息:從生産者弄丢資料這個角度來看,RabbitMQ提供 transaction 和 confirm 模式來確定生産者不丢消息;
transaction 機制就是說:發送消息前,開啟事務(channel.txSelect()),然後發送消息,如果發送過程中出現什麼異常,事務就會復原(channel.txRollback()),如果發送成功則送出事務(channel.txCommit())。然而,這種方式有個缺點:吞吐量下降;
confirm 模式用的居多:一旦 channel 進入 confirm 模式,所有在該信道上釋出的消息都将會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有比對的隊列之後;
RabbitMQ 就會發送一個 ACK 給生産者(包含消息的唯一ID),這就使得生産者知道消息已經正确到達目的隊列了;
如果 RabbitMQ 沒能處理該消息,則會發送一個 Nack 消息給你,你可以進行重試操作。
消息隊列丢資料:消息持久化。
處理消息隊列丢資料的情況,一般是開啟持久化磁盤的配置。
這個持久化配置可以和 confirm 機制配合使用,你可以在消息持久化磁盤後,再給生産者發送一個 Ack 信号。
這樣,如果消息持久化磁盤之前,RabbitMQ 陣亡了,那麼生産者收不到 Ack 信号,生産者會自動重發。
那麼如何持久化呢?上文已經提到。也就是下面三步
- 将 queue 的持久化辨別 durable 設定為 true ,則代表是一個持久的隊列
- 将 exchange 的持久化辨別 durable 設定為 true ,則代表是一個持久的交換器
- 發送消息的時候将 deliveryMode = 2
這樣設定以後,即使 RabbitMQ 挂了,重新開機後也能恢複資料
消費者丢失消息:消費者丢資料一般是因為采用了自動确認消息模式,改為手動确認消息即可!
消費者在收到消息之後,處理消息之前,會自動回複 RabbitMQ 已收到消息;
如果這時處理消息失敗,就會丢失該消息;
解決方案:處理消息成功後,手動回複确認消息。
參考文章
- 參考書籍《RabbitMQ 實戰》
- 消息中間件MQ與RabbitMQ面試題(2020最新版)