AMQP協定
Advanced Message Queuing Protocol,應用層進階消息隊列協定,基于此協定的用戶端與消息中間件可傳遞消息,不受用戶端/中間件同産品、不同的開發語言等條件的限制。
RabbitMQ遵循了AMQP協定。
基本概念

Broker RabbitMQ伺服器
Exchange 交換機,消息首先發送到指定exchange,然後按照routing key 路由到指定Queue
Queue 消息的載體,每條消息都會被投送到一個或多個隊列中
Binding 綁定關系,将Exchange和Queue按照指定路由規則綁定起來
Routing 路由關鍵字,Exchange根據Routing Key進行消息投遞
Vhost 一個Broker可以包含多個vhost,一個vhost包含一組Exchange、Queue和Binding等
Producer 生産者
Consumer 消費者
Connection Producer和Consumer與Broker之間的TCP長連接配接
Channel 信道,每個連接配接裡可以建立多個Channel,每個Channel代表一個會話任務
交換機分類
Direct Exchange
直連交換機,與隊列綁定時需要指定一個明确的binding key。發送消息到該交換機時,隻有routing key跟binding key完全比對,綁定的隊列才能收到消息。
例如binding key為‘faith.account’,那麼routing key 需也為‘faith.account’才能路由到指定隊列。
Topic Exchange
主題交換機,與隊列綁定時,可使用通配符定義routing key。
*代表比對一個單詞。#代表比對零個或者多個單詞。單詞與單詞之間用 . 隔開。
發送消息時,routing key符合binding key的模式時,綁定的隊列才能收到消息。
例如:binding key 為‘#.faith.#’,那麼routing key為‘faith.message’路由到該隊列。
Fanout Exchange
廣播交換機,與隊列綁定時不需要指定binding key。
當消息發送到該類型的交換機時,所有與之綁定的隊列都能收到消息。
死信
當消息:
消費者拒絕消息并且消息沒有重新入隊;
消息過期,通過設定消息的ttl(time to live)屬性可以實作;
隊列達到最大長度,此時最先入隊的消息會被發送到DLX
時會進入DLX(Dead Letter Exchange)。
通過設定死信隊列(Dead Letter Queue)與DLX綁定,可以接收死信,并通過監聽該DLQ消費消息。
流控
參數設定
rabbitmq.config 檔案中配置預設0.4的記憶體門檻值,當rabbitmq占用記憶體超過40% 時會抛出記憶體警告并阻塞所有連接配接。
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
且預設剩餘磁盤空間在 1GB 以下,也會主動阻塞所有的生産者。
消費端限流
設定prefetch值,例如1,表示當該消費者消費的消息有1條未被确認時,不進行新的消費。
prefetch沒有預設值。如果沒有設定,隊列預設會把所有消息都發給消費者,在消費者沒有ACK的情況下,發了多少就會産生多少Unacked。
如果prefetch是1,那麼隻要一條消息沒有收到消費者的ACK,後續的消息都不會發送到這個消費者,造成消息堵塞。
可靠性投遞
生産者到broker
這個階段主要解決消息投遞的可靠性,一般兩種解決方案:Transaction(事務)模式和Confirm(确認)模式。
事務模式影響性能,一般使用confirm模式,消息正常到達exchange後會傳回給生産者資訊,以spring為例,開啟需要如下配置:
<rabbit:connection-factory publisher-confirms="true" publisher-returns="true" />
<rabbit:template mandatory="true" />
publisher-confirms="true"
和
publisher-returns="true"
,表示開啟消息确認模式以及消息傳回模式。
其次
mandatory="true"
,表示如果exchange根據自身類型和消息routingKey無法路由到指定queue時,broker會調用basic.return方法将消息返還給生産者,當mandatory為false時,出現上述情況broker會直接将消息丢棄。
如果消息投遞失敗,則啟用重複投遞方式,例如投遞5次,5次失敗之後告警并存入DB中。
這裡還可以使用try-catch,如代碼執行失敗,一樣重試或存入DB。
需要注意的是事務機制和确認機制是互斥的。如果企圖将已開啟事務模式的信道再設定為publisher confirm模式,或者企圖将已開啟publisher confirm模式的信道設定為事務模式的話,RabbitMQ會報錯:
cannot switch from tx to confirm mode
例如rabbitTemplate還設定了:
channel-transacted="true"
會與确認機制配置發生沖突。
消息存儲可靠性
當機、重新開機、關閉等情況可能導緻消息丢失。解決方案:
隊列持久化
交換機持久化
消息持久化
叢集,鏡像隊列
隊列和交換機的持久化均可在對象聲明時指定,消息的持久化可以在發送時指定。
消息消費時的可靠性
這一階段可采用多種方式綜合運用。
消息确認機制
使用消息确認機制(message acknowledgement),消費者訂閱隊列時可指定autoAck參數,當autoAck為false時,RabbitMQ會等待消費者顯式地回複确認信号後才從隊列中移去消息。
如果消息消費失敗,也可以調用Basic.Reject或者Basic.Nack來拒絕目前消息而不是确認。如果requeue參數為true,該消息可重新入隊,以便重發。
如果不确認,會導緻prefetch數量+1,如果prefetch為1,則導緻該消費者阻塞,不再收到broker推送的消息。
消費者回調
消費者處理消息以後,可以再發送一條消息給生産者,或者調用生産者的API,告知消息處理完畢。
例如支付中異步通信的回執,多次互動。如支付寶支付後會回調支付發起應用的回調函數,并有失敗重發機制。
補償機制
對于一定時間沒有得到響應的消息,可以設定一個定時重發的機制,但要控制次數,比如最多重發3次,否則會造成消息堆積。
例如支付寶回調失敗重發就是補償機制的應用。
RabbitMQ叢集
叢集主要用于實作高可用與負載均衡。
RabbitMQ通過/var/lib/rabbitmq/.erlang.cookie來驗證身份,需要在所有節點上保持一緻。
叢集有兩種節點類型,一種是磁盤節點,一種是記憶體節點。叢集中至少需要一個磁盤節點以實作中繼資料的持久化,未指定類型的情況下,預設為磁盤節點。
叢集模式有兩種,一種是普通模式,普通模式中的queue内的消息實體隻存在于
其中一個節點,其餘節點中僅有相同的隊列的結構。例如叢集節點為A和B,消息實體在A中。當consumer從B節點消費時,RabbitMQ會臨時在A、B間進行消息傳輸,把A中的消息取出并經過B發送給consumer。是以consumer應盡量連接配接每一個節點,防止消息都從一個出口出來。
當A故障後,B節點無法取到A節點中的消息實體。如果做了消息持久化,那麼A節點恢複後才能被消費;否則消息丢失。
叢集另一種模式是鏡像模式,該模式和普通模式不同之處在于,消息實體會主動在鏡像節點間同步,而不是在用戶端取資料時臨時拉取。但該模式副作用也很明顯,除了降低系統性能外,如果鏡像隊列數量過多,加之大量的消息進入,叢集内部的
網絡帶寬将會被這種同步通訊大大消耗,是以隻在可靠性要求較高的場合中适用。
RabbitMQ鏡像隊列
叢集方式下,隊列和消息是無法在節點之間同步,是以需要使用RabbitMQ的鏡像隊列機制進行同步。
鏡像隊列機制能将queue鏡像到cluster中其他的節點之上。如果叢集中的一個節點失效了,queue能自動切換到鏡像中的另一個節點以保證服務的可用性。
每個鏡像隊列都包含一個master和多個slave,分别對應于不同的節點。slave會按照master執行指令的順序進行指令執行,故slave與master上維護的狀态是相同的。除了publish外所有動作都隻會向master發送,然後由master将指令執行的結果廣播給slave們,故看似從鏡像隊列中的消費操作實際上是在master上執行的。
RabbitMQ的鏡像隊列支援publisher confirm和事務機制。事務機制中,隻有目前事務在全部鏡像queue中執行之後,用戶端才會收到Tx.CommitOk的消息。同樣的,在publisher confirm機制中,向publisher進行目前message确認的前提是該message被全部鏡像所接受了。
鏡像隊列配置通過添加policy完成:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可選參數,針對指定vhost下的queue進行設定
Name: policy的名稱
Pattern: queue的比對模式(正規表達式)
Definition:鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明鏡像隊列的模式,有效值為 all/exactly/nodes
all:表示在叢集中所有的節點上進行鏡像
exactly:表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定
nodes:表示在指定的節點上進行鏡像,節點名稱通過ha-params指定
ha-params:ha-mode模式需要用到的參數
ha-sync-mode:進行隊列中消息的同步方式,有效值為automatic和manual
priority:可選參數,policy的優先級
示例:
rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
或通過web管理界面添加:
消息分發機制
Round-Robin(輪詢)
預設的政策,消費者輪流、平均地收到消息。
Fair dispatch (公平分發)
根據消費者的處理能力來分發消息,可以用basicQos(int prefetch_count)來設定。
prefetch_count:當消費者有多少條消息沒有響應ACK時,不再給這個消費者發送消息。