天天看點

RabbitMQ (三) 釋出/訂閱

本系列教程主要來自于官網入門教程的翻譯,然後自己進行了部分的修改與實驗,内容僅供參考。 

上一篇部落格中,我們實作了工作隊列,并且我們的工作隊列中的一個任務隻會發給一個工作者,除非某個工作者未完成任務意外被殺死,會轉發給另外的工作者,如果你還不了解:​​RabbitMQ (二)工作隊列​​。這篇部落格中,我們會做一些改變,就是把一個消息發給多個消費者,這種模式稱之為釋出/訂閱(類似觀察者模式)。

         為了驗證這種模式,我們準備建構一個簡單的日志系統。這個系統包含兩類程式,一類程式發動日志,另一類程式接收和處理日志。

         在我們的日志系統中,每一個運作的接收者程式都會收到日志。然後我們實作,一個接收者将接收到的資料寫到硬碟上,與此同時,另一個接收者把接收到的消息展現在螢幕上。

         本質上來說,就是釋出的日志消息會轉發給所有的接收者。

1、轉發器(Exchanges)

前面的部落格中我們主要的介紹都是發送者發送消息給隊列,接收者從隊列接收消息。下面我們會引入Exchanges,展示RabbitMQ的完整的消息模型。

RabbitMQ消息模型的核心理念是生産者永遠不會直接發送任何消息給隊列,一般的情況生産者甚至不知道消息應該發送到哪些隊列。

相反的,生産者隻能發送消息給轉發器(Exchange)。轉發器是非常簡單的,一邊接收從生産者發來的消息,另一邊把消息推送到隊列中。轉發器必須清楚的知道消息如何處理它收到的每一條消息。是否應該追加到一個指定的隊列?是否應該追加到多個隊列?或者是否應該丢棄?這些規則通過轉發器的類型進行定義。

RabbitMQ (三) 釋出/訂閱

下面列出一些可用的轉發器類型:

Direct

Topic

Headers

Fanout

目前我們關注最後一個fanout,聲明轉發器類型的代碼:

channel.exchangeDeclare("logs","fanout");

fanout類型轉發器特别簡單,把所有它介紹到的消息,廣播到所有它所知道的隊列。不過這正是我們前述的日志系統所需要的。

2、匿名轉發器(nameless exchange)

前面說到生産者隻能發送消息給轉發器(Exchange),但是我們前兩篇部落格中的例子并沒有使用到轉發器,我們仍然可以發送和接收消息。這是因為我們使用了一個預設的轉發器,它的辨別符為””。之前發送消息的代碼:

channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一個參數為轉發器的名稱,我們設定為”” : 如果存在routingKey(第二個參數),消息由routingKey決定發送到哪個隊列。

現在我們可以指定消息發送到的轉發器:

channel.basicPublish( "logs","", null, message.getBytes());

3、臨時隊列(Temporary queues)

前面的部落格中我們都為隊列指定了一個特定的名稱。能夠為隊列命名對我們來說是很關鍵的,我們需要指定消費者為某個隊列。當我們希望在生産者和消費者間共享隊列時,為隊列命名是很重要的。

不過,對于我們的日志系統我們并不關心隊列的名稱。我們想要接收到所有的消息,而且我們也隻對目前正在傳遞的資料的感興趣。為了滿足我們的需求,需要做兩件事:

第一, 無論什麼時間連接配接到Rabbit我們都需要一個新的空的隊列。為了實作,我們可以使用随機數建立隊列,或者更好的,讓伺服器給我們提供一個随機的名稱。

第二, 一旦消費者與Rabbit斷開,消費者所接收的那個隊列應該被自動删除。

​​Java​​中我們可以使用queueDeclare()方法,不傳遞任何參數,來建立一個非持久的、唯一的、自動删除的隊列且隊列名稱由伺服器随機産生。

String queueName = channel.queueDeclare().getQueue();

一般情況這個名稱與amq.gen-JzTY20BRgKO-HjmUJj0wLg 類似。

4、綁定(Bindings)

RabbitMQ (三) 釋出/訂閱

我們已經建立了一個fanout轉發器和隊列,我們現在需要通過binding告訴轉發器把消息發送給我們的隊列。

channel.queueBind(queueName, “logs”, ””)參數1:隊列名稱 ;參數2:轉發器名稱

5、完整的例子

RabbitMQ (三) 釋出/訂閱

日志發送端:

[java]  ​​view plain​​  ​​copy​​

  ​​​ ​​

1. package com.zhy.rabbit._03_bindings_exchanges;  
2.   
3. import java.io.IOException;  
4. import java.util.Date;  
5.   
6. import com.rabbitmq.client.Channel;  
7. import com.rabbitmq.client.Connection;  
8. import com.rabbitmq.client.ConnectionFactory;  
9.   
10. public class EmitLog  
11. {  
12. private final static String EXCHANGE_NAME = "ex_log";  
13.   
14. public static void main(String[] args) throws IOException  
15.     {  
16. // 建立連接配接和頻道  
17. new ConnectionFactory();  
18. "localhost");  
19.         Connection connection = factory.newConnection();  
20.         Channel channel = connection.createChannel();  
21. // 聲明轉發器和類型  
22. "fanout" );  
23.           
24. new Date().toLocaleString()+" : log something";  
25. // 往轉發器上發送消息  
26. "", null, message.getBytes());  
27.   
28. " [x] Sent '" + message + "'");  
29.   
30.         channel.close();  
31.         connection.close();  
32.   
33.     }  
34.   
35. }      

