天天看點

rabbitmqAMQP協定基本概念交換機分類死信流控可靠性投遞RabbitMQ叢集消息分發機制

AMQP協定

Advanced Message Queuing Protocol,應用層進階消息隊列協定,基于此協定的用戶端與消息中間件可傳遞消息,不受用戶端/中間件同産品、不同的開發語言等條件的限制。

RabbitMQ遵循了AMQP協定。

基本概念

rabbitmqAMQP協定基本概念交換機分類死信流控可靠性投遞RabbitMQ叢集消息分發機制
Broker RabbitMQ伺服器

Exchange 交換機,消息首先發送到指定exchange,然後按照routing key 路由到指定Queue

Queue 消息的載體,每條消息都會被投送到一個或多個隊列中

Binding 綁定關系,将Exchange和Queue按照指定路由規則綁定起來

Routing 路由關鍵字,Exchange根據Routing Key進行消息投遞

Vhost 一個Broker可以包含多個vhost,一個vhost包含一組Exchange、Queue和Binding等

Producer 生産者

Consumer 消費者

Connection Producer和Consumer與Broker之間的TCP長連接配接

Channel 信道,每個連接配接裡可以建立多個Channel,每個Channel代表一個會話任務           

交換機分類

Direct Exchange

直連交換機,與隊列綁定時需要指定一個明确的binding key。發送消息到該交換機時,隻有routing key跟binding key完全比對,綁定的隊列才能收到消息。

例如binding key為‘faith.account’,那麼routing key 需也為‘faith.account’才能路由到指定隊列。

Topic Exchange

主題交換機,與隊列綁定時,可使用通配符定義routing key。

*代表比對一個單詞。#代表比對零個或者多個單詞。單詞與單詞之間用 . 隔開。

發送消息時,routing key符合binding key的模式時,綁定的隊列才能收到消息。

例如:binding key 為‘#.faith.#’,那麼routing key為‘faith.message’路由到該隊列。

Fanout Exchange

廣播交換機,與隊列綁定時不需要指定binding key。

當消息發送到該類型的交換機時,所有與之綁定的隊列都能收到消息。

死信

當消息:

消費者拒絕消息并且消息沒有重新入隊;

消息過期,通過設定消息的ttl(time to live)屬性可以實作;

隊列達到最大長度,此時最先入隊的消息會被發送到DLX           

時會進入DLX(Dead Letter Exchange)。

通過設定死信隊列(Dead Letter Queue)與DLX綁定,可以接收死信,并通過監聽該DLQ消費消息。

流控

參數設定

rabbitmq.config 檔案中配置預設0.4的記憶體門檻值,當rabbitmq占用記憶體超過40% 時會抛出記憶體警告并阻塞所有連接配接。

[{rabbit, [{vm_memory_high_watermark, 0.4}]}].           

且預設剩餘磁盤空間在 1GB 以下,也會主動阻塞所有的生産者。

消費端限流

設定prefetch值,例如1,表示當該消費者消費的消息有1條未被确認時,不進行新的消費。

prefetch沒有預設值。如果沒有設定,隊列預設會把所有消息都發給消費者,在消費者沒有ACK的情況下,發了多少就會産生多少Unacked。

如果prefetch是1,那麼隻要一條消息沒有收到消費者的ACK,後續的消息都不會發送到這個消費者,造成消息堵塞。

可靠性投遞

rabbitmqAMQP協定基本概念交換機分類死信流控可靠性投遞RabbitMQ叢集消息分發機制

生産者到broker

這個階段主要解決消息投遞的可靠性,一般兩種解決方案:Transaction(事務)模式和Confirm(确認)模式。

事務模式影響性能,一般使用confirm模式,消息正常到達exchange後會傳回給生産者資訊,以spring為例,開啟需要如下配置:

<rabbit:connection-factory publisher-confirms="true" publisher-returns="true" />

<rabbit:template mandatory="true" />           

publisher-confirms="true"

publisher-returns="true"

,表示開啟消息确認模式以及消息傳回模式。

其次

mandatory="true"

,表示如果exchange根據自身類型和消息routingKey無法路由到指定queue時,broker會調用basic.return方法将消息返還給生産者,當mandatory為false時,出現上述情況broker會直接将消息丢棄。

如果消息投遞失敗,則啟用重複投遞方式,例如投遞5次,5次失敗之後告警并存入DB中。

這裡還可以使用try-catch,如代碼執行失敗,一樣重試或存入DB。

需要注意的是事務機制和确認機制是互斥的。如果企圖将已開啟事務模式的信道再設定為publisher confirm模式,或者企圖将已開啟publisher confirm模式的信道設定為事務模式的話,RabbitMQ會報錯:

cannot switch from tx to confirm mode           

例如rabbitTemplate還設定了:

channel-transacted="true"

會與确認機制配置發生沖突。

消息存儲可靠性

當機、重新開機、關閉等情況可能導緻消息丢失。解決方案:

