這篇文章,主要介紹消息隊列RabbitMQ七種模式之釋出訂閱模式(Publish/Subscribe)。
目錄
一、釋出訂閱模式
1.1、Exchange交換機
(1)什麼是Exchange交換機呢???
(2)Exchange如何知道消息應該分發到哪個Queue裡面?
(3)Exchange分類
1.2、釋出訂閱模式介紹
1.3、案例代碼
(1)引入依賴
(2)編寫生産者
(3)編寫消費者
一、釋出訂閱模式
1.1、Exchange交換機
前面介紹的兩個模式下,生産者都是直接和Queue消息隊列互動的,這種方式存在一個問題,那就是如果要向不同的Queue消息隊列裡面發送資料,還得重新寫一個生産者出來,為了解決這個問題,RabbitMQ引入了Exchange交換機。
(1)什麼是Exchange交換機呢???
Exchange交換機,它主要作用就是接收生産者發送的消息,并且根據消息的RouteKey将消息映射到不同的Queue消息隊列裡面,通過這種方式,就可以實作不同的消息分發到不同的Queue消息隊列裡面,并且這種模式下,生産者是不會直接和Queue消息隊列互動的,生産者也不需要知道消息應該放入哪個隊列,它隻要告訴Exchange即可,剩下的事情都是Exchange來處理。
(2)Exchange如何知道消息應該分發到哪個Queue裡面?
為了能夠讓Exchange知道某一條消息應該分發到哪個消息隊列裡面,RabbitMQ需要給Exchange和Queue之間添加映射關系,這個關系叫做:Binding(綁定)。雖然綁定在一起了,但是還是區分不了哪個消息應該分發到哪個Queue隊列裡面,是以還需要一個唯一辨別,這個辨別就是RouteKey路由鍵,也可以認為是Queue的唯一辨別。采用上面這種模式,大緻的邏輯圖就如下所示啦:
(3)Exchange分類
RabbitMQ中有四種常見的Exchange交換機類型,分别是:
- direct:直接模式。
- topic:主題模式,後面介紹的Topic模式就是利用這個Exchange類型。
- fanout:廣播模式,這是釋出訂閱模式要使用的Exchange類型。
- headers:不常用。
1.2、釋出訂閱模式介紹
RabbitMQ中,釋出訂閱模式就是指定Exchange交換機的類型是【fanout】,然後RabbitMQ就會将消息分發到所有和這個Exchange交換機綁定的Queue消息隊列裡面,此時所有的消費者都可以接收到這一條消息。
注意:對于釋出訂閱模式來說,消息的路由鍵RouteKey是沒有作用的,可以不寫。
1.3、案例代碼
(1)引入依賴
<!-- 引入 RabbitMQ 依賴 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
(2)編寫生産者
- 因為這裡采用的Exchange交換機來分發消息,是以生産者中隻需要将消息發送到Exchange交換機裡面即可,而不需要指定具體的Queue隊列。
package com.rabbitmq.demo.pub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @version 1.0.0
* @Date: 2023/2/25 16:23
* @Copyright (C) ZhuYouBin
* @Description: 消息生産者
*/
public class Producer {
public static void main(String[] args) {
// 1、建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2、設定連接配接的 RabbitMQ 服務位址
factory.setHost("127.0.0.1"); // 預設就是本機
factory.setPort(5672); // 預設就是 5672 端口
// 3、擷取連接配接
Connection connection = null; // 連接配接
Channel channel = null; // 通道
try {
connection = factory.newConnection();
// 4、擷取通道
channel = connection.createChannel();
// 5、聲明 Exchange,如果不存在,則會建立
String exchangeName = "exchange_demo_2023";
channel.exchangeDeclare(exchangeName, "fanout");
// 6、發送消息
String message = "這是釋出訂閱模式,發送的消息資料";
channel.basicPublish(exchangeName, "", null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {}
}
if (null != connection) {
try {
connection.close();
} catch (Exception e) {}
}
}
}
}
(3)編寫消費者
- 消費者中,需要建立Queue隊列,并且将這個Queue綁定到對應的Exchange上面(不同的消費者可以建立不同的隊列,隊列不同照樣可以接收到訂閱的消息)。
package com.rabbitmq.demo.pub;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @version 1.0.0
* @Date: 2023/2/25 16:30
* @Copyright (C) ZhuYouBin
* @Description: 消息消費者
*/
public class Consumer {
public static void main(String[] args) {
// 1、建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2、設定連接配接的 RabbitMQ 服務位址
factory.setHost("127.0.0.1"); // 預設就是本機
factory.setPort(5672); // 預設就是 5672 端口
// 3、擷取連接配接
Connection connection = null; // 連接配接
Channel channel = null; // 通道
try {
connection = factory.newConnection();
// 4、擷取通道
channel = connection.createChannel();
// 5、聲明 Exchange,如果不存在,則會建立
String exchangeName = "exchange_demo_2023";
channel.exchangeDeclare(exchangeName, "fanout");
// 6、指定需要操作的消息隊列,如果隊列不存在,則會建立
String queueName = "queue_demo_2023";
channel.queueDeclare(queueName, false, false, false, null);
// 7、綁定 Exchange 和 Queue
channel.queueBind(queueName, exchangeName, "");
// 8、消費消息
DeliverCallback callback = new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
// 接收消息
System.out.println("這是接收的消息:" + new String(delivery.getBody()));
}
};
channel.basicConsume(queueName, true, callback, i->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
運作消費者、生産者,檢視控制台,消費者日志輸出内容,可以發現兩個綁定不同Queue隊列的消費者,接收到了相同的消息,這就是釋出訂閱模式。
到此,RabbitMQ消息隊列中的釋出訂閱模式就介紹完啦。
綜上,這篇文章結束了,主要介紹消息隊列RabbitMQ七種模式之釋出訂閱模式(Publish/Subscribe)。