内容翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程式
RabbitMQ(二):Work Queues、循環分發、消息确認、持久化、公平分發
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調隊列callback queue、關聯辨別correlation id、實作簡單的RPC系統
RabbitMQ(七):常用方法說明 與 學習小結
Topics:
在上一篇部落格中我們改進了我們的日志系統:使用
direct
路由器替代了
fanout
路由器,進而可以選擇性地接收日志。
盡管使用direct路由器給我們的日志系統帶來了改進,但仍然有一些限制:不能基于多種标準進行路由。
在我們的日志系統中,我們可能不僅需要根據日志的嚴重級别來接收日志,而且有時想基于日志來源進行路由。如果你知道syslog這個Unix工具,你可能了解這個概念,
sysylog
會基于日志嚴重級别(
info/warn/crit...
)和裝置(
auth/cron/kern...
)進行日志分發。
如果我們可以監聽來自
corn
的錯誤日志,同時也監聽
kern
的所有日志,那麼我們的日志系統就會更加靈活。
為了實作這個功能,我們需要了解一個複雜的路由器:
topic
路由器。
主題路由器(Topic Exchange):
發送到
topic
路由器的消息的路由鍵
routing_key
不能任意給定:它必須是一些單詞的集合,中間用點号
.
分割。這些單詞可以是任意的,但通常會展現出消息的特征。一些有效的路由鍵示例:
stock.usd.nyse
,
nyse.vmw
,
quick.orange.rabbit
。這些路由鍵可以包含很多單詞,但路由鍵總長度不能超過255個位元組。
綁定鍵
binding key
也必須是這種形式。
topic
路由器背後的邏輯與
direct
路由器類似:以特定路由鍵發送的消息将會發送到所有綁定鍵與之比對的隊列中。但綁定鍵有兩種特殊的情況:
(1)*(星号)僅代表一個單詞
(2)#(井号)代表任意個單詞
下圖可以很好地解釋這兩個符号的含義:
對于上圖的例子,我們将會發送描述動物的消息。這些消息将會以由三個單詞組成的路由鍵發送。路由鍵中的第一個單詞描述了速度,第二個描述了顔色,第三個描述了物種:
<speed>.<colour>.<species>
。
我們建立了三個綁定,Q1的綁定鍵為
*.orange.*
,Q2的綁定鍵有兩個,分别是
*.*.rabbit
和
lazy.#
。
上述綁定關系可以描述為:
(1)Q1關注所有顔色為
orange
的動物。
(2)Q2關注所有的
rabbit
,以及所有的
lazy
的動物。
如果一個消息的路由鍵是
quick.orange.rabbit
,那麼Q1和Q2都可以接收到,路由鍵是
lazy.orange.elephant
的消息同樣如此。但是,路由鍵是
quick.orange.fox
的消息隻會到達Q1,路由鍵是
lazy.brown.fox
的消息隻會到達Q2。注意,路由鍵為
lazy.pink.rabbit
的消息隻會到達Q2一次,盡管它比對了兩個綁定鍵。路由鍵為
quick.brown.fox
的消息因為不和任意的綁定鍵比對,是以将會被丢棄。
假如我們不按常理出牌:發送一個路由鍵隻有一個單詞或者四個單詞的消息,像
orange
或者
quick.orange.male.rabbit
,這樣的話,這些消息因為不和任意綁定鍵比對,都将會丢棄。但是,
lazy.orange.male.rabbit
消息因為和
lazy.#
比對,是以會到達Q2,盡管它包含四個單詞。
Topic exchange::
Topic exchange
非常強大,可以實作其他任意路由器的功能。
當一個隊列以綁定鍵
綁定,它将會接收到所有的消息,而無視路由鍵(實際是綁定鍵
#
比對了任意的路由鍵)。----這和
#
fanout
路由器一樣了。
當
和
*
這兩個特殊的字元不出現在綁定鍵中,
#
就會和
Topic exchange
類似了。
direct exchange
放在一塊:
我們将會在我們的日志系統中使用主題路由器
Topic exchange
,并假設所有的日志消息以兩個單詞
<facility>.<severity>
為路由鍵。
代碼和上個教程幾乎一樣。
生産者
EmitLogTopic.java
:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
//建立連接配接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//聲明路由器和路由器類型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//定義路由鍵和消息
String routingKey = "";
String message = "msg.....";
//釋出消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
消費者
ReceiveLogsTopic.java
:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
//建立連接配接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明路由器和路由器類型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//
String bingingKeys[] = {""};
for (String bindingKey : bingingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//監聽消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
現在,可以動手實驗了。
開頭提到的:日志嚴重級别
info/warn/crit...
和裝置
auth/cron/kern...
。
消費者:
将
String bingingKeys[] = {""}
改為
String bingingKeys[] = {"#"}
,啟動第一個消費者;
再改為
String bingingKeys[] = {"kern.*"}
,啟動第二個消費者;
再改為
String bingingKeys[] = {"*.critical"}
,啟動第三個消費者;
再改為
String bingingKeys[] = {"kern.*", "*.critical"}
,啟動第四個消費者。
生産者,發送多個消息,如:
路由鍵為
kern.critical
的消息:
A critical kernel error
;
路由鍵為
kern.info
的消息:
A kernel info
;
路由鍵為
kern.warn
的消息:
A kernel warning
;
路由鍵為
auth.critical
的消息:
A critical auth error
;
路由鍵為
cron.warn
的消息:
A cron waning
;
路由鍵為
cron.critical
的消息:
A critical cron error
;
試試最後的結果:第一個消費者将會接收到所有的消息,第二個消費者将會
kern
的所有嚴重級别的日志,第三個消費者将會接收到所有裝置的
critical
消息,第四個消費者将會接收到
kern
裝置的所有消息和所有
critical
消息。
下篇部落格中,我們将會學習如何讓消息往返,以此來作為一個遠端過程調用(RPC)。
說明
①與原文略有出入,如有疑問,請參閱原文
②原文均是編譯後通過javacp指令直接運作程式,我是在IDE中進行的,相應的操作做了修改。