沒什麼太大的改變,聲明隊列的代碼,改為聲明轉發器了,同樣的消息的傳遞也交給了轉發器。

接收端1 :ReceiveLogsToSave.java:

[java]  ​​view plain​​  ​​copy​​

  ​​​ ​​​

1. package com.zhy.rabbit._03_bindings_exchanges;  
2.   
3. import java.io.File;  
4. import java.io.FileNotFoundException;  
5. import java.io.FileOutputStream;  
6. import java.io.IOException;  
7. import java.text.SimpleDateFormat;  
8. import java.util.Date;  
9.   
10. import com.rabbitmq.client.Channel;  
11. import com.rabbitmq.client.Connection;  
12. import com.rabbitmq.client.ConnectionFactory;  
13. import com.rabbitmq.client.QueueingConsumer;  
14.   
15. public class ReceiveLogsToSave  
16. {  
17. private final static String EXCHANGE_NAME = "ex_log";  
18.   
19. public static void main(String[] argv) throws java.io.IOException,  
20.             java.lang.InterruptedException  
21.     {  
22. // 建立連接配接和頻道  
23. new ConnectionFactory();  
24. "localhost");  
25.         Connection connection = factory.newConnection();  
26.         Channel channel = connection.createChannel();  
27.   
28. "fanout");  
29. // 建立一個非持久的、唯一的且自動删除的隊列  
30.         String queueName = channel.queueDeclare().getQueue();  
31. // 為轉發器指定隊列,設定binding  
32. "");  
33.   
34. " [*] Waiting for messages. To exit press CTRL+C");  
35.   
36. new QueueingConsumer(channel);  
37. // 指定接收者,第二個參數為自動應答,無需手動應答  
38. true, consumer);  
39.   
40. while (true)  
41.         {  
42.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
43. new String(delivery.getBody());  
44.   
45.             print2File(message);  
46.         }  
47.   
48.     }  
49.   
50. private static void print2File(String msg)  
51.     {  
52. try  
53.         {  
54. class.getClassLoader().getResource("").getPath();  
55. new SimpleDateFormat("yyyy-MM-dd")  
56. new Date());  
57. new File(dir, logFileName+".txt");  
58. new FileOutputStream(file, true);  
59. "\r\n").getBytes());  
60.             fos.flush();  
61.             fos.close();  
62. catch (FileNotFoundException e)  
63.         {  
64.             e.printStackTrace();  
65. catch (IOException e)  
66.         {  
67.             e.printStackTrace();  
68.         }  
69.     }  
70. }      

随機建立一個隊列,然後将隊列與轉發器綁定,然後将消費者與該隊列綁定,然後寫入日志檔案。

接收端2:ReceiveLogsToConsole.java

[java]  ​​view plain​​  ​​copy​​

  ​​​ ​​​

1. package com.zhy.rabbit._03_bindings_exchanges;  
2.   
3. import com.rabbitmq.client.Channel;  
4. import com.rabbitmq.client.Connection;  
5. import com.rabbitmq.client.ConnectionFactory;  
6. import com.rabbitmq.client.QueueingConsumer;  
7.   
8. public class ReceiveLogsToConsole  
9. {  
10. private final static String EXCHANGE_NAME = "ex_log";  
11.   
12. public static void main(String[] argv) throws java.io.IOException,  
13.             java.lang.InterruptedException  
14.     {  
15. // 建立連接配接和頻道  
16. new ConnectionFactory();  
17. "localhost");  
18.         Connection connection = factory.newConnection();  
19.         Channel channel = connection.createChannel();  
20.   
21. "fanout");  
22. // 建立一個非持久的、唯一的且自動删除的隊列  
23.         String queueName = channel.queueDeclare().getQueue();  
24. // 為轉發器指定隊列,設定binding  
25. "");  
26.   
27. " [*] Waiting for messages. To exit press CTRL+C");  
28.   
29. new QueueingConsumer(channel);  
30. // 指定接收者,第二個參數為自動應答,無需手動應答  
31. true, consumer);  
32.   
33. while (true)  
34.         {  
35.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
36. new String(delivery.getBody());  
37. " [x] Received '" + message + "'");  
38.   
39.         }  
40.   
41.     }  
42.   
43. }      

随機建立一個隊列,然後将隊列與轉發器綁定,然後将消費者與該隊列綁定,然後列印到控制台。

現在把兩個接收端運作,然後運作3次發送端:

輸出結果:

發送端:

 [x] Sent '2014-7-10 16:04:54 : log something'

 [x] Sent '2014-7-10 16:04:58 : log something'

 [x] Sent '2014-7-10 16:05:02 : log something'

接收端1:

RabbitMQ (三) 釋出/訂閱

接收端2:

 [*] Waiting for messages. To exit press CTRL+C

 [x] Received '2014-7-10 16:04:54 : log something'

 [x] Received '2014-7-10 16:04:58 : log something'

 [x] Received '2014-7-10 16:05:02 : log something'