前言
消息隊列在現今資料量超大,并發量超高的系統中是十分常用的。本文将會對現時最常用到的幾款消息隊列架構 ActiveMQ、RabbitMQ、Kafka 進行分析對比。
詳細介紹 RabbitMQ 在 Spring 架構下的結構及實作原理,從Producer 端的事務、回調函數(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 資訊接收容器進行詳細的分析。通過對 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用類型介紹,深入剖析在消息處理各個傳輸環節中的原理及注意事項。
并舉以執行個體對死信隊列、持久化操作進行一一介紹。
目錄
一、RabbitMQ 與 AMQP 的關系
二、RabbitMQ 的實作原理
三、RabbitMQ 應用執行個體
四、Producer 端的消息發送與監控
五、Consumer 端的消息接收與監控
六、死信隊列
七、持久化操作
一、RabbitMQ 與 AMQP 的關系
1.1 AMQP簡介
AMQP(Advanced Message Queue Protocol 進階消息隊列協定)是一個消息隊列協定,它支援符合條件的用戶端和消息代理中間件(message middleware broker)進行通訊。RabbitMQ 則是 AMQP 協定的實作者,主要用于在分布式系統中資訊的存儲發送與接收,RabbitMQ 的伺服器端用 Erlang 語言編寫,用戶端支援多種開發語言:Python、.NET、Java、Ruby、C、PHP、ActionScript、XMPP、STOMP 等。
1.2 ActiveMQ、RabbitMQ、Kafka 對比
現在在市場上有 ActiveMQ、RabbitMQ、Kafka 等多個常用的消息隊列架構,與其他架構對比起來,RabbitMQ 在易用性、擴充性、高可用性、多協定、支援多語言用戶端等方面都有不俗表現。

添加描述
1.2.1 AcitveMQ 特點
ActiveMQ 是 Apache 以 Java 語言開發的消息模型,它完美地支援 JMS(Java Message Service)消息服務,用戶端支援 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多種開主發語言,支援OpenWire、Stomp、REST、XMPP、AMQP 等多種協定。ActiveMQ 采用異步消息傳遞方式,在設計上保證了多主機叢集,用戶端-伺服器,點對點等模式的有效通信。從開始它就是按照 JMS 1.1 和 J2EE 1.4 規範進行開發,實作了消息持久化,XA,事務支撐等功能。經曆多年的更新完善,現今已成為 Java 應用開發中主流的消息解決方案。但相比起 RabbitMQ、Kafka 它的主要缺點表現為資源消耗比較大,吞吐量較低,在高并發的情況下系統支撐能力較弱。如果系統全程使用 Java 開發,其并發量在可控範圍内,或系統需要支援多種不同的協定,使用 ActiveMQ 可更輕便地搭建起消息隊列服務。
1.2.2 Kafka 特點
Kafka 天生是面向分布式系統開發的消息隊列,它具有高性能、容災性、可動态擴容等特點。Kafka 與生俱來的特點在于它會把每個Partition 的資料都備份到不同的伺服器當中,并與 ZooKeeper 配合,當某個Broker 故障失效時,ZooKeeper 服務就會将通知生産者和消費者,從備份伺服器進行資料恢複。在性能上 Kafka 也大大超越了傳統的 ActiveMQ、RabbitMQ ,由于 Kafka 叢集可支援動态擴容,在負載量到達峰值時可動态增加新的伺服器進叢集而無需重新開機服務。但由于 Kafka 屬于分布式系統,是以它隻能在同一分區内實作消息有序,無法實作全局消息有序。而且它内部的監控機制不夠完善,需要安裝插件,依賴ZooKeeper 進行中繼資料管理。如果系統屬于分布式管理機制,資料量較大且并發量難以預估的情況下,建議使用 Kafka 隊列。
1.2.3 RabbitMQ 對比
由于 ActiveMQ 過于依賴 JMS 的規範而限制了它的發展,是以 RabbitMQ 在性能和吞吐量上明顯會優于 ActiveMQ。
由于上市時間較長,在可用性、穩定性、可靠性上 RabbitMq 會比 Kafka 技術成熟,而且 RabbitMq 使用 Erlang 開發,是以天生具備高并發高可用的特點。而 Kafka 屬于分布式系統,它的性能、吞吐量、TPS 都會比 RabbitMq 要強。
二、RabbitMQ 的實作原理
2.1 生産者(Producer)、消費者(Consumer)、服務中心(Broker)之間的關系
首先簡單介紹 RabbitMQ 的運作原理,在 RabbitMQ 使用時,系統會先安裝并啟動 Broker Server,也就是 RabbitMQ 的服務中心。無論是生産者 (Producer),消費者(Consumer)都會通過連接配接池(Connection)使用 TCP/IP 協定(預設)來與 BrokerServer 進行連接配接。然後 Producer 會把 Exchange / Queue 的綁定資訊發送到 Broker Server,Broker Server 根據 Exchange 的類型邏輯選擇對應 Queue ,最後把資訊發送到與 Queue 關聯的對應 Consumer 。
2.2 交換器(Exchange)、隊列(Queue)、信道(Channel)、綁定(Binding)的概念
2.2.1 交換器 Exchange
Producer 建立連接配接後,并非直接将消息投遞到隊列 Queue 中,而是把消息發送到交換器 Exchange,由 Exchange 根據不同邏輯把消息發送到一個或多個對應的隊列當中。目前 Exchange 提供了四種不同的常用類型:Fanout、Direct、Topic、Header。
·Fanout類型
此類型是最為常見的交換器,它會将消息轉發給所有與之綁定的隊列上。比如,有N個隊列與 Fanout 交換器綁定,當産生一條消息時,Exchange 會将該消息的N個副本分别發給每個隊列,類似于廣播機制。
·Direct類型
此類型的 Exchange 會把消息發送到 Routing_Key 完全相等的隊列當中。多個 Cousumer 可以使用相同的關鍵字進行綁定,類似于資料庫的一對多關系。比如,Producer 以 Direct 類型的 Exchange 推送 Routing_Key 為 direct.key1 的隊列,系統再指定多個 Cousumer 綁定 direct.key1。如此,消息就會被分發至多個不同的 Cousumer 當中。
·Topic類型
此類型是最靈活的一種方式配置方式,它可以使用模糊比對,根據 Routing_Key 綁定到包含該關鍵字的不同隊列中。比如,Producer 使用 Topic類型的 Exchange 分别推送 Routing_Key 設定為 topic.guangdong.guangzhou 、topic.guangdong.shenzhen 的不同隊列,Cousumer 隻需要把 Routing_Key 設定為 topic.guangdong.# ,就可以把所有消息接收處理。
·Headers類型
該類型的交換器與前面介紹的稍有不同,它不再是基于關鍵字 Routing_Key 進行路由,而是基于多個屬性進行路由的,這些屬性比路由關鍵字更容易表示為消息的頭。也就是說,用于路由的屬性是取自于消息 Header 屬性,當消息 Header 的值與隊列綁定時指定的值相同時,消息就會路由至相應的隊列中。
2.2.2 Queue 隊列
Queue 隊列是消息的載體,每個消息都會被投入到 Queue 當中,它包含 name,durable,arguments 等多個屬性,name 用于定義它的名稱,當 durable(持久化)為 true 時,隊列将會持久化儲存到硬碟上。反之為 false 時,一旦 Broker Server 被重新開機,對應的隊列就會消失,後面還會有例子作詳細介紹。
2.2.3 Channel 通道
當 Broker Server 使用 Connection 連接配接 Producer / Cousumer 時會使用到信道(Channel),一個 Connection上可以建立多個 Channel,每個 Channel 都有一個會話任務,可以了解為邏輯上的連接配接。主要用作管理相關的參數定義,發送消息,擷取消息,事務處理等。
2.2.4 Binding 綁定
Binding 主要用于綁定交換器 Exchange 與 隊列 Queue 之間的對應關系,并記錄路由的 Routing-Key。Binding 資訊會儲存到系統當中,用于 Broker Server 資訊的分發依據。
三、RabbitMQ 應用執行個體
3.1 Rabbit 常用類說明
3.1.1 RabbitTemplate 類
Spring 架構已經封裝了 RabbitTemplate 對 RabbitMQ 的綁定、隊列發送、接收進行簡化管理
方法 | 說明 |
---|---|
void setExchange(String exchange) | 設定綁定的 exchange 名稱 |
String getExchange() | 擷取已綁定的 exchange 名稱 |
void setRoutingKey(String routingKey) | 設定綁定的 routingKey |
String getRoutingKey() | 擷取已綁定的 routingKey |
void send(String exchange, String routingKey, Message message,CorrelationData data) | 以Message方式發送資訊到 Broken Server,CorrelationData 為标示符可為空 |
void convertAndSend(String exchange, String routingKey, Object object, CorrelationData data) | 以自定義對象方式發送資訊到 Broken Server,系統将自動把 object轉換成 Message,CorrelationData 為标示符可為空 |
Message receive(String queueName, long timeoutMillis) | 根據queueuName接收隊列發送Message資訊 |
Object receiveAndConvert(String queueName, long timeoutMillis) | 根據queueuName接收隊列對象資訊 |
void setReceiveTimeout(long receiveTimeout) | 設定接收過期時間 |
void setReplyTimeout(long replyTimeout) | 設定重發時間 |
void setMandatory(boolean mandatory) | 開啟強制委托模式(下文會詳細說明) |
void setConfirmCallback(confirmCallback) | 綁定消息确認回調方法(下文會詳細說明) |
void setReturnCallback(returnCallback) | 綁定消息退出回調方法(下文會詳細說明) |
3.2 初探 RabbitMQ
在官網下載下傳并成功安裝完 RabbitMQ 後,打開預設路徑 http://localhost:15672/#/ 即可看到 RabbitMQ 服務中心的管理界面
3.2.1 Producer 端開發
先在 pom 中添加 RabbitMQ 的依賴,并在 application.yml 中加入 RabbitMQ 帳号密碼等資訊。此例子,我們嘗試使用 Direct 交換器把隊列發送到不同的 Consumer。
1 **********************pom *************************
2 <project>
3 .............
4 <dependency>
5 <groupId>org.springframework.boot</groupId>
6 <artifactId>spring-boot-starter-amqp</artifactId>
7 <version>2.0.5.RELEASE</version>
8 </dependency>
9 </project>
10
11 **************** application.yml ****************
12 spring:
13 application:
14 name: rabbitMqProducer
15 rabbitmq:
16 host: localhost
17 port: 5672
18 username: admin
19 password: 12345678
20 virtual-host: /LeslieHost
複制
首先使用 CachingConnectionFactory 建立連結,通過 BindingBuilder 綁定 Exchange、Queue、RoutingKey之間的關系。
然後通過 void convertAndSend (String exchange, String routingKey, Object object, CorrelationData data) 方法把資訊發送到 Broken Server
1 @Configuration
2 public class ConnectionConfig {
3 @Value("${spring.rabbitmq.host}")
4 public String host;
5
6 @Value("${spring.rabbitmq.port}")
7 public int port;
8
9 @Value("${spring.rabbitmq.username}")
10 public String username;
11
12 @Value("${spring.rabbitmq.password}")
13 public String password;
14
15 @Value("${spring.rabbitmq.virtual-host}")
16 public String virtualHost;
17
18 @Bean
19 public ConnectionFactory getConnectionFactory(){
20 CachingConnectionFactory factory=new CachingConnectionFactory();
21 factory.setHost(host);
22 factory.setPort(port);
23 factory.setUsername(username);
24 factory.setPassword(password);
25 factory.setVirtualHost(virtualHost);
26 return factory;
27 }
28 }
29
30 @Configuration
31 public class BindingConfig {
32 public final static String first="direct.first";
33 public final static String second="direct.second";
34 public final static String Exchange_NAME="directExchange";
35 public final static String RoutingKey1="directKey1";
36 public final static String RoutingKey2="directKey2";
37
38 @Bean
39 public Queue queueFirst(){
40 return new Queue(first);
41 }
42
43 @Bean
44 public Queue queueSecond(){
45 return new Queue(second);
46 }
47
48 @Bean
49 public DirectExchange directExchange(){
50 return new DirectExchange(Exchange_NAME,true,true);
51 }
52
53 //利用BindingBuilder綁定Direct與queueFirst
54 @Bean
55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
57 }
58
59 //利用BindingBuilder綁定Direct與queueSecond
60 @Bean
61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
63 }
64 }
65
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69 @Autowired
70 private RabbitTemplate template;
71
72 @RequestMapping("/send")
73 public void send() {
74 for(int n=0;n<100;n++){
75
76 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm the first queue! "+String.valueOf(n),getCorrelationData());
77 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm the second queue! "+String.valueOf(n),getCorrelationData());
78 }
79 }
80
81 private CorrelationData getCorrelationData(){
82 return new CorrelationData(UUID.randomUUID().toString());
83 }
84 }
複制
此時,打開 RabbitMQ 管理界面,可看到 Producer 已經向 Broken Server 的 direct.first / direct.second 兩個 Queue 分别發送100 個 Message
3.2.2 Consumer 端開發
分别建立兩個不同的 Consumer ,一個綁定 direct.first 别一個綁定 direct.second , 然後通過注解 @RabbitListener 監聽不同的 queue,當接到到 Producer 推送隊列時,顯示隊列資訊。
1 @Configuration
2 public class ConnectionConfig {
3 @Value("${spring.rabbitmq.host}")
4 public String host;
5
6 @Value("${spring.rabbitmq.port}")
7 public int port;
8
9 @Value("${spring.rabbitmq.username}")
10 public String username;
11
12 @Value("${spring.rabbitmq.password}")
13 public String password;
14
15 @Value("${spring.rabbitmq.virtual-host}")
16 public String virtualHost;
17
18 @Bean
19 public ConnectionFactory getConnectionFactory(){
20 CachingConnectionFactory factory=new CachingConnectionFactory();
21 factory.setHost(host);
22 factory.setPort(port);
23 factory.setUsername(username);
24 factory.setPassword(password);
25 factory.setVirtualHost(virtualHost);
26 return factory;
27 }
28 }
29
30 @Configuration
31 public class BindingConfig {
32 public final static String first="direct.first";
33 public final static String Exchange_NAME="directExchange";
34 public final static String RoutingKey1="directKey1";
35
36 @Bean
37 public Queue queueFirst(){
38 return new Queue(first);
39 }
40
41 @Bean
42 public DirectExchange directExchange(){
43 return new DirectExchange(Exchange_NAME);
44 }
45
46 //利用BindingBuilder綁定Direct與queueFirst
47 @Bean
48 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
49 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
50 }
51 }
52
53 @Configuration
54 @RabbitListener(queues="direct.first")
55 public class RabbitMqListener {
56
57 @RabbitHandler
58 public void handler(String message){
59 System.out.println(message);
60 }
61 }
62
63 @SpringBootApplication
64 public class App {
65
66 public static void main(String[] args){
67 SpringApplication.run(App.class, args);
68 }
69 }
複制
運作後可以觀察到不同的 Consumer 會收到不同隊列的消息
如果覺得使用 Binding 代碼綁定過于繁瑣,還可以直接在監聽類RabbitMqListener中使用 @QueueBinding 注解綁定
1 @Configuration
2 public class ConnectionConfig {
3 @Value("${spring.rabbitmq.host}")
4 public String host;
5
6 @Value("${spring.rabbitmq.port}")
7 public int port;
8
9 @Value("${spring.rabbitmq.username}")
10 public String username;
11
12 @Value("${spring.rabbitmq.password}")
13 public String password;
14
15 @Value("${spring.rabbitmq.virtual-host}")
16 public String virtualHost;
17
18 @Bean
19 public ConnectionFactory getConnectionFactory(){
20 CachingConnectionFactory factory=new CachingConnectionFactory();
21 factory.setHost(host);
22 factory.setPort(port);
23 factory.setUsername(username);
24 factory.setPassword(password);
25 factory.setVirtualHost(virtualHost);
26 return factory;
27 }
28 }
29
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.second"),
34 key="directKey2"))
35 public class RabbitMqListener {
36
37 @RabbitHandler
38 public void handler(String message){
39 System.out.println(message);
40 }
41 }
42
43 @SpringBootApplication
44 public class App {
45
46 public static void main(String[] args){
47 SpringApplication.run(App.class, args);
48 }
49 }
複制
運作結果
由于受到篇幅限制,關于 Producer 與 Consumer 的資訊監控及死信隊列,持久化操作等内容,将在下面章節詳細介紹,敬請期待