轉載請注明出處:http://www.cnblogs.com/4----/p/6549865.html
0.目錄
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ RabbitMQ-從基礎到實戰(2)— 防止消息丢失 RabbitMQ-從基礎到實戰(4)— 消息的交換(中) RabbitMQ-從基礎到實戰(5)— 消息的交換(下) RabbitMQ-從基礎到實戰(6)— 與Spring內建1.簡介
在前面的例子中,每個消息都隻對應一個消費者,即使有多個消費者線上,也隻會有一個消費者接收并處理一條消息,這是消息中間件的一種常用方式。
另外一種方式,生産者生産一條消息,廣播給一個或多個隊列,所有訂閱了這個隊列的消費者,都可以消費這條消息,這就是消息訂閱。
官方教程列舉了這樣一個場景,生産者發出一條記錄日志的消息,消費者1接收到後寫日志到硬碟,消費者2接收到後列印日志到螢幕。工作中還有很多這樣的場景有待發掘,适當的使用消息訂閱後可以成倍的增加效率。
2.RabbitMQ的交換中心(Exchange)
在前兩章的例子中,我們涉及到了三個概念
- 生産者
- 隊列
- 消費者
這不禁讓我們以為,生産者生産消息後直接到發送到隊列,消費者從隊列中擷取消息,再消費掉。
其實這是錯誤的,在RabbitMQ中,生産者不會直接把消息發送給隊列,實際上,生産者甚至不知道一條消息會不會被發送到隊列上。
正确的概念是,生産者會把消息發送給RabbitMQ的交換中心(Exchange),Exchange的一側是生産者,另一側則是一個或多個隊列,由Exchange決定一條消息的生命周期--發送給某些隊列,或者直接丢棄掉。
這個概念在官方文檔中被稱作RabbitMQ消息模型的核心思想(core idea)
如下圖,其中X代表的是Exchange。

