天天看點

RabbitMQ (四) 路由選擇 (Routing)

上一篇部落格我們建立了一個簡單的日志系統,我們能夠廣播日志消息給所有你的接收者,如果你不了解,請檢視:​​RabbitMQ (三) 釋出/訂閱​​。本篇部落格我們準備給日志系統添加新的特性,讓日志接收者能夠訂閱部分消息。例如,我們可以僅僅将緻命的錯誤寫入日志檔案,然而仍然在控制台上列印出所有的其他類型的日志消息。

1、綁定(Bindings)

在上一篇部落格中我們已經使用過綁定。類似下面的代碼:

channel.queueBind(queueName, EXCHANGE_NAME, "");      

綁定表示轉發器與隊列之間的關系。我們也可以簡單的認為:隊列對該轉發器上的消息感興趣。

綁定可以附帶一個額外的參數routingKey。為了與避免basicPublish方法(釋出消息的方法)的參數混淆,我們準備把它稱作綁定鍵(binding key)。下面展示如何使用綁定鍵(binding key)來建立一個綁定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");      

綁定鍵的意義依賴于轉發器的類型。對于fanout類型,忽略此參數。

2、直接轉發(Direct exchange)

上一篇的日志系統廣播所有的消息給所有的消費者。我們希望可以對其擴充,來允許根據日志的嚴重性進行過濾日志。例如:我們可能希望把緻命類型的錯誤寫入硬碟,而不把硬碟空間浪費在警告或者消息類型的日志上。

之前我們使用fanout類型的轉發器,但是并沒有給我們帶來更多的靈活性:僅僅可以愚蠢的轉發。

我們将會使用direct類型的轉發器進行替代。direct類型的轉發器背後的路由轉發算法很簡單:消息會被推送至綁定鍵(binding key)和消息釋出附帶的選擇鍵(routing key)完全比對的隊列。

圖解:

RabbitMQ (四) 路由選擇 (Routing)

上圖,我們可以看到direct類型的轉發器與兩個隊列綁定。第一個隊列與綁定鍵orange綁定,第二個隊列與轉發器間有兩個綁定,一個與綁定鍵black綁定,另一個與green綁定鍵綁定。

這樣的話,當一個消息附帶一個選擇鍵(routing key) orange釋出至轉發器将會被導向到隊列Q1。消息附帶一個選擇鍵(routing key)black或者green将會被導向到Q2.所有的其他的消息将會被丢棄。

3、多重綁定(multiple bindings)

RabbitMQ (四) 路由選擇 (Routing)

使用一個綁定鍵(binding key)綁定多個隊列是完全合法的。如上圖,一個附帶選擇鍵(routing key)的消息将會被轉發到Q1和Q2。

4、發送日志(Emittinglogs)

我們準備将這種模式用于我們的日志系統。我們将消息發送到direct類型的轉發器而不是fanout類型。我們将把日志的嚴重性作為選擇鍵(routing key)。這樣的話,接收程式可以根據嚴重性來選擇接收。我們首先關注發送日志的代碼:

像以前一樣,我們需要先建立一個轉發器:

channel.exchangeDeclare(EXCHANGE_NAME,"direct");

然後我們準備發送一條消息:

channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());

為了簡化代碼,我們假定‘severity’是‘info’,‘warning’,‘error’中的一個。

5、訂閱

接收消息的代碼和前面的部落格的中類似,隻有一點不同:我們給我們所感興趣的嚴重性類型的日志建立一個綁定。

StringqueueName = channel.queueDeclare().getQueue();
 for(Stringseverity : argv)
 {
 channel.queueBind(queueName, EXCHANGE_NAME, severity);
 }      

6、完整的執行個體

RabbitMQ (四) 路由選擇 (Routing)

發送端:EmitLogDirect.java

[java]  ​​view plain​​  ​​copy​​

