RabbitMQ入門
- 學習目标
- 1. 消息隊列概述
-
- 1.1. 消息隊列MQ
- 1.2、AMQP 和 JMS
- 1.3. 消息隊列産品
- 1.4. RabbitMQ
- 2、安裝及配置RabbitMQ
-
- 2.1. 安裝說明
- 2.2. 使用者以及Virtual Hosts配置
-
- 2.2.1、使用者角色
- 2.2.2、Virtual Hosts配置
- 3、RabbitMQ案例入門
-
- 3.1、搭建rabbit子產品工程及引入依賴
- 3.2、編寫生産者
- 3.3、編寫消費者
- 3.4、測試
- 3.5、小結
- 4、RabbitMQ工作模式
-
- 4.1、Work queues工作隊列模式
-
- 4.1.1、測試work queue工作模式
- 4.1.2、測試
- 4.1.3、小結
- 4.2、訂閱模式類型
- 4.3、Publish/Subscribe釋出與訂閱模式
-
- 4.3.1、模式說明
- 4.3.2、代碼
- 4.3.3、測試
- 4.3.4、小結
- 4.3.5、釋出訂閱模式與工作隊列模式的差別
- 4.4、Routing路由模式
-
- 4.4.1、模式說明
- 4.4.2、代碼
- 4.4.3、測試
- 4.4.4、小結
- 4.5、Topics通配符模式
-
- 4.5.1、模式說明
- 4.5.2、代碼
- 4.5.3、測試
- 4.5.4、小結
- 4.6、模式總結
學習目标
- 能夠說出什麼是消息隊列
- 能夠安裝RabbitMQ
- 能夠編寫RabbitMQ的入門程式
- 能夠說出RabbitMQ的5種模式特征
- 能夠使用SpringBoot整合RabbitMQ
1. 消息隊列概述
1.1. 消息隊列MQ
MQ全稱是Message Queue,消息隊列是應用程式和應用程式之間的通信方法。
- 為什麼使用MQ?
- 在項目中,可将一些無需即時傳回且耗時的操作提取出來,進行異步處理,而這種異步處理的方式大大的節省了伺服器的請求響應時間,進而提高了系統的吞吐量。
- 開發中消息隊列通常有如下應用場景:
- 任務異步處理:将不需要同步處理的并且耗時長的操作由消息隊列通知消息接收方進行異步處理。提高了應用程式的響應時間。
- 應用程式解耦合:MQ相當于一個中介,生産方通過MQ與消費方互動,它将應用程式進行解耦合。
1.2、AMQP 和 JMS
MQ是消息通信的模型;實作MQ的大緻有兩種主流方式:
AMQP
、
JMS
。
AMQP:
- AMQP是一種協定,更準确的說是一種binary wire-level protocol(連結協定)。這是其和JMS的本質差别,AMQP不從API層進行限定,而是直接定義網絡交換的資料格式。
JMS:
- MS即Java消息服務(JavaMessage Service)應用程式接口,是一個Java平台中關于面向消息中間件(MOM)的API,用于在兩個應用程式之間,或分布式系統中發送消息,進行異步通信。
AMQP 與 JMS 差別:
- JMS是定義了統一的接口,來對消息操作進行統一;AMQP是通過規定協定來統一資料互動的格式
- JMS限定了必須使用Java語言;AMQP隻是協定,不規定實作方式,是以是跨語言的。
- JMS規定了兩種消息模式(點對點、訂閱);而AMQP的消息模式更加豐富(最典型是路由)。
1.3. 消息隊列産品
市場上常見的消息隊列有如下:
- ActiveMQ:基于JMS
- ZeroMQ:基于C語言開發
- RabbitMQ:基于AMQP協定,erlang語言開發,穩定性好
- RocketMQ:基于JMS,阿裡巴巴産品
- Kafka:類似MQ的産品;分布式消息系統,高吞吐量
1.4. RabbitMQ
RabbitMQ是由erlang語言開發,基于AMQP(Advanced Message Queue 進階消息隊列協定)協定實作的消息隊列,它是一種應用程式之間的通信方法,消息隊列在分布式系統開發中應用非常廣泛。
- RabbitMQ官方位址:http://www.rabbitmq.com/
RabbitMQ提供了6種模式:
簡單模式
,
work模式
,
Publish/Subscribe釋出與訂閱模式
,
Routing路由模式
,
Topics主題模式
,
RPC遠端調用模式
(遠端調用,不太算MQ;不作介紹);
- 官網對應模式介紹:https://www.rabbitmq.com/getstarted.html
2、安裝及配置RabbitMQ
2.1. 安裝說明
windows下安裝Erlang和RabbitMQ詳細教程
2.2. 使用者以及Virtual Hosts配置
2.2.1、使用者角色
RabbitMQ在安裝好後,可以通路
http://localhost:15672
;其自帶了guest/guest的使用者名和密碼;如果需要建立自定義使用者,那麼也可以登入管理界面後,如下操作:
角色說明:
1、 超級管理者(administrator)
- 可登陸管理控制台,可檢視所有的資訊,并且可以對使用者,政策(policy)進行操作。
2、 監控者(monitoring)
- 可登陸管理控制台,同時可以檢視rabbitmq節點的相關資訊(程序數,記憶體使用情況,磁盤使用情況等)
3、 政策制定者(policymaker)
- 可登陸管理控制台, 同時可以對policy進行管理。但無法檢視節點的相關資訊(上圖紅框辨別的部分)。
4、 普通管理者(management)
- 僅可登陸管理控制台,無法看到節點資訊,也無法對政策進行管理。
5、 其他
- 無法登陸管理控制台,通常就是普通的生産者和消費者。
2.2.2、Virtual Hosts配置
像mysql擁有資料庫的概念并且可以指定使用者對庫和表等操作的權限。RabbitMQ也有類似的權限管理;在RabbitMQ中可以虛拟消息伺服器Virtual Host,每個Virtual Hosts相當于一個相對獨立的RabbitMQ伺服器,每個VirtualHost之間是互相隔離的。exchange、queue、message不能互通。 相當于mysql的db。
注意:Virtual Name一般以
/
開頭。
1. 建立Virtual Hosts
2. 設定Virtual Hosts權限
3、RabbitMQ案例入門
3.1、搭建rabbit子產品工程及引入依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
3.2、編寫生産者
抽取建立connection的工具類com.ebuy.rabbitmq.util.ConnectionUtil工具類;
public class ConnectionUtil {
public static Connection getConnection() throws Exception
{
//建立連接配接工廠
ConnectionFactory connectionFactory=new ConnectionFactory();
//主機位址:預設為loalhost
connectionFactory.setHost("127.0.0.1");
//連接配接端口:預設為5672
connectionFactory.setPort(5672);
//虛拟主機名稱:預設為 /
connectionFactory.setVirtualHost("/crm");
//連接配接使用者名:建立的使用者(預設為guest)
connectionFactory.setUsername("wbs");
//連接配接民密碼:使用者密碼(預設為guest)
connectionFactory.setPassword("123456");
//建立連接配接
Connection connection=connectionFactory.newConnection();
return connection;
}
}
編寫生産者:
public class Producer {
//隊列名稱
static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) {
try {
//建立一個連接配接
Connection connection = ConnectionUtil.getConnection();
//建立一個頻道
Channel channel = connection.createChannel();
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//生産一個消息
String message="你好,我是001";
/**
* 參數1:交換機名稱,如果沒有指定則使用預設Default Exchage
* 參數2:路由key,簡單模式可以傳遞隊列名稱
* 參數3:消息其它屬性
* 參數4:消息内容(要轉化為位元組類型)
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
//消息發送後,提示語句
System.out.println("消息已發送:"+message);
//關閉連接配接
channel.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
在執行上述的消息發送之後;可以登入rabbitMQ的管理控制台,可以發現隊列和其消息:
3.3、編寫消費者
編寫消息的消費者com.ithouse.rabbitmq.simple.Consumer消費者類:
public class Consumer {
static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection= ConnectionUtil.getConnection();
//建立頻道
Channel channel=connection.createChannel();
/**
* 聲明隊列
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//建立消費者;并設定消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标簽,在channel.basicConsumer時候可以指定
* @param envelope 消息包的内容,可從中擷取交換機,消息id,路由key,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties 屬性資訊
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息為:" + new String(body, "utf-8"));
}
};
/**
* 參數1:隊列名稱
* 參數2:是否自動确認,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動确認
* 參數3:消息接收到後回調
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
3.4、測試
先啟動生産者:
再啟動消費者:
3.5、小結
上述的入門案例中中其實使用的是如下的簡單模式:
在上圖的模型中,有以下概念:
-
:生産者,也就是要發送消息的程式P
-
:消費者:消息的接受者,會一直等待消息到來。C
-
:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生産者向其中投遞消息,消費者從其中取出消息。queue
在rabbitMQ中消息者是一定要到某個消息隊列中去擷取消息的。
4、RabbitMQ工作模式
4.1、Work queues工作隊列模式
Work Queues
與入門程式的 簡單模式 相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。
應用場景:對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
4.1.1、測試work queue工作模式
Work Queues
與入門程式的 簡單模式 的代碼是幾乎一樣的;可以完全複制,并複制多一個消費者進行多個消費者同時消費消息的測試,隻需在生産者中添加一個for循環模拟建立多個消息加入到消息隊列中:
(1)、生産者
public class Producer {
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) {
try {
//建立一個連接配接
Connection connection= ConnectionUtil.getConnection();
//建立一個頻道
Channel channel=connection.createChannel();
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for(int i=1;i<=30;i++) {
String message = "你好,我是work:"+i;
/**
* 參數1:交換機名稱,如果沒有指定則使用預設Default Exchage
* 參數2:路由key,簡單模式可以傳遞隊列名稱
* 參數3:消息其它屬性
* 參數4:消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息已發送:" + message);
}
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
(2)、消費者1
public class Consumer1 {
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection= ConnectionUtil.getConnection();
//建立頻道
Channel channel=connection.createChannel();
/**
* 聲明隊列
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//建立消費者;并設定消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标簽,在channel.basicConsumer時候可以指定
* @param envelope 消息包的内容,可從中擷取交換機,消息id,路由key,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties 屬性資訊
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("Consumer1接收到的消息為:" + new String(body, "utf-8"));
/**
* 線程等待一秒
*/
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/**
* 參數1:隊列名稱
* 參數2:是否自動确認,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動确認
* 參數3:消息接收到後回調
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
(3)、消費者2
public class Consumer2 {
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection= ConnectionUtil.getConnection();
//建立頻道
Channel channel=connection.createChannel();
/**
* 聲明隊列
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//建立消費者;并設定消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标簽,在channel.basicConsumer時候可以指定
* @param envelope 消息包的内容,可從中擷取交換機,消息id,路由key,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties 屬性資訊
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("Consumer2接收到的消息為:" + new String(body, "utf-8"));
/**
* 線程等待一秒
*/
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/**
* 參數1:隊列名稱
* 參數2:是否自動确認,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動确認
* 參數3:消息接收到後回調
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
4.1.2、測試
先啟動兩個消費者,然後再啟動生産者發送消息;到兩個消費者對應的控制台檢視是否屬于競争性的接收到消息。
Consumer1:
Consumer2:
4.1.3、小結
在一個隊列中如果有多個消費者,那麼消費者之間對于同一個消息的關系是競争的關系。
4.2、訂閱模式類型
訂閱模式示例圖:
前面2個案例中,隻有3個角色:
-
:生産者,也就是要發送消息的程式P
-
:消費者:消息的接受者,會一直等待消息到來。C
-
:消息隊列,圖中紅色部分queue
而在訂閱模型中,多了一個
Exchange
角色,而且過程略有變化:
-
:生産者,也就是要發送消息的程式,但是不再發送到隊列中,而是發給X(交換機)P
-
:消費者,消息的接受者,會一直等待消息到來。C
-
:消息隊列,接收消息、緩存消息。Queue
-
:交換機,就是圖中的X。一方面,接收生産者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特别隊列、遞交給所有隊列、或是将消息丢棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:Exchange
-
:廣播,将消息交給所有綁定到交換機的隊列;Fanout
-
:定向,把消息交給符合指定Direct
的隊列;routing key
-
:通配符,把消息交給符合Topic
的隊列。routing pattern(路由模式)
-
Exchange(交換機)隻需負責轉發消息,不具備儲存消息的能力,是以如果沒有任何隊列與Exchange綁定或者沒有符合路由規則的隊列,那麼消息會消失!!!
4.3、Publish/Subscribe釋出與訂閱模式
4.3.1、模式說明
釋出訂閱模式:
- 每個消費者監聽自己的隊列。
- 生産者将消息發給broker,由交換機将消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都将接收到消息。
4.3.2、代碼
(1)Consumer1
public class Consumer1 {
//交換機名稱
static final String FANOUT_EXCHAGE = "fanout_exchange";
//隊列名稱1
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//隊列名稱1
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection= ConnectionUtil.getConnection();
//建立頻道
Channel channel=connection.createChannel();
/**
* 聲明交換機
* 參數1:交換機
* 參數2:指定釋出訂閱類型
*/
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
/**
* 聲明隊列
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
/**
* 隊列綁定交換機
* 參數1:隊列
* 參數2:要綁定的交換機
* 參數3:路由key
*/
channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHAGE,"");
//建立消費者;并設定消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标簽,在channel.basicConsumer時候可以指定
* @param envelope 消息包的内容,可從中擷取交換機,消息id,路由key,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties 屬性資訊
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("Consumer1接收到的消息為:" + new String(body, "utf-8"));
/**
* 線程等待一秒
*/
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/**
* 參數1:隊列名稱
* 參數2:是否自動确認,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動确認
* 參數3:消息接收到後回調
*/
channel.basicConsume(FANOUT_QUEUE_1, true, consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
(2)Consumer2
public class Consumer2 {
//交換機名稱
static final String FANOUT_EXCHAGE = "fanout_exchange";
//隊列名稱1
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//隊列名稱1
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection= ConnectionUtil.getConnection();
//建立頻道
Channel channel=connection.createChannel();
/**
* 聲明交換機
* 參數1:交換機
* 參數2:指定釋出訂閱類型
*/
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
/**
* 聲明隊列
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
/**
* 隊列綁定交換機
* 參數1:隊列
* 參數2:要綁定的交換機
* 參數3:路由key
*/
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHAGE,"");
//建立消費者;并設定消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标簽,在channel.basicConsumer時候可以指定
* @param envelope 消息包的内容,可從中擷取交換機,消息id,路由key,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties 屬性資訊
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("Consumer2接收到的消息為:" + new String(body, "utf-8"));
/**
* 線程等待一秒
*/
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/**
* 參數1:隊列名稱
* 參數2:是否自動确認,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動确認
* 參數3:消息接收到後回調
*/
channel.basicConsume(FANOUT_QUEUE_2, true, consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
(3)Producer
public class Producer {
//交換機名稱
static final String FANOUT_EXCHAGE = "fanout_exchange";
//隊列名稱1
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//隊列名稱1
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection = ConnectionUtil.getConnection();
//建立頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機
* 參數1:交換機名稱
* 參數2:交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
/**
* 聲明(建立)隊列
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
//隊列綁定交換機(綁不綁都可以,因為在消費者方可以随意組合路由key和隊列)
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");
for(int i=0;i<30;i++){
String message = "你好:我是ps" + i;
/**
* 參數1:交換機名稱,如果沒有指定則使用預設Default Exchage
* 參數2:路由key,簡單模式可以傳遞隊列名稱
* 參數3:消息其它屬性
* 參數4:消息内容
*/
channel.basicPublish(FANOUT_EXCHAGE,"",null,message.getBytes());
System.out.println("消息已經發送:" + message);
}
//關閉資源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.3.3、測試
首先啟動Producer生産者,檢視rabbitmq服務中的交換機:
然後啟動兩個Consumer消費者,檢視兩個Consumer消費者的控制台:
4.3.4、小結
交換機需要與隊列進行綁定,綁定之後;一個消息可以被多個消費者都收到,多個消費者之間不再是競争的關系。
4.3.5、釋出訂閱模式與工作隊列模式的差別
- 工作隊列模式不用定義交換機,而釋出/訂閱模式需要定義交換機;
- 工作隊列模式的生産方是面向隊列發送消息的(底層使用預設交換機),而釋出/訂閱模式的生産方是面向交換機發送消息的;
- 工作隊列模式不需要設定,實際上工作隊列模式将隊列綁定綁定的到預設的交換機,而釋出/訂閱模式需要設定隊列和交換機的綁定。
4.4、Routing路由模式
4.4.1、模式說明
路由模式特點:
- 隊列與交換機的綁定,不能是任意綁定的,而是要指定一個
;RoutingKey(路由key)
- 消息的發送方是在向
發送消息時,也必須指定消息的Exchange
;RoutingKey
-
不再把消息交給每一個綁定的隊列,而是根據消息的Exchange
進行判斷,隻有隊列RoutingKey
與消息的RoutingKey
完全一緻,才會接收到資訊。RoutingKey
圖解:
- P:生産者,向Exchange發送消息,發送消息時,會指定一個
;routingkey
- X:Exchange(交換機),接收生産者的消息,然後把消息遞交給與
完全比對的隊列;routingkey
- C1:消費者,其所在隊列指定了需要
為error的消息;routingkey
- C2:消費者,其所在隊列指定了需要
為info、error、warning的消息。routingkey
4.4.2、代碼
Producer生産者:
public class Producer {
//交換機名稱
static final String DIRECT_EXCHAGE = "direct_exchange";
//隊列名稱(插入)
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//隊列名稱(更新)
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection = ConnectionUtil.getConnection();
//建立頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機
* 參數1:交換機名稱
* 參數2:交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT,true,false,false,null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE,true,false,false,null);
//隊列綁定交換機(綁不綁都可以,但是不建議指定routkey,因為在消費者方可以随意組合路由key和隊列)
channel.queueBind(DIRECT_QUEUE_INSERT,DIRECT_EXCHAGE,"");
channel.queueBind(DIRECT_QUEUE_UPDATE,DIRECT_EXCHAGE,"");
//發送
String message = "新增了商品。路由模式;routing key 為 insert ";
/**
* 參數1:交換機名稱,如果沒有指定則使用預設Default Exchnage
* 參數2:指定路由key
* 參數3:消息其它屬性
* 參數4:消息内容
*/
channel.basicPublish(DIRECT_EXCHAGE,"insert",null,message.getBytes());
System.out.println("消息已發送:" + message);
//發送
message = "更新了商品。路由模式;routing key 為 ";
/**
* 參數1:交換機名稱,如果沒有指定則使用預設Default Exchange
* 參數2:指定路由key
* 參數3:消息其它屬性
* 參數4:消息内容
*/
channel.basicPublish(DIRECT_EXCHAGE,"updates",null,message.getBytes());
System.out.println("消息已發送:" + message);
// 關閉資源
channel.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
注意:
在生産者方,其實channel頻道中隻是将交換機和所有的隊列都綁定在一起,而路由規則key與隊列的綁定應該由消費者一方來決定,可以将routkey與隊列随意組合調用;
如果在生産者方直接将routkey和指定的隊列綁定在交換機中,過于死闆了。
Cosumer1:
public class Consumer1 {
//交換機名稱
static final String DIRECT_EXCHAGE = "direct_exchange";
//隊列名稱(插入)
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//隊列名稱(更新)
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//建立頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機
* 參數1:交換機名稱
* 參數2:交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT,true,false,false,null);
//綁定指定routingkey隊列到交換機
channel.queueBind(DIRECT_QUEUE_INSERT,DIRECT_EXCHAGE,"insert");
//建立消費者;并設定消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标簽,在channel.basicConsumer時候可以指定
* @param envelope 消息包的内容,可從中擷取交換機,消息id,路由key,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties 屬性資訊
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("Consumer1接收到的消息為:" + new String(body, "utf-8"));
/**
* 線程等待一秒
*/
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/**
* 參數1:隊列名稱
* 參數2:是否自動确認,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動确認
* 參數3:消息接收到後回調
*/
channel.basicConsume(DIRECT_QUEUE_INSERT, true, consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
Consumer2:
public class Consumer2 {
//交換機名稱
static final String DIRECT_EXCHAGE = "direct_exchange";
//隊列名稱(插入)
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//隊列名稱(更新)
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//建立頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機
* 參數1:交換機名稱
* 參數2:交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(DIRECT_QUEUE_UPDATE,true,false,false,null);
//綁定指定routingkey隊列到交換機
channel.queueBind(DIRECT_QUEUE_UPDATE,DIRECT_EXCHAGE,"update");
//建立消費者;并設定消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标簽,在channel.basicConsumer時候可以指定
* @param envelope 消息包的内容,可從中擷取交換機,消息id,路由key,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties 屬性資訊
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("Consumer2接收到的消息為:" + new String(body, "utf-8"));
/**
* 線程等待一秒
*/
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/**
* 參數1:隊列名稱
* 參數2:是否自動确認,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動确認
* 參數3:消息接收到後回調
*/
channel.basicConsume(DIRECT_QUEUE_UPDATE, true, consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
4.4.3、測試
啟動所有消費者,然後使用生産者發送消息;在消費者對應的控制台可以檢視到生産者發送對應
routing key
對應的隊列的消息;達到
按照需要接收
的效果。
在執行完測試代碼後,其實到RabbitMQ的管理背景找到 Exchanges 頁籤,點選
direct_exchange
的交換機,可以檢視到如下的綁定:
檢視Queues隊列頁籤:
direct_queue_update隊列同理!!!
檢視兩個消費者控制台:
4.4.4、小結
Routing模式要求隊列在綁定交換機時要指定routing key,消息會轉發到符合routing key的隊列。
4.5、Topics通配符模式
4.5.1、模式說明
Topic 類型與 Direct 相比,都是可以根據 RoutingKey 把消息路由到不同的隊列。隻不過 Topic 類型 Exchange 可以讓隊列在綁定 Routing key 的時候使用通配符!
Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規則:
#
:比對一個或多個詞
-
:能夠比對 item.insert.abc 或者 item.insertitem.#
*
:比對不多不少恰好1個詞
-
:隻能比對 item.insertitem.*
圖解:
- 紅色Queue:綁定的是
,是以凡是以usa.#
開頭的 routing key 都會被比對到usa.
- 黃色Queue:綁定的是
,是以凡是以#.news
結尾的 routing key 都會被比對.news
4.5.2、代碼
(1)Proucer
public class Producer {
//交換機名稱
static final String TOPIC_EXCHAGE = "topic_exchange";
//隊列名稱
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//隊列名稱
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection = ConnectionUtil.getConnection();
//建立頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機
* 參數1:交換機名稱
* 參數2:交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 發送資訊
String message = "新增了商品。Topic模式;routing key 為 item.insert" ;
/**
* 參數1:交換機名稱;如果沒有則指定空字元串(表示使用預設的交換機)
* 參數2:路由key,簡單模式中可以使用隊列名稱
* 參數3:消息其它屬性
* 參數4:消息内容
*/
channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
System.out.println("已發送消息:" + message);
//修改了商品
message = "修改了商品。Topic模式;routing key 為 item.update" ;
/**
* 參數1:交換機名稱;如果沒有則指定空字元串(表示使用預設的交換機)
* 參數2:路由key,簡單模式中可以使用隊列名稱
* 參數3:消息其它屬性
* 參數4:消息内容
*/
channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
System.out.println("已發送消息:" + message);
message = "删除了商品。通配符模式 ;routing key 為 item.delete";
/**
* 參數1:交換機名稱;如果沒有則指定空字元串(表示使用預設的交換機)
* 參數2:路由key,簡單模式中可以使用隊列名稱
* 參數3:消息其它屬性
* 參數4:消息内容
*/
channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
System.out.println("已發送消息:" + message);
//關閉資源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
(2)Consumer1
public class Consumer1 {
//交換機名稱
static final String TOPIC_EXCHAGE = "topic_exchange";
//隊列名稱
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//隊列名稱
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection = ConnectionUtil.getConnection();
//建立頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機
* 參數1:交換機名稱
* 參數2:交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
/**
* 隊列綁定交換機
* 參數1:隊列名
* 參數2:交換機
* 參數3:路由key
*/
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.insert");
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");
//建立消費者;并設定消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标簽,在channel.basicConsumer時候可以指定
* @param envelope 消息包的内容,可從中擷取交換機,消息id,路由key,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties 屬性資訊
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("Consumer1接收到的消息為:" + new String(body, "utf-8"));
}
};
/**
* 參數1:隊列名稱
* 參數2:是否自動确認,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動确認
* 參數3:消息接收到後回調
*/
channel.basicConsume(TOPIC_QUEUE_1,true,consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
關鍵語句:
/**
* 隊列綁定交換機
* 參數1:隊列名
* 參數2:交換機
* 參數3:路由key
*/
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.insert");
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");
(3)Consumer2
public class Consumer2 {
//交換機名稱
static final String TOPIC_EXCHAGE = "topic_exchange";
//隊列名稱
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//隊列名稱
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) {
try {
//建立連接配接
Connection connection = ConnectionUtil.getConnection();
//建立頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機
* 參數1:交換機名稱
* 參數2:交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接配接
* 參數4:是否在不使用的時候自動删除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
/**
* 隊列綁定交換機
* 參數1:隊列名
* 參數2:交換機
* 參數3:路由key
*/
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");
//建立消費者;并設定消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标簽,在channel.basicConsumer時候可以指定
* @param envelope 消息包的内容,可從中擷取交換機,消息id,路由key,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties 屬性資訊
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("Consumer2接收到的消息為:" + new String(body, "utf-8"));
}
};
/**
* 參數1:隊列名稱
* 參數2:是否自動确認,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動确認
* 參數3:消息接收到後回調
*/
channel.basicConsume(TOPIC_QUEUE_2,true,consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
關鍵語句:
/**
* 隊列綁定交換機
* 參數1:隊列名
* 參數2:交換機
* 參數3:路由key
*/
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");
4.5.3、測試
啟動所有消費者,然後使用生産者發送消息;在消費者對應的控制台可以檢視到生産者發送對應routing key對應隊列的消息;到達按照需要接收的效果;并且這些routing key可以使用通配符。
在執行完測試代碼後,其實到RabbitMQ的管理背景找到 Exchanges 頁籤,點選 topic_exchange 的交換機,可以檢視到如下的綁定:
檢視兩個消費者的控制台:
如果将生産者中的其中一個發送routkey修改成三級路徑
item.abc.insert
:
再次啟動生産者,檢視兩個消費者的控制台:
解決辦法,将Consumer2中
item.*
改為
item.#
,重新啟動,檢視控制台:
/**
* 隊列綁定交換機
* 參數1:隊列名
* 參數2:交換機
* 參數3:路由key
*/
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.#");
4.5.4、小結
Topic主題模式可以實作 Publish/Subscribe釋出與訂閱模式 和 Routing路由模式 的功能;隻是Topic在配置routingkey 的時候可以使用通配符,顯得更加靈活。
4.6、模式總結
RabbitMQ工作模式:
- 1、
簡單模式
HelloWorld 一個生産者、一個消費者,不需要設定交換機(使用預設的交換
機)
- 2、
Work Queue 一個生産者、多個消費者(競争關系),不需要設定交換機(使用預設的交換機)工作隊列模式
- 3、
Publish/subscribe 需要設定類型為fanout的交換機,并且交換機和隊列進行綁定,當發送消息到交換機後,交換機會将消息發送到綁定的隊列釋出訂閱模式
- 4、
Routing 需要設定類型為direct的交換機,交換機和隊列進行綁定,并且指定routing key,當發送消息到交換機後,交換機會根據routing key将消息發送到對應的隊列路由模式
- 5、
Topic 需要設定類型為topic的交換機,交換機和隊列進行綁定,并且指定通配符方式的routing key,當發送消息到交換機後,交換機會根據routing key将消息發送到對應的隊列通配符模式