天天看點

RabbitMQ(五):Exchange交換器--topic

内容翻譯自:RabbitMQ Tutorials Java版

RabbitMQ(一):Hello World程式

RabbitMQ(二):Work Queues、循環分發、消息确認、持久化、公平分發

RabbitMQ(三):Exchange交換器--fanout

RabbitMQ(四):Exchange交換器--direct

RabbitMQ(五):Exchange交換器--topic

RabbitMQ(六):回調隊列callback queue、關聯辨別correlation id、實作簡單的RPC系統

RabbitMQ(七):常用方法說明 與 學習小結

Topics:

在上一篇部落格中我們改進了我們的日志系統:使用

direct

路由器替代了

fanout

路由器,進而可以選擇性地接收日志。

盡管使用direct路由器給我們的日志系統帶來了改進,但仍然有一些限制:不能基于多種标準進行路由。

在我們的日志系統中,我們可能不僅需要根據日志的嚴重級别來接收日志,而且有時想基于日志來源進行路由。如果你知道syslog這個Unix工具,你可能了解這個概念,

sysylog

會基于日志嚴重級别(

info/warn/crit...

)和裝置(

auth/cron/kern...

)進行日志分發。

如果我們可以監聽來自

corn

的錯誤日志,同時也監聽

kern

的所有日志,那麼我們的日志系統就會更加靈活。

為了實作這個功能,我們需要了解一個複雜的路由器:

topic

路由器。

主題路由器(Topic Exchange):

發送到

topic

路由器的消息的路由鍵

routing_key

不能任意給定:它必須是一些單詞的集合,中間用點号

.

分割。這些單詞可以是任意的,但通常會展現出消息的特征。一些有效的路由鍵示例:

stock.usd.nyse

nyse.vmw

quick.orange.rabbit

。這些路由鍵可以包含很多單詞,但路由鍵總長度不能超過255個位元組。

綁定鍵

binding key

也必須是這種形式。

topic

路由器背後的邏輯與

direct

路由器類似:以特定路由鍵發送的消息将會發送到所有綁定鍵與之比對的隊列中。但綁定鍵有兩種特殊的情況:

(1)*(星号)僅代表一個單詞

(2)#(井号)代表任意個單詞

下圖可以很好地解釋這兩個符号的含義:

RabbitMQ(五):Exchange交換器--topic

對于上圖的例子,我們将會發送描述動物的消息。這些消息将會以由三個單詞組成的路由鍵發送。路由鍵中的第一個單詞描述了速度,第二個描述了顔色,第三個描述了物種:

<speed>.<colour>.<species>

我們建立了三個綁定,Q1的綁定鍵為

*.orange.*

,Q2的綁定鍵有兩個,分别是

*.*.rabbit

lazy.#

上述綁定關系可以描述為:

(1)Q1關注所有顔色為

orange

的動物。

(2)Q2關注所有的

rabbit

,以及所有的

lazy

的動物。

如果一個消息的路由鍵是

quick.orange.rabbit

,那麼Q1和Q2都可以接收到,路由鍵是

lazy.orange.elephant

的消息同樣如此。但是,路由鍵是

quick.orange.fox

的消息隻會到達Q1,路由鍵是

lazy.brown.fox

的消息隻會到達Q2。注意,路由鍵為

lazy.pink.rabbit

的消息隻會到達Q2一次,盡管它比對了兩個綁定鍵。路由鍵為

quick.brown.fox

的消息因為不和任意的綁定鍵比對,是以将會被丢棄。

假如我們不按常理出牌:發送一個路由鍵隻有一個單詞或者四個單詞的消息,像

orange

或者

quick.orange.male.rabbit

,這樣的話,這些消息因為不和任意綁定鍵比對,都将會丢棄。但是,

lazy.orange.male.rabbit

消息因為和

lazy.#

比對,是以會到達Q2,盡管它包含四個單詞。

Topic exchange::

Topic exchange

非常強大,可以實作其他任意路由器的功能。

當一個隊列以綁定鍵

#

綁定,它将會接收到所有的消息,而無視路由鍵(實際是綁定鍵

#

比對了任意的路由鍵)。----這和

fanout

路由器一樣了。

*

#

這兩個特殊的字元不出現在綁定鍵中,

Topic exchange

就會和

direct exchange

類似了。

放在一塊:

我們将會在我們的日志系統中使用主題路由器

Topic exchange

,并假設所有的日志消息以兩個單詞

<facility>.<severity>

為路由鍵。

代碼和上個教程幾乎一樣。

生産者

EmitLogTopic.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            //建立連接配接和通道
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();

            //聲明路由器和路由器類型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            //定義路由鍵和消息
            String routingKey = "";
            String message = "msg.....";

            //釋出消息
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}
           

消費者

ReceiveLogsTopic.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        //建立連接配接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //聲明路由器和路由器類型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();

        //
        String bingingKeys[] = {""};

        for (String bindingKey : bingingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //監聽消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
           

現在,可以動手實驗了。

開頭提到的:日志嚴重級别

info/warn/crit...

和裝置

auth/cron/kern...

消費者:

String bingingKeys[] = {""}

改為

String bingingKeys[] = {"#"}

,啟動第一個消費者;

再改為

String bingingKeys[] = {"kern.*"}

,啟動第二個消費者;

再改為

String bingingKeys[] = {"*.critical"}

,啟動第三個消費者;

再改為

String bingingKeys[] = {"kern.*", "*.critical"}

,啟動第四個消費者。

生産者,發送多個消息,如:

路由鍵為

kern.critical

的消息:

A critical kernel error

路由鍵為

kern.info

的消息:

A kernel info

路由鍵為

kern.warn

的消息:

A kernel warning

路由鍵為

auth.critical

的消息:

A critical auth error

路由鍵為

cron.warn

的消息:

A cron waning

路由鍵為

cron.critical

的消息:

A critical cron error

試試最後的結果:第一個消費者将會接收到所有的消息,第二個消費者将會

kern

的所有嚴重級别的日志,第三個消費者将會接收到所有裝置的

critical

消息,第四個消費者将會接收到

kern

裝置的所有消息和所有

critical

消息。

下篇部落格中,我們将會學習如何讓消息往返,以此來作為一個遠端過程調用(RPC)。

說明

①與原文略有出入,如有疑問,請參閱原文

②原文均是編譯後通過javacp指令直接運作程式,我是在IDE中進行的,相應的操作做了修改。

繼續閱讀