天天看點

RabbitMQ六種隊列模式-主題模式

前言

RabbitMQ六種隊列模式-簡單隊列

RabbitMQ六種隊列模式-工作隊列

RabbitMQ六種隊列模式-釋出訂閱

RabbitMQ六種隊列模式-路由模式

RabbitMQ六種隊列模式-主題模式 [本文]

從前面的幾篇我們依次經曆了 exchange 模式從 fanout > direct 的轉變過程,在 fanout 時,我們隻能進行簡單的廣播,對應類型比較單一,使用 direct 後,消費者則可以進行一定程度的選擇,但是,direct 還是有局限性,路由不支援多個條件。

怎麼講呢?

direct 不支援比對 routingKey,一但綁定了就是綁定了,而 topic 主題模式支援規則比對,隻要符合 routingKey 就能發送到綁定的隊列上。

文章目錄

1. 什麼是主題模式2. 代碼部分2.1 生産者2.2 *消費者2.3 #消費者2.4 運作截圖3. 總結

1. 什麼是主題模式

官方連結:http://www.rabbitmq.com/tutorials/tutorial-five-java.html

topics 主題模式跟 routing 路由模式類似,隻不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊比對路由鍵 routingKey,類似于SQL中 = 和 like 的關系。
RabbitMQ六種隊列模式-主題模式

P 表示為生産者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。

topics 模式與 routing 模式比較相近,topics 模式不能具有任意的 routingKey,必須由

一個英文句點号“.”分隔的字元串(我們将被句點号“.”分隔開的每一段獨立的字元串稱為一個單詞),比如 "lazy.orange.fox"。topics routingKey 中可以存在兩種特殊字元“”與“#”,用于做模糊比對,其中“”用于比對一個單詞,“#”用于比對多個單詞(可以是零個)。

"*" 表示任何一個詞

"#" 表示0或1個詞

以上圖中的配置為例:

如果一個消息的 routingKey 設定為 “xxx.orange.rabbit”,那麼該消息會同時路由到 Q1 與 Q2,routingKey="lazy.orange.fox”的消息會路由到Q1與Q2;

routingKey="lazy.brown.fox”的消息會路由到 Q2;

routingKey="lazy.pink.rabbit”的消息會路由到 Q2(隻會投遞給Q2一次,雖然這個routingKey 與 Q2 的兩個 bindingKey 都比對);

routingKey="quick.brown.fox”、routingKey="orange”、routingKey="quick.orange.male.rabbit”的消息将會被丢棄,因為它們沒有比對任何bindingKey。

接下來代碼為例:

2. 代碼部分

2.1 生産者

public class ProducerTopic {

    private static final String EXCHANGE_NAME = "my_topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        /** 1.建立新的連接配接 */
        Connection connection = MQConnectionUtils.newConnection();
        /** 2.建立通道 */
        Channel channel = connection.createChannel();
        /** 3.綁定的交換機 參數1互動機名稱 參數2 exchange類型 */
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        /** 4.發送消息 */
        String routingKey = "log.info.error";
        String msg = "topic_exchange_msg:" + routingKey;
        System.out.println("[send] = " + msg);
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        /** 5.關閉通道、連接配接 */
        channel.close();
        connection.close();
        /** 注意:如果消費沒有綁定交換機和隊列,則消息會丢失 */
    }
}
           

2.2 *消費者

public class ConsumerLogXTopic {

    private static final String QUEUE_NAME = "topic_consumer_info";
    private static final String EXCHANGE_NAME = "my_topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("log * 消費者啟動");
        /* 1.建立新的連接配接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.建立通道 */
        Channel channel = connection.createChannel();
        /* 3.消費者關聯隊列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /* 4.消費者綁定交換機 參數1 隊列 參數2交換機 參數3 routingKey */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者擷取生産者消息:" + msg);
            }
        };
        /* 5.消費者監聽隊列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}
           

2.3 #消費者

public class ConsumerLogJTopic {

    private static final String QUEUE_NAME = "topic_consumer_info";
    private static final String EXCHANGE_NAME = "my_topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("log # 消費者啟動");
        /* 1.建立新的連接配接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.建立通道 */
        Channel channel = connection.createChannel();
        /* 3.消費者關聯隊列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /* 4.消費者綁定交換機 參數1 隊列 參數2交換機 參數3 routingKey */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者擷取生産者消息:" + msg);
            }
        };
        /* 5.消費者監聽隊列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}
           

2.4 運作截圖

生産者

RabbitMQ六種隊列模式-主題模式

星花*消費者

RabbitMQ六種隊列模式-主題模式

#消費者

RabbitMQ六種隊列模式-主題模式

3. 總結

1、topic 相對于之前幾種算是比較複雜了,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“标題”(RouteKey),exchange 會将消息轉發到所有關注主題能與 routeKey 模糊比對的隊列。

2、在進行綁定時,要提供一個該隊列關心的主題,如“#.sscai.#”表示該隊列關心所有涉及 sscai 的消息(一個 routeKey 為 "club.sscai.tmax”的消息會被轉發到該隊列)。

3、"#”表示0個或若幹個關鍵字,“”表示一個關鍵字。如“club.”能與“club.sscai”比對,無法與“club.sscai.xxx”比對;但是“club.#”能與上述兩者比對。

4、同樣,如果 exchange 沒有發現能夠與 routeKey 比對的 Queue,則會抛棄此消息。

案例代碼:https://www.lanzous.com/i5ydu6d

我建立了一個java相關的公衆号,用來記錄自己的學習之路,感興趣的小夥伴可以關注一下微信公衆号哈:程式員小源
RabbitMQ六種隊列模式-主題模式