RabbitMQ中,有4種類型的Exchange
- direct 通過消息的routing key比較queue的key,相等則發給該queue,常用于相同應用多執行個體之間的任務分發
- 預設類型 本身是一個direct類型的exchange,routing key自動設定為queue name。注意,direct不等于預設類型,預設類型是在queue沒有指定exchange時的預設處理方式,發消息時,exchange字段也要相應的填成空字元串“”
- topic 話題,通過可配置的規則分發給綁定在該exchange上的隊列,通過地理位置推送等場景适用
- headers 當分發規則很複雜,用routing key不好表達時适用,忽略routing key,用header取代之,header可以為非字元串,例如Integer或者String
- fanout 分發給所有綁定到該exchange上的隊列,忽略routing key,适用于MMO遊戲、廣播、群聊等場景
更詳細的介紹,請看
官方文檔3.臨時隊列
可以對一個隊列命名是十分重要的,在消費者消費消息時,要指明消費哪個隊列的消息(下面的queue),這樣就可以讓多個消費者同時分享一個隊列
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
上述記錄日志的場景中,有以下幾個特點
- 所有消費者都需要監聽所有的日志消息,是以每個消費者都需要一個單獨的隊列,不需要和别人分享
- 消費者隻關心最新的消息,連接配接到RabbitMQ之前的消息不需要關心,是以,每次連接配接時需要建立一個隊列,綁定到相應的exchange上,連接配接斷開後,删除該隊列
自己聲明隊列是比較麻煩的,是以,RabbitMQ提供了簡便的擷取臨時隊列的方法,該隊列會在連接配接斷開後銷毀
String queueName = channel.queueDeclare().getQueue();
這行代碼會擷取一個名字類似于“amq.gen-JzTY20BRgKO-HjmUJj0wLg”的臨時隊列
4.綁定
再次注意,在RabbitMQ中,消息是發送到Exchange的,不是直接發送的Queue。是以,需要把Queue和Exchange進行綁定,告訴RabbitMQ把指定的Exchange上的消息發送的這個隊列上來
綁定隊列使用此方法
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
其中,queue是隊列名,exchange是要綁定的交換中心,routingKey就是這個queue的routingKey
5.實踐
下面來實作上述場景,生産者發送日志消息,消費者1記錄日志,消費者2列印日志
下面的代碼中,把連接配接工廠等方法放到了構造函數中,也就是說,每new一個對象,都會建立一個連接配接,在生産環境這樣做是很浪費性能的,每次建立一個connection都會建立一次TCP連接配接,生産環境應使用連接配接池。而Channel又不一樣,多個Channel是共用一個TCP連接配接的,是以可以放心的擷取Channel(本結論出自官方文檔對Channel的定義)
AMQP 0-9-1 connections are multiplexed with channels that can be thought of as "lightweight connections that share a single TCP connection".
For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.
日志消息發送類 LogSender
1 import java.io.IOException;
2 import java.util.concurrent.TimeoutException;
3
4 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory;
6
7 import com.rabbitmq.client.Channel;
8 import com.rabbitmq.client.Connection;
9 import com.rabbitmq.client.ConnectionFactory;
10
11 public class LogSender {
12
13 private Logger logger = LoggerFactory.getLogger(LogSender.class);
14 private ConnectionFactory factory;
15 private Connection connection;
16 private Channel channel;
17
18 /**
19 * 在構造函數中擷取連接配接
20 */
21 public LogSender(){
22 super();
23 try {
24 factory = new ConnectionFactory();
25 factory.setHost("127.0.0.1");
26 connection = factory.newConnection();
27 channel = connection.createChannel();
28 } catch (Exception e) {
29 logger.error(" [X] INIT ERROR!",e);
30 }
31 }
32 /**
33 * 提供個關閉方法,現在并沒有什麼卵用
34 * @return
35 */
36 public boolean closeAll(){
37 try {
38 this.channel.close();
39 this.connection.close();
40 } catch (IOException | TimeoutException e) {
41 logger.error(" [X] CLOSE ERROR!",e);
42 return false;
43 }
44 return true;
45 }
46
47 /**
48 * 我們更加關心的業務方法
49 * @param message
50 */
51 public void sendMessage(String message) {
52 try {
53 //聲明一個exchange,命名為logs,類型為fanout
54 channel.exchangeDeclare("logs", "fanout");
55 //exchange是logs,表示發送到此Exchange上
56 //fanout類型的exchange,忽略routingKey,是以第二個參數為空
57 channel.basicPublish("logs", "", null, message.getBytes());
58 logger.debug(" [D] message sent:"+message);
59 } catch (IOException e) {
60 e.printStackTrace();
61 }
62 }
63 }
在LogSender中,和之前的例子不一樣的地方是,我們沒有直接聲明一個Queue,取而代之的是聲明了一個exchange
釋出消息時,第一個參數填了我們聲明的exchange名字,routingKey留白,因為fanout類型忽略它。
在前面的例子中,我們routingKey填的是隊列名,因為預設的exchange(exchange位空字元串時使用預設交換中心)會把隊列的routingKey設定為queueName(聲明隊列的時候設定的,不是發送消息的時候),又是direct類型,是以可以通過queueName當做routingKey找到隊列。
消費類 LogConsumer
1 package com.liyang.ticktock.rabbitmq;
2
3 import java.io.IOException;
4 import java.util.concurrent.TimeoutException;
5
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8
9 import com.rabbitmq.client.AMQP;
10 import com.rabbitmq.client.Channel;
11 import com.rabbitmq.client.Connection;
12 import com.rabbitmq.client.ConnectionFactory;
13 import com.rabbitmq.client.Consumer;
14 import com.rabbitmq.client.DefaultConsumer;
15 import com.rabbitmq.client.Envelope;
16
17 public class LogConsumer {
18
19 private Logger logger = LoggerFactory.getLogger(LogConsumer.class);
20 private ConnectionFactory factory;
21 private Connection connection;
22 private Channel channel;
23
24 /**
25 * 在構造函數中擷取連接配接
26 */
27 public LogConsumer() {
28 super();
29 try {
30 factory = new ConnectionFactory();
31 factory.setHost("127.0.0.1");
32 connection = factory.newConnection();
33 channel = connection.createChannel();
34 // 聲明exchange,防止生産者沒啟動,exchange不存在
35 channel.exchangeDeclare("logs","fanout");
36 } catch (Exception e) {
37 logger.error(" [X] INIT ERROR!", e);
38 }
39 }
40
41 /**
42 * 提供個關閉方法,現在并沒有什麼卵用
43 *
44 * @return
45 */
46 public boolean closeAll() {
47 try {
48 this.channel.close();
49 this.connection.close();
50 } catch (IOException | TimeoutException e) {
51 logger.error(" [X] CLOSE ERROR!", e);
52 return false;
53 }
54 return true;
55 }
56
57 /**
58 * 我們更加關心的業務方法
59 */
60 public void consume() {
61 try {
62 // 擷取一個臨時隊列
63 String queueName = channel.queueDeclare().getQueue();
64 // 把剛剛擷取的隊列綁定到logs這個交換中心上,fanout類型忽略routingKey,是以第三個參數為空
65 channel.queueBind(queueName, "logs", "");
66 //定義一個Consumer,消費Log消息
67 Consumer consumer = new DefaultConsumer(channel) {
68 @Override
69 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
70 byte[] body) throws IOException {
71 String message = new String(body, "UTF-8");
72 logger.debug(" [D] 我是來列印日志的:"+message);
73 }
74 };
75 //這裡自動确認為true,接收到消息後該消息就銷毀了
76 channel.basicConsume(queueName, true, consumer);
77 } catch (IOException e) {
78 e.printStackTrace();
79 }
80 }
81 }
複制一個項目,把72行改為如下代碼,代表兩個做不同工作的消費者
1 logger.debug(" [D] 我已經把消息寫到硬碟了:"+message);
消費者App
1 public class App
2 {
3 public static void main( String[] args )
4 {
5 LogConsumer consumer = new LogConsumer();
6 consumer.consume();
7 }
8 }
生産者App
1 public class App {
2 public static void main( String[] args ) throws InterruptedException{
3 LogSender sender = new LogSender();
4 while(true){
5 sender.sendMessage(System.nanoTime()+"");
6 Thread.sleep(1000);
7 }
8 }
9 }
把消費者打包成兩個可執行的jar包,友善觀察控制台
用java -jar 指令執行,結果如下
6.結束語
本章介紹了RabbitMQ中消息的交換,再次強調,RabbitMQ中,消息是通過交換中心轉發到隊列的,不要被預設的exchange混淆,預設的exchange會自動把queue的名字設定為它的routingKey,是以消息釋出時,才能通過queueName找到該隊列,其實此時queueName扮演的角色就是routingKey。
本教程是參考官方文檔寫出來的,後續章節會介紹更多RabbitMQ的相關知識以及項目中的實戰技巧