天天看點

RabbitMQ消息追蹤之Firehose

在使用任何消息中間件的過程中,難免會出現某條消息異常丢失的情況。對于RabbitMQ而言,可能是因為生産者或消費者與RabbitMQ斷開了連接配接,而它們與RabbitMQ又采用了不同的确認機制;也有可能是因為交換器與隊列之間不同的轉發政策;甚至是交換器并沒有與任何隊列進行綁定,生産者又不感覺或者沒有采取相應的措施;另外RabbitMQ本身的叢集政策也可能導緻消息的丢失。這個時候就需要有一個較好的機制跟蹤記錄消息的投遞過程,以此協助開發和運維人員進行問題的定位。

在RabbitMQ中可以使用Firehose功能來實作消息追蹤,Firehose可以記錄每一次發送或者消費消息的記錄,友善使用RabbitMQ的使用者進行調試、排錯等。

Firehose的機制是将生産者投遞給RabbitMQ的消息,或者是RabbitMQ投遞給消費者的消息按照指定的格式發送到預設的交換器上。這個預設的交換器的名稱為amq.rabbitmq.trace,它是一個topic類型的交換器。發送到這個交換器上的消息的routingKey為publish.exchangename和deliver.queuename。其中exchangename和queuename為實際的交換器和隊列的名稱,分别對應生産者投遞到交換器的消息和消費者從隊列中擷取的消息。

開啟Firehose指令:

其中[-p vhost]是可選參數,用來指定vhost。

對應的關閉指令為:

注意Firehose預設情況處于關閉狀态,并且Firehose的狀态也是非持久化的,會在RabbitMQ服務重新開機的時候還原成預設的狀态。Firehose開啟之後多少會影響服務的性能,因為它會引起額外的消息生成、路由和存儲。

下面我們舉例說明下Firehose的用法。需要做一下準備工作,確定Firehose處于開啟狀态,建立7個隊列:queue、queue.another、queue1、queue2、queue3、queue4和queue5。之後再建立2個交換器exchange和exchange.another,分别通過綁定鍵rk和rk.another與queue和queue.another進行綁定。最後将amq.rabbitmq.trace這個關鍵的交換器與queue1、queue2、queue3、queue4和queue5綁定,詳細可以參考下圖。

RabbitMQ消息追蹤之Firehose

分别用用戶端向exchange和exchange.another中發送一條消息“trace test payload.”,然後再用用戶端消費隊列queue和queue.another中的消息。

此時queue1中有2條消息,queue2中有2條消息,queue3中有4條消息,而queue4和queue5中隻有一條消息。在向exchange發送一條消息後,amq.rabbitmq.trace分别向queue1、queue3和queue4發送一條内部封裝的消息。同樣,在想exchange.another中發送一條消息之後,對應的隊列queue1和queue3中會多一條消息。消費隊列queue的時候,queue2、queue3和queue5中會多一條消息,消費隊列queue.another的時候,queue2和queue3會多一條消息。“publish.#”比對發送到所有交換器的消息,“deliver.#”比對消費所有隊列的消息,而“#”則包含了“publish.#”和“deliver.#”。

在Firehose開啟狀态下,當有用戶端發送或者消費消息時,Firehose會自動封裝相應的消息體,并添加詳細的headers屬性。對于前面的将“trace test payload.”這條消息發送到交換器exchange來說,Firehore會将其封裝成如下的内容:

RabbitMQ消息追蹤之Firehose

在消費queue時,會将這條消息封裝成如下的内容:

RabbitMQ消息追蹤之Firehose

headers中的exchange_name表示發送此條消息的交換器;routing_keys表示與exchange_name對應的路由鍵清單;properties表示消息本身的屬性,比如delivery_mode=2表示消息需要持久化處理。

<a href="http://www.rabbitmq.com/firehose.html">http://www.rabbitmq.com/firehose.html</a>