隊列持久化

交換機持久化

消息持久化

叢集,鏡像隊列           

隊列和交換機的持久化均可在對象聲明時指定,消息的持久化可以在發送時指定。

消息消費時的可靠性

這一階段可采用多種方式綜合運用。

消息确認機制

使用消息确認機制(message acknowledgement),消費者訂閱隊列時可指定autoAck參數,當autoAck為false時,RabbitMQ會等待消費者顯式地回複确認信号後才從隊列中移去消息。

如果消息消費失敗,也可以調用Basic.Reject或者Basic.Nack來拒絕目前消息而不是确認。如果requeue參數為true,該消息可重新入隊,以便重發。

如果不确認,會導緻prefetch數量+1,如果prefetch為1,則導緻該消費者阻塞,不再收到broker推送的消息。

消費者回調

消費者處理消息以後,可以再發送一條消息給生産者,或者調用生産者的API,告知消息處理完畢。

例如支付中異步通信的回執,多次互動。如支付寶支付後會回調支付發起應用的回調函數,并有失敗重發機制。

補償機制

對于一定時間沒有得到響應的消息,可以設定一個定時重發的機制,但要控制次數,比如最多重發3次,否則會造成消息堆積。

例如支付寶回調失敗重發就是補償機制的應用。

RabbitMQ叢集

叢集主要用于實作高可用與負載均衡。

RabbitMQ通過/var/lib/rabbitmq/.erlang.cookie來驗證身份,需要在所有節點上保持一緻。

叢集有兩種節點類型,一種是磁盤節點,一種是記憶體節點。叢集中至少需要一個磁盤節點以實作中繼資料的持久化,未指定類型的情況下,預設為磁盤節點。

叢集模式有兩種,一種是普通模式,普通模式中的queue内的消息實體隻存在于

其中一個節點,其餘節點中僅有相同的隊列的結構。例如叢集節點為A和B,消息實體在A中。當consumer從B節點消費時,RabbitMQ會臨時在A、B間進行消息傳輸,把A中的消息取出并經過B發送給consumer。是以consumer應盡量連接配接每一個節點,防止消息都從一個出口出來。

當A故障後,B節點無法取到A節點中的消息實體。如果做了消息持久化,那麼A節點恢複後才能被消費;否則消息丢失。

叢集另一種模式是鏡像模式,該模式和普通模式不同之處在于,消息實體會主動在鏡像節點間同步,而不是在用戶端取資料時臨時拉取。但該模式副作用也很明顯,除了降低系統性能外,如果鏡像隊列數量過多,加之大量的消息進入,叢集内部的

網絡帶寬将會被這種同步通訊大大消耗,是以隻在可靠性要求較高的場合中适用。

RabbitMQ鏡像隊列

叢集方式下,隊列和消息是無法在節點之間同步,是以需要使用RabbitMQ的鏡像隊列機制進行同步。

鏡像隊列機制能将queue鏡像到cluster中其他的節點之上。如果叢集中的一個節點失效了,queue能自動切換到鏡像中的另一個節點以保證服務的可用性。

每個鏡像隊列都包含一個master和多個slave,分别對應于不同的節點。slave會按照master執行指令的順序進行指令執行,故slave與master上維護的狀态是相同的。除了publish外所有動作都隻會向master發送,然後由master将指令執行的結果廣播給slave們,故看似從鏡像隊列中的消費操作實際上是在master上執行的。

RabbitMQ的鏡像隊列支援publisher confirm和事務機制。事務機制中,隻有目前事務在全部鏡像queue中執行之後,用戶端才會收到Tx.CommitOk的消息。同樣的,在publisher confirm機制中,向publisher進行目前message确認的前提是該message被全部鏡像所接受了。

鏡像隊列配置通過添加policy完成:

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

-p Vhost: 可選參數,針對指定vhost下的queue進行設定
Name: policy的名稱
Pattern: queue的比對模式(正規表達式)
Definition:鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-mode
    ha-mode:指明鏡像隊列的模式,有效值為 all/exactly/nodes
        all:表示在叢集中所有的節點上進行鏡像
        exactly:表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定
        nodes:表示在指定的節點上進行鏡像,節點名稱通過ha-params指定
    ha-params:ha-mode模式需要用到的參數
    ha-sync-mode:進行隊列中消息的同步方式,有效值為automatic和manual
priority:可選參數,policy的優先級           

示例:

rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'           

或通過web管理界面添加:

rabbitmqAMQP協定基本概念交換機分類死信流控可靠性投遞RabbitMQ叢集消息分發機制

消息分發機制

Round-Robin(輪詢)

預設的政策,消費者輪流、平均地收到消息。           

Fair dispatch (公平分發)

根據消費者的處理能力來分發消息,可以用basicQos(int prefetch_count)來設定。

prefetch_count:當消費者有多少條消息沒有響應ACK時,不再給這個消費者發送消息。