天天看點

RabbitMQ消息發送與接收

  所有mq産品從模型抽象上來說都是一樣的過程。消費者訂閱某個隊列。生産者建立消息,然後釋出到隊列,最後将消息發送到監聽的消費者。

   amqp(advanced message queuing protocol)是一個提供統一消息服務的應用層标準協定,基于此協定的用戶端與消息中間件可傳遞消息,并不受用戶端、中間件等不同産品,不同開發語言等條件的限制。

  activemq是基于jms(java message service)協定的消息中間件。差別如下:

RabbitMQ消息發送與接收

 rabbit模型如下:

RabbitMQ消息發送與接收

 1.message。消息,是不具體的。由消息頭和消息體組成。消息體是不透明的,而消息頭是一系列可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(優先級)、delivery-mode(是否持久存儲)等

2.publisher。消息的生産者,也是一個向交換機釋出消息的用戶端應用程式。

3.exchanger。交換機,用來接收生産者釋出的消息并将這些消息路由給伺服器中的隊列。

4.binging。綁定,用于消息隊列和交換器之間的管理。一個綁定就是基于路由鍵将交換器和消息隊列連接配接起來的路由規則。是以可以将交換器了解成一個由綁定構成的路由表。

5.queue。消息隊列,用來儲存消息知道發送給消費者。一個消息可投入一個或對個隊列。

6.connection。網絡連接配接,比如一個tcp連接配接。

7.channel。信道,多路複用連接配接中的一條獨立的雙向資料流通道,可讀可寫。一個connection包括多個channel。因為對于作業系統來說建立和銷毀tcp是非常昂貴的開銷,是以引入信道的概念,以複用一條tcp連接配接。

8.consumer。消費者,從消息隊列取得消息的用戶端應用程式。

9.virtualhost。虛拟主機。表示一批交換機、消息隊列和相關對象。vhost本質上是一個mini版的rabbitmq伺服器,擁有自己的隊列、綁定、交換器和權限控制;vhost通過在各個執行個體間提供邏輯上分離,允許你為不同應用程式安全保密地運作資料;vhost是amqp概念的基礎,必須在連接配接時進行指定,rabbitmq包含了預設vhost:“/”。

10.borker。表示消息隊列伺服器實體。表示啟動一個rabbitmq所包含的程序。

不涉及交換機的模型如下:

RabbitMQ消息發送與接收

pom檔案引入如下依賴:

1.消息生産者 

  注意:5672是rabbitmq暴露的端口,15672是management插件的端口。

發送成功之後可以從15672端口檢視,也可以從15672進行消費,如下:

RabbitMQ消息發送與接收

 2.消息接收

  注意:消息的确認模式可以為自動也可以為手動,自動确認讀取完會自動從隊列删除;手動需要自己ack,如果設為手動也沒ack可能會造成消息重複消費。

  如果是多個消費者,會從隊列以輪詢的方式處理消息,這種稱為工作隊列模式。

補充:這種實際也是用了rabbitmq的一個預設交換機,routing_key為隊列名稱。也可以了解為是rabbitmq類型為system的交換機。

RabbitMQ消息發送與接收

測試:修改消費者代碼

結果:(可以看出是有路由key的,值為隊列名稱)

  exchange類型根據分發政策分為四種。direct、fanout、topic、headers。headers比對amqp消息的header而不是路由鍵,此外headers交換機和direct交換機完全一緻,目前幾乎不用。exchange隻負責轉發消息,不具備存儲消息的能力,是以如果沒有任何隊列與exchange綁定,或者沒有符合路由規則的隊列,消息會丢失。是以隻能收到監聽之後生産者發送的消息。

抽取工具類:

  精準綁定,消息中的路由鍵(routingkey)和binding的bindingkey一緻。

生産者:

消費者一:

消費者二:

結果:

(1)消費者一

RabbitMQ消息發送與接收

(2)消費者二

RabbitMQ消息發送與接收

  每個發到fanout類型交換器的消息會被分發到所有的隊列中。fanout不處理路由鍵,發消息最快。

兩個消費者:

消費者1:

啟動兩個生産者,後啟動消費者後消費消息。

3.topic類型

  處理routingkey和bindingkey,支援通配符。# 比對0或多個單詞,* 比對單個單詞。 topic主題模式可以實作 publish/subscribe釋出訂閱模式 和 routing路由模式 的雙重功能

啟動兩個消費者後啟動生産者,最終入下:

RabbitMQ消息發送與接收

 (2)消費者二

RabbitMQ消息發送與接收

 總結:

1、簡單模式

一個生産者、一個消費者,不需要設定交換機(使用預設的交換機,一個direct類型的交換機,routing_key為queue名稱)

2、工作隊列模式 work queue

一個生産者、多個消費者(競争關系),不需要設定交換機(使用預設的交換機,一個direct類型的交換機,routing_key為queue名稱)

3、釋出訂閱模式 publish/subscribe

需要設定類型為fanout的交換機,并且交換機和隊列進行綁定,當發送消息到交換機後,交換機會将消息發送到綁定的隊列。多點傳播模式,不進行routingkey的判斷。

4、路由模式 routing

需要設定類型為direct的交換機,交換機和隊列進行綁定,并且指定routing key,當發送消息到交換機後,交換機會根據routing key将消息發送到對應的隊列

5、通配符模式 topic

需要設定類型為topic的交換機,交換機和隊列進行綁定,并且指定通配符方式的routing key,當發送消息到交換機後,交換機會根據routing key将消息發送到對應的隊列

  補充一下,無論是fanout多點傳播模式還是direct路由模式還是topic通配符模式,exchanger收到消息是會發送到後面的queue列中。如果一個應用以多執行個體部署,多個執行個體監聽一個exchanger下面相同的隊列,不會造成一個消息被相同的應用多執行個體重複消費,因為queue本質是不可重複消費。

  開發中可以一個應用一個交換機,不同的消息類型放到不同的隊列中。如果涉及死信隊列,可以對每個應用再建立一個死信交換機,隊列名稱相同,便于處理死信消息。

補充:消息的屬性可以通過basicproperties進行設定

basicproperties源碼如下:

測試:

(1)生産者發送消息時生成一些屬性

(2)消息接收者

結果:

補充: rabbitmqheaders消息類型的交換機使用方法如下:

x-match 為all是比對所有的請求頭和值,必須所有相等才會發送;any是滿足任意一個即可。

【當你用心寫完每一篇部落格之後,你會發現它比你用代碼實作功能更有成就感!】