1. package com.zhy.rabbit._04_binding_key;  
2.   
3. import java.util.Random;  
4. import java.util.UUID;  
5.   
6. import com.rabbitmq.client.Channel;  
7. import com.rabbitmq.client.Connection;  
8. import com.rabbitmq.client.ConnectionFactory;  
9.   
10. public class EmitLogDirect  
11. {  
12.   
13. private static final String EXCHANGE_NAME = "ex_logs_direct";  
14. private static final String[] SEVERITIES = { "info", "warning", "error" };  
15.   
16. public static void main(String[] argv) throws java.io.IOException  
17.     {  
18. // 建立連接配接和頻道  
19. new ConnectionFactory();  
20. "localhost");  
21.         Connection connection = factory.newConnection();  
22.         Channel channel = connection.createChannel();  
23. // 聲明轉發器的類型  
24. "direct");  
25.   
26. //發送6條消息  
27. for (int i = 0; i < 6; i++)  
28.         {  
29.             String severity = getSeverity();  
30. "_log :" + UUID.randomUUID().toString();  
31. // 釋出消息至轉發器,指定routingkey  
32. null, message  
33.                     .getBytes());  
34. " [x] Sent '" + message + "'");  
35.         }  
36.   
37.         channel.close();  
38.         connection.close();  
39.     }  
40.   
41. /**
42.      * 随機産生一種日志類型
43.      * 
44.      * @return
45.      */  
46. private static String getSeverity()  
47.     {  
48. new Random();  
49. int ranVal = random.nextInt(3);  
50. return SEVERITIES[ranVal];  
51.     }  
52. }      

随機發送6條随機類型(routing key)的日志給轉發器~~

接收端:ReceiveLogsDirect.java

[java]  ​​view plain​​  ​​copy​​

1. package com.zhy.rabbit._04_binding_key;  
2.   
3. import java.util.Random;  
4.   
5. import com.rabbitmq.client.Channel;  
6. import com.rabbitmq.client.Connection;  
7. import com.rabbitmq.client.ConnectionFactory;  
8. import com.rabbitmq.client.QueueingConsumer;  
9.   
10. public class ReceiveLogsDirect  
11. {  
12.   
13. private static final String EXCHANGE_NAME = "ex_logs_direct";  
14. private static final String[] SEVERITIES = { "info", "warning", "error" };  
15.   
16. public static void main(String[] argv) throws java.io.IOException,  
17.             java.lang.InterruptedException  
18.     {  
19. // 建立連接配接和頻道  
20. new ConnectionFactory();  
21. "localhost");  
22.         Connection connection = factory.newConnection();  
23.         Channel channel = connection.createChannel();  
24. // 聲明direct類型轉發器  
25. "direct");  
26.   
27.         String queueName = channel.queueDeclare().getQueue();  
28.         String severity = getSeverity();  
29. // 指定binding_key  
30.         channel.queueBind(queueName, EXCHANGE_NAME, severity);  
31. " [*] Waiting for "+severity+" logs. To exit press CTRL+C");  
32.   
33. new QueueingConsumer(channel);  
34. true, consumer);  
35.   
36. while (true)  
37.         {  
38.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
39. new String(delivery.getBody());  
40.   
41. " [x] Received '" + message + "'");  
42.         }  
43.     }  
44.   
45. /**
46.      * 随機産生一種日志類型
47.      * 
48.      * @return
49.      */  
50. private static String getSeverity()  
51.     {  
52. new Random();  
53. int ranVal = random.nextInt(3);  
54. return SEVERITIES[ranVal];  
55.     }  
56. }      

接收端随機設定一個日志嚴重級别(binding_key)。。。

我開啟了3個接收端程式,兩個準備接收error類型日志,一個接收info類型日志,然後運作發送端程式

運作結果:

 [x] Sent 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'

 [x] Sent 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'

 [x] Sent 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'

 [x] Sent 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'

 [x] Sent 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'

 [x] Sent 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'

------------------------------------------------------------------------------------

 [*] Waiting for error logs. To exit press CTRL+C

 [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'

 [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'

 [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'

 [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'

------------------------------------------------------------------------------------

 [*] Waiting for error logs. To exit press CTRL+C

 [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'

 [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'

 [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'

 [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'

------------------------------------------------------------------------------------

 [*] Waiting for info logs. To exit press CTRL+C

 [x] Received 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'

 [x] Received 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'