天天看點

RabbitMQ:交換機(topic exchange)

首先看看​

​AMQP​

​​協定,對​

​RabbitMQ​

​的架構會更了解。

​​深入了解AMQP協定​​

建立一個​

​Maven​

​項目,根據自己伺服器RabbitMQ的版本導入相應的包。

RabbitMQ:交換機(topic exchange)
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.6.5</version>
    </dependency>      

主題交換機

前面提到的 direct exchange的路由規則是嚴格意義上的比對,換言之 Routing Key 必須與 Binding Key 相比對的時候才将消息傳送給 Queue。

而topic exchange的路由規則是一種模糊比對,可以通過通配符滿足一部分規則就可以傳送。

它的約定是:

1)binding key 中可以存在兩種特殊字元 ​

​“*”​

​​ 與​

​“#”​

​​,用于做模糊比對,其中 ​

​“*”​

​​ 用于比對一個單詞,​

​“#”​

​用于比對多個單詞(可以是零個)。

2)routing key 為一個句點号 “.” 分隔的字元串(我們将被句号 “. ” 分隔開的每一段獨立的字元串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”,binding key 與 routing key 一樣也是句點号 “.” 分隔的字元串。

RabbitMQ:交換機(topic exchange)

當生産者發送消息 Routing Key=F.C.E 的時候,這時候隻滿足 Queue1,是以會被路由到 Queue1 中,如果 Routing Key=A.C.E 這時候會被同時路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 時,這裡隻會發送一條消息到 Queue2 中。

主題交換機擁有非常廣泛的使用者案例。無論何時,當一個問題涉及到那些想要有針對性的選擇需要接收消息的 多消費者 / 多應用(multiple consumers/applications) 的時候,主題交換機都可以被列入考慮範圍。

生産端

package com.kaven.rabbitmq.exchange.topicExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicProducer {

    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // topic exchange ,RabbitMQ提供的topic exchange
    private static String exchangeName = "amq.topic";
    // exchange type
    private static String exchangeType= "topic";

    // 交換機路由的routingKey
    private static String[] routingKey = {"test.kaven.wyy" , "test" , "test.topic"};

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 建立一個連接配接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立連接配接
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 發送消息
        for (int i = 0; i < routingKey.length; i++) {
            String msg = "RabbitMQ:Topic Exchange 發送資料 , routingKey:"+routingKey[i];
            channel.basicPublish(exchangeName ,routingKey[i] ,null, msg.getBytes());
        }

        // 5 關閉連接配接
        channel.close();
        connection.close();
    }
}      

消費端

這裡建立兩個消費者,為每個消費者建立一個線程,以便監聽生産端發送過來的消息。

package com.kaven.rabbitmq.exchange.topicExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicConsumer {

    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // topic exchange ,RabbitMQ提供的topic exchange
    private static String exchangeName = "amq.topic";
    // exchange type
    private static String exchangeType= "topic";
    // 隊列名
    private static String[] queueName = {"queue_#" , "queue_*"};
    // 隊列與交換機綁定的routingKey
    private static String[] routingKey = {"test.#" , "test.*"};

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 建立一個連接配接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立連接配接
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        for (int i = 0; i < queueName.length; i++) {
            // 4 定義Queue ,将Queue綁定到direct exchange
            channel.queueDeclare(queueName[i],true , false , false , null);
            channel.queueBind(queueName[i] , exchangeName , routingKey[i]);
        }


        // 5 建立消費者
        QueueingConsumer consumer0 = new QueueingConsumer(channel);
        QueueingConsumer consumer1 = new QueueingConsumer(channel);

        // 6 設定
        channel.basicConsume(queueName[0] , true , consumer0);
        channel.basicConsume(queueName[1] , true , consumer1);

        // 7 接收消息
        Thread thread0 = new Thread(new MyRunnable(consumer0 , routingKey[0]));
        Thread thread1 = new Thread(new MyRunnable(consumer1 , routingKey[1]));

        thread0.start();
        thread1.start();

        thread0.join();
        thread1.join();
    }
}      
package com.kaven.rabbitmq.exchange.topicExchange;

import com.rabbitmq.client.QueueingConsumer;

public class MyRunnable implements Runnable {

    private QueueingConsumer consumer;
    private String routingKey;

    public MyRunnable(QueueingConsumer consumer , String routingKey) {
        this.consumer = consumer;
        this.routingKey = routingKey;
    }

    @Override
    public void run() {
        while(true){
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println(routingKey+"[收到]-"+msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}      

測試

因為這裡使用的是​

​RabbitMQ​

​​提供給我們的topic exchange,是以我們無需自己定義。

因為交換機已經定義好了,是以無論先啟動生産端還是消費端,消費端都可以成功收到消息。

消費端輸出如下:

test.#[收到]-RabbitMQ:Topic Exchange 發送資料 , routingKey:test.kaven.wyy
test.*[收到]-RabbitMQ:Topic Exchange 發送資料 , routingKey:test.topic
test.#[收到]-RabbitMQ:Topic Exchange 發送資料 , routingKey:test
test.#[收到]-RabbitMQ:Topic Exchange 發送資料 , routingKey:test.topic      

很明顯輸出結果符合上面對topic exchange路由規則的解釋。

看看​

​RabbitMQ Management​

​。

RabbitMQ:交換機(topic exchange)
RabbitMQ:交換機(topic exchange)