天天看點

【RabbitMQ筆記04】消息隊列RabbitMQ七種模式之釋出訂閱模式(Publish/Subscribe)一、釋出訂閱模式

這篇文章,主要介紹消息隊列RabbitMQ七種模式之釋出訂閱模式(Publish/Subscribe)。

目錄

一、釋出訂閱模式

1.1、Exchange交換機

(1)什麼是Exchange交換機呢???

(2)Exchange如何知道消息應該分發到哪個Queue裡面?

(3)Exchange分類

1.2、釋出訂閱模式介紹

1.3、案例代碼

(1)引入依賴

(2)編寫生産者

(3)編寫消費者

一、釋出訂閱模式

1.1、Exchange交換機

前面介紹的兩個模式下,生産者都是直接和Queue消息隊列互動的,這種方式存在一個問題,那就是如果要向不同的Queue消息隊列裡面發送資料,還得重新寫一個生産者出來,為了解決這個問題,RabbitMQ引入了Exchange交換機。

(1)什麼是Exchange交換機呢???

Exchange交換機,它主要作用就是接收生産者發送的消息,并且根據消息的RouteKey将消息映射到不同的Queue消息隊列裡面,通過這種方式,就可以實作不同的消息分發到不同的Queue消息隊列裡面,并且這種模式下,生産者是不會直接和Queue消息隊列互動的,生産者也不需要知道消息應該放入哪個隊列,它隻要告訴Exchange即可,剩下的事情都是Exchange來處理。

【RabbitMQ筆記04】消息隊列RabbitMQ七種模式之釋出訂閱模式(Publish/Subscribe)一、釋出訂閱模式

(2)Exchange如何知道消息應該分發到哪個Queue裡面?

為了能夠讓Exchange知道某一條消息應該分發到哪個消息隊列裡面,RabbitMQ需要給Exchange和Queue之間添加映射關系,這個關系叫做:Binding(綁定)。雖然綁定在一起了,但是還是區分不了哪個消息應該分發到哪個Queue隊列裡面,是以還需要一個唯一辨別,這個辨別就是RouteKey路由鍵,也可以認為是Queue的唯一辨別。采用上面這種模式,大緻的邏輯圖就如下所示啦:

【RabbitMQ筆記04】消息隊列RabbitMQ七種模式之釋出訂閱模式(Publish/Subscribe)一、釋出訂閱模式

(3)Exchange分類

RabbitMQ中有四種常見的Exchange交換機類型,分别是:

  • direct:直接模式。
  • topic:主題模式,後面介紹的Topic模式就是利用這個Exchange類型。
  • fanout:廣播模式,這是釋出訂閱模式要使用的Exchange類型。
  • headers:不常用。

1.2、釋出訂閱模式介紹

RabbitMQ中,釋出訂閱模式就是指定Exchange交換機的類型是【fanout】,然後RabbitMQ就會将消息分發到所有和這個Exchange交換機綁定的Queue消息隊列裡面,此時所有的消費者都可以接收到這一條消息。

注意:對于釋出訂閱模式來說,消息的路由鍵RouteKey是沒有作用的,可以不寫。

1.3、案例代碼

(1)引入依賴

<!-- 引入 RabbitMQ 依賴 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>
           

(2)編寫生産者

  • 因為這裡采用的Exchange交換機來分發消息,是以生産者中隻需要将消息發送到Exchange交換機裡面即可,而不需要指定具體的Queue隊列。
package com.rabbitmq.demo.pub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:23
 * @Copyright (C) ZhuYouBin
 * @Description: 消息生産者
 */
public class Producer {
    public static void main(String[] args) {
        // 1、建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2、設定連接配接的 RabbitMQ 服務位址
        factory.setHost("127.0.0.1"); // 預設就是本機
        factory.setPort(5672); // 預設就是 5672 端口
        // 3、擷取連接配接
        Connection connection = null; // 連接配接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、擷取通道
            channel = connection.createChannel();
            // 5、聲明 Exchange,如果不存在,則會建立
            String exchangeName = "exchange_demo_2023";
            channel.exchangeDeclare(exchangeName, "fanout");
            // 6、發送消息
            String message = "這是釋出訂閱模式,發送的消息資料";
            channel.basicPublish(exchangeName, "", null, message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel) {
                try {
                    channel.close();
                } catch (Exception e) {}
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception e) {}
            }
        }
    }
}
           

(3)編寫消費者

  • 消費者中,需要建立Queue隊列,并且将這個Queue綁定到對應的Exchange上面(不同的消費者可以建立不同的隊列,隊列不同照樣可以接收到訂閱的消息)。
package com.rabbitmq.demo.pub;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:30
 * @Copyright (C) ZhuYouBin
 * @Description: 消息消費者
 */
public class Consumer {
    public static void main(String[] args) {
        // 1、建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2、設定連接配接的 RabbitMQ 服務位址
        factory.setHost("127.0.0.1"); // 預設就是本機
        factory.setPort(5672); // 預設就是 5672 端口
        // 3、擷取連接配接
        Connection connection = null; // 連接配接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、擷取通道
            channel = connection.createChannel();
            // 5、聲明 Exchange,如果不存在,則會建立
            String exchangeName = "exchange_demo_2023";
            channel.exchangeDeclare(exchangeName, "fanout");
            // 6、指定需要操作的消息隊列,如果隊列不存在,則會建立
            String queueName = "queue_demo_2023";
            channel.queueDeclare(queueName, false, false, false, null);
            // 7、綁定 Exchange 和 Queue
            channel.queueBind(queueName, exchangeName, "");
            // 8、消費消息
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    // 接收消息
                    System.out.println("這是接收的消息:" + new String(delivery.getBody()));
                }
            };
            channel.basicConsume(queueName, true, callback, i->{});
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
           

運作消費者、生産者,檢視控制台,消費者日志輸出内容,可以發現兩個綁定不同Queue隊列的消費者,接收到了相同的消息,這就是釋出訂閱模式。

到此,RabbitMQ消息隊列中的釋出訂閱模式就介紹完啦。

綜上,這篇文章結束了,主要介紹消息隊列RabbitMQ七種模式之釋出訂閱模式(Publish/Subscribe)。

繼續閱讀