概述
本示例程式全部來自rabbitmq官方示例程式,rabbitmq-demo;
官方共有6個demo,針對不同的語言(如 C#,Java,Spring-AMQP等),都有不同的示例程式;
本示例程式主要是Spring-AMQP的參考示例,如果需要其他語言的參考示例,可以參考官網;
rabbitmq模拟器
模拟器
rabbitmq簡介
核心架構圖
AMQP 0-9-1 Model Explained
重要文法說明
- producer或publisher: 消息生産者/釋出者,即:産生消息的;
- Exchange:producer或publisher隻會将message發送到Exchange,目前有4種不同的Exchange類型;
- Queue:消息隊列,所有的消費者都是直接從Queue擷取Message并消費;
- Binging:連接配接Exchange和Queue的紐帶,決定Exchange如何路由消息到不同的Queue;
- routingKey:生産者-->message-->Exchange,需要指定一個key,叫做routingKey;
- routingKey:Exchange-->Binging-->Queue,Binging有一個Key值,叫routingKey或bingingKey;
- bingingKey:Exchange-->Binging-->Queue,Binging有一個Key值,bingingKey;
核心了解
4種不同的Exchange,對routingKey的解釋都不相同;
對routingKey的不同解釋,決定了Exchange路由Message到Queue的不同方案;
- direct exchange: 比對2個routingKey(即routingKey和bingingKey)是否相等,相等時才進行消息路由;
- fanout exchange: 忽略routingKey,會将Message路由到所有綁定的Queue;
- topic exchange: routingKey格式形如
、aaa.bbb.xxx
,類似正規表達式比對;*.ccc.dd.#
- headers exchange:
jar包說明
-
Java版本:
Java版本使用如下jar(說明:若是使用):
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
-
Spring-AMQP版本:
Spring AMQP 官方詳細文章
使用Profile配置各個demo的運作選擇,當
使用如下Jar包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
demo1: 單生産者-單消費者
-
Java版本:單生産者單消費者
程式位置:java.demo1包下面
-
spring-amqp版本:單生産者單消費者
application.properties配置
spring.profiles.active=hello-world, sender, receiver
demo2: 單生産者-多消費者
Work queues官方示例
application.properties配置
spring.profiles.active=work-queues, sender, receiver
#spring.profiles.active=work-queues, sender
#spring.profiles.active=work-queues, receiver
較長的描述參見:單生産者-多消費者詳細
demo3: 釋出/訂閱
Publish/Subscribe官方示例
- 消費廣播到多個消費者進行消費;
- 使用fanout pattern;
application.properties配置
spring.profiles.active=pub-sub, receiver , sender
較長的描述參見:釋出/訂閱詳細
demo4: Routing
Routing官方示例
Direct exchange 模式進行route結構圖
a message goes to the queues whose
binding key
exactly matches the
routing key
of the message;(相等時才路由)
Multiple bindings
兩個Queue使用相同的BingingKey(black) ==> 效果類似于:釋出/訂閱模式(demo3);
完整的結構圖
application.properties配置
pring.profiles.active=routing, receiver , sender
較長的描述參見:釋出/訂閱詳細
demo5: Topics
Topics官方示例
- 使用 Topic exchange實作;
- 發送到Topic exchange的routingKey必須滿足一定要求:用"."分割的words清單,如:
;*.aaa.bbb.#
- BingingKey和routingKey有相同的格式要求;
-
: 可以比對一個word;*
-
: 可以比對0個或多個words;#
application.properties配置
pring.profiles.active=topics, receiver , sender
較長的描述參見:Topics
demo6: RPC over RabbitMQ
RPC官方示例
結構圖
application.properties配置
spring.profiles.active=rpc,server
#spring.profiles.active=rpc,client
較長的描述參見:RPC
消費端确認
Delivery Identifiers: Delivery Tags
消費者注冊後,rabbitmq将消息傳遞給消費者時,都會帶有一個“Delivery Tags”,這個是唯一的ID辨別,id以整數的遞增的方式實作。
Acknowledgement Modes(消費端)
自動确認模式
- 發送之後,就認為是發送成功(fire-and-forget)
- 消息不停的發送到消費端消費,無需等待消費端任何确認;
缺點:
- 可能造成消費端不堪重負;
手動模式
- basic.ack: 肯定的确認;
- basic.nack: 否定的确認(RabbitMQ對AMQP 0-9-1的擴充),支援消息
;批量确認
- basic.reject:否定的确認,消息消費失敗後,直接從broker中将消息
,delete
;不支援批量确認
Acknowledging Multiple Deliveries at Once(消息批量确認)
- 一次确認多個消息發送,而不是每一個消息單獨确認;
- basic.reject:不具備該功能;
- basic.nack: 具備該功能;
實作方式
- multiple field: 設定為true;
示例
假設:在Channel(ch)上有5,6,7,8這4個delivery tags未确認;
- 情況1,
: 則5,6,7,8這4個tags都将被确認;delivery_tag=8 & multiple=true
- 情況2,
:則隻有8被确認,而5,6,7将不會被被确認;delivery_tag=8 & multiple=false
Channel Prefetch Count (QoS)[可以設定消費端消費的速率]
- 消息消費是
完成的,手動确認也是異步
的;異步
- 有一部分消息是被消費了,但是還未來得及确認:
;希望控制未被确認消息的size,防止無界的緩存
-
:使用prefetch count
方法設定該值可以控制未被确認消息的max size;basic.qos
- 當達到該最大值時,rabbitmq将停止傳遞消息進行消費;
- 僅對
方法有效,對basic.qos
方法無效;basic.get
示例
假設:在Channel(Ch)上有5,6,7,8共4個
未被确認
的消息,且ch的
prefetch count=4
;
結果:rabbitmq将不會再傳遞任何消息到該Channel上,除非有消息被确認;
消費确認選擇,prefetch設定以及吞吐量
- 情況1:增大
:提高向消費者傳遞消息的速度;prefetch
- 情況2:自動确認模式可以産生最佳的傳送速率;
應避免:
-
;自動确認模式
-
+手動确認模式
;無限制的prefetch
結論:
-
和情況1
都可能導緻情況2
的Message增加,增大RAM的消耗;傳遞但未來得及處理
推薦值:
-
: 100~300,可以有效提高吞吐量,并避免RAM消耗過多的風險;prefetch
消費失敗或連接配接中斷: 自動重新reQueue
當消息發送給消費端後,如果出現如下情況,則消息會
重新reQueue
,會被再次發送;
- TCP連接配接中斷;
- 消費端挂掉:無法進行消息确認;
Client Errors: Double Acking and Unknown Tags
消費端無法對同一個消息确認超過一次,當超過一次之後,将抛出Channel error:
PRECONDITION_FAILED - unknown delivery tag XXXX
總結
- 每個傳遞給消費端的消息,都有一個唯一的辨別
;delivery tag
- 自動消息确認;
- 手動消息确認:
和每個消息單獨确認
;批量消息确認
-
:可以控制消息端的吞吐量,避免消費端消費過慢,産生RAM大量消耗;prefetchCount
- 失敗重傳:
或TCP連接配接中斷
,都會引起消息重新入隊列,重新消費(手動消息确認時);消費端挂掉
- 無法對同一個消息進行2次或2次以上的
,否則會抛出異常;确認
發送端确認
Channel事務
- 不推薦使用: 會嚴重降低吞吐量;
在 AMQP 0-9-1中,保證消息不丢失的唯一方法,就是使用事務;
- 開啟Channel事務;
- 發送消息,送出事務;
類似消費端的應答确認機制
-
: 應用于Channel時,表示使用confirm.select
;确認模式
-
和事務
無法共存:二者隻能選擇其一;确認模式
确認模式 (confirm.select)
- 發送端使用
;confirm.select
-
發送broker
來确認Message已被處理;basic.ack
-
: 消息序列,具有唯一性;delivery-tag
-
: 用于設定multiple=true
;批量消息确認
- 無法保證消息何時被确認;
- 确認模式:消息要麼被
,要麼被confirmed(OK)
,且only once;nack(fail)
Java示例:(發送端發送大量messages,使用确認模式)
程式-确認模式
否定确認
異常情況時,服務端無法處理消息,則
broker
發送
basic.nack
來進行
否定确認
;
應答延時和持久化消息
- 僅當消息被持久化到disk之後,才會發送
應答;basic.ack
- 吞吐量提高建議:
、異步處理應答
;批量發送消息
應答順序
發送确認 + 保證傳遞
- 消息持久化: 并不能保證消息不丢失(在寫入disk前broker就挂掉);