天天看點

(轉)RabbitMQ消息隊列(四):分發到多Consumer(Publish/Subscribe)1. Exchanges2. Temporary queues3. Bindings綁定4. 最終版本

上篇文章中,我們把每個Message都是deliver到某個Consumer。在這篇文章中,我們将會将同一個Message deliver到多個Consumer中。這個模式也被成為 "publish / subscribe"。

    這篇文章中,我們将建立一個日志系統,它包含兩個部分:第一個部分是發出log(Producer),第二個部分接收到并列印(Consumer)。 我們将建構兩個Consumer,第一個将log寫到實體磁盤上;第二個将log輸出的螢幕。

1. Exchanges

    關于exchange的概念在《RabbitMQ消息隊列(一): Detailed Introduction 詳細介紹》中有詳細介紹。現在做一下簡單的回顧。

    RabbitMQ 的Messaging Model就是Producer并不會直接發送Message到queue。實際上,Producer并不知道它發送的Message是否已經到達queue。

   Producer發送的Message實際上是發到了Exchange中。它的功能也很簡單:從Producer接收Message,然後投遞到queue中。Exchange需要知道如何處理Message,是把它放到那個queue中,還是放到多個queue中?這個rule是通過Exchange 的類型定義的。

(轉)RabbitMQ消息隊列(四):分發到多Consumer(Publish/Subscribe)1. Exchanges2. Temporary queues3. Bindings綁定4. 最終版本

   我們知道有三種類型的Exchange:direct, topic 和fanout。fanout就是廣播模式,會将所有的Message都放到它所知道的queue中。建立一個名字為logs,類型為fanout的Exchange:

[python] view plaincopy

  1. channel.exchange_declare(exchange='logs',  
  2.                          type='fanout')  

Listing exchanges

通過rabbitmqctl可以列出目前所有的Exchange:

[python] view plaincopy

  1. $ sudo rabbitmqctl list_exchanges  
  2. Listing exchanges ...  
  3. logs      fanout  
  4. amq.direct      direct  
  5. amq.topic       topic  
  6. amq.fanout      fanout  
  7. amq.headers     headers  
  8. ...done.  

注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ預設建立的。

現在我們可以通過exchange,而不是routing_key來publish Message了:

[python] view plaincopy

  1. channel.basic_publish(exchange='logs',  
  2.                       routing_key='',  
  3.                       body=message)  

2. Temporary queues

    截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。

    但是對于我們将要建構的日志系統,并不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們隻對目前的log感興趣。為了實作這個目标,我們需要兩件事情:

    1) 每當Consumer連接配接時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在聲明queue時不指定名字,那麼RabbitMQ會随機為我們選擇這個名字。方法:

[python] view plaincopy

  1. result = channel.queue_declare()  

    通過result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

    2)當Consumer關閉連接配接時,這個queue要被deleted。可以加個exclusive的參數。方法:

[python] view plaincopy

  1. result = channel.queue_declare(exclusive=True)  

3. Bindings綁定

現在我們已經建立了fanout類型的exchange和沒有名字的queue(實際上是RabbitMQ幫我們取了名字)。那exchange怎麼樣知道它的Message發送到哪個queue呢?答案就是通過bindings:綁定。

(轉)RabbitMQ消息隊列(四):分發到多Consumer(Publish/Subscribe)1. Exchanges2. Temporary queues3. Bindings綁定4. 最終版本

方法:

[python] view plaincopy

  1. channel.queue_bind(exchange='logs',  
  2.                    queue=result.method.queue)  

現在logs的exchange就将它的Message附加到我們建立的queue了。

Listing bindings

使用指令rabbitmqctl list_bindings。

4. 最終版本

    我們最終實作的資料流圖如下:

(轉)RabbitMQ消息隊列(四):分發到多Consumer(Publish/Subscribe)1. Exchanges2. Temporary queues3. Bindings綁定4. 最終版本

Producer,在這裡就是産生log的program,基本上和前幾個都差不多。最主要的差別就是publish通過了exchange而不是routing_key。

emit_log.py script:

[python] view plaincopy

  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7. channel.exchange_declare(exchange='logs',  
  8.                          type='fanout')  
  9. message = ' '.join(sys.argv[1:]) or "info: Hello World!"  
  10. channel.basic_publish(exchange='logs',  
  11.                       routing_key='',  
  12.                       body=message)  
  13. print " [x] Sent %r" % (message,)  
  14. connection.close()  

還有一點要注意的是我們聲明了exchange。publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,log是被丢棄的。

Consumer:receive_logs.py:

[python] view plaincopy

  1. #!/usr/bin/env python  
  2. import pika  
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  4.         host='localhost'))  
  5. channel = connection.channel()  
  6. channel.exchange_declare(exchange='logs',  
  7.                          type='fanout')  
  8. result = channel.queue_declare(exclusive=True)  
  9. queue_name = result.method.queue  
  10. channel.queue_bind(exchange='logs',  
  11.                    queue=queue_name)  
  12. print ' [*] Waiting for logs. To exit press CTRL+C'  
  13. def callback(ch, method, properties, body):  
  14.     print " [x] %r" % (body,)  
  15. channel.basic_consume(callback,  
  16.                       queue=queue_name,  
  17.                       no_ack=True)  
  18. channel.start_consuming()  

我們開始不是說需要兩個Consumer嗎?一個負責記錄到檔案;一個負責列印到螢幕?

其實用重定向就可以了,當然你想修改callback自己寫檔案也行。我們使用重定向的方法:

We're done. If you want to save logs to a file, just open a console and type:

[python] view plaincopy

  1. $ python receive_logs.py > logs_from_rabbit.log  

Consumer2:列印到螢幕:

[python] view plaincopy

  1. $ python receive_logs.py  

接下來,Producer:

[python] view plaincopy

  1. $ python emit_log.py  

使用指令rabbitmqctl list_bindings你可以看我們建立的queue。

一個output:

[python] view plaincopy

  1. $ sudo rabbitmqctl list_bindings  
  2. Listing bindings ...  
  3. logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []  
  4. logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []  
  5. ...done.  

這個結果還是很好了解的。