天天看點

RabbitMQ詳解(三)

一、分發到多Consumer(fanout)

二、Routing路由(Direct)

三、主題路由(Topic)

将同一個Message deliver到多個Consumer中。這個模式也被稱為"publish/subscribe"

建立一個日志系統,包含兩部分:第一部分發出log(Producer),第二部分接收到并列印(Consumer)。兩個Consumer,第一個将log寫到實體磁盤上;第二個将log輸出的螢幕。

1.發送消息流程:

    1.Producer發送的Message實際上是發到了Exchange中。

    2.Exchanges從Producer接收message投遞到queue中

    3.Prducer發送的消息隻是到達了Exchange中,Exchange具有不同的類型實作不同的分發方式

Exchnges的類型:direct、topic和fanout

fanout就是廣播模式,會将所有的Message都放到它所知道的queue中

channel.exchange_declare(exchange='logs',  

    type='fanout')   //建立一個名字為logs,類型為fanout的Exchange:

<code>[root@node</code><code>112</code> <code>~]# rabbitmqctl list_exchanges //檢視所有的Exchanges</code>

<code>Listing exchanges ...</code>

<code>logs  fanout</code>

<code>amq.direct    direct</code>

<code>amq.fanout    fanout</code>

<code>amq.headers    headers</code>

<code>amq.match    headers</code>

<code>amq.rabbitmq.log    topic</code>

<code>amq.rabbitmq.trace    topic</code>

<code>amq.topic    topic</code>

<code>...done.</code>

注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ預設建立的。 

通過exchange,而不是routing_key來publish Message:

channel.basic_publish(exchange='logs',  

    routing_key='',  

    body=message)  

2.臨時隊列

截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。

但是對于我們将要建構的日志系統,并不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們隻對目前的log感興趣。為了實作這個目标,我們需要兩件事情:

    1)每當Consumer連接配接時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在聲明queue時不指定名字,那麼RabbitMQ會随機為我們選擇這個名字。方法:

    result = channel.queue_declare() 

    通過result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

    2)當Consumer關閉連接配接時,這個queue要被deleted。可以加個exclusive的參數。方法:

    result = channel.queue_declare(exclusive=True)   //每次擷取的都是新的,單獨使用的

3.Bindings綁定

    建立好fanout類型的Exchange和沒有名字的queue後(實際上是RabbitMQ幫我們取的名字)Exchange通過bindings把它的Message發送到目标queue

    channel.queue_bind(exchange='logs',  

        queue=result.method.queue)      

    使用指令rabbitmqctl list_bindings 檢視bindings

4.最終代碼

拓撲圖:

Producer,在這裡就是産生log的program,基本上和前幾個都差不多。最主要的差別就是publish通過了exchange而不是routing_key。

emit_log.py script:

===========================================================================

<code>#!/usr/bin/env python</code>

<code>import pika</code>

<code>import sys</code>

<code>connection = pika.BlockingConnection(pika.ConnectionParameters(</code>

<code>    </code><code>host=</code><code>'localhost'</code><code>))</code>

<code>channel = connection.channel()</code>

<code>channel.exchange_declare(exchange=</code><code>'logs'</code><code>,</code>

<code>    </code><code>type=</code><code>'fanout'</code><code>)</code>

<code>message = </code><code>' '</code><code>.join(sys.argv[</code><code>1:</code><code>]) or </code><code>"info: Hello World!"</code>

<code>channel.basic_publish(exchange=</code><code>'logs'</code><code>,</code>

<code>    </code><code>routing_key=</code><code>''</code><code>,</code>

<code>    </code><code>body=message)</code>

<code>print</code> <code>" [x] Sent %r"</code> <code>% (message,)</code>

<code>connection.close()</code>

還有一點要注意的是我們聲明了exchange。publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,log是被丢棄的。

Consumer:receive_logs.py:

<code>result = channel.queue_declare(exclusive=True)</code>

<code>queue_name = result.method.queue</code>

<code>channel.queue_bind(exchange=</code><code>'logs'</code><code>,</code>

<code>    </code><code>queue=queue_name)</code>

<code>print</code> <code>' [*] Waiting for logs. To exit press CTRL+C'</code>

<code>def callback(ch, method, properties, body):</code>

<code>    </code><code>print</code> <code>" [x] %r"</code> <code>% (body,)</code>

<code>channel.basic_consume(callback,</code>

<code>    </code><code>queue=queue_name,</code>

<code>    </code><code>no_ack=True)</code>

<code>channel.start_consuming()</code>

試運作:

    Consumer1:$ python receive_logs.py &gt; logs_from_rabbit.log  //追加到檔案

    Consumer2:python receive_logs.py //輸出到螢幕

    Producer:python emit_log.py

也可通過修改callback自己寫檔案

輸出結果如圖:

對于上一個日志系統改進。能夠使用不同的severity來監聽不同等級的log。比如我們希望隻有error的log才儲存到磁盤上。

1.Bindings綁定

之前的綁定

channel.queue_bind(exchange=exchange_name,  

    queue=queue_name)  

綁定其實就是關聯了exchange和queue。或者這麼說:queue對exchagne的内容感興趣,exchange要把它的Message deliver到queue中。

實際上,綁定可以帶routing_key 這個參數。其實這個參數的名稱和basic_publish 的參數名是相同了。為了避免混淆,我們把它成為binding key。

    使用一個key來建立binding :

    queue=queue_name,  

    routing_key='black') 

對于fanout的exchange來說,這個參數是被忽略的。

2.Direct Exchange

通過Bindings key完全比對

圖Direct路由模型

exchange X和兩個queue綁定在一起。Q1的binding key是orange。Q2的binding key是black和green。

當P publish key是orange時,exchange會把它放到Q1。如果是black或者green那麼就會到Q2。其餘的Message都會被丢棄。

3.多重綁定(Multiple Bindings)

多個queue綁定同一個key是可以的。對于下圖的例子,Q1和Q2都綁定了black。也就是說,對于routing key是black的Message,會被deliver到Q1和Q2。其餘的Message都會被丢棄。

圖muliti-bindings

4.生産者和消費者

生産者:

<code>channel.exchange_declare(exchange=</code><code>'direct_logs'</code><code>,  </code>

<code>    </code><code>type=</code><code>'direct'</code><code>)  </code>

<code>//建立一個direct的exchange。使用log的severity作為routing key,這樣Consumer可以針對不同severity的log進行不同的處理。</code>

<code>publish:</code>

<code>channel.basic_publish(exchange=</code><code>'direct_logs'</code><code>,  </code>

<code>    </code><code>routing_key=severity, </code>

<code>    </code><code>body=message)  </code>

<code>//涉及三種severity:</code><code>'info'</code><code>, </code><code>'warning'</code><code>, </code><code>'error'</code><code>.</code>

消費者:

<code>result = channel.queue_declare(exclusive=True)  </code>

<code>queue_name = result.method.queue  </code>

<code>for severity in severities:  </code>

<code>    </code><code>channel.queue_bind(exchange=</code><code>'direct_logs'</code><code>,  </code>

<code>        </code><code>queue=queue_name,  </code>

<code>        </code><code>routing_key=severity) </code>

<code>//queue需要綁定severity</code>

5.最終版本

圖:direct_2

emit_log_direct.py 

<code>channel.exchange_declare(exchange=</code><code>'direct_logs'</code><code>,</code>

<code>    </code><code>type=</code><code>'direct'</code><code>)</code>

<code>severity = sys.argv[</code><code>1</code><code>] if len(sys.argv) &gt; </code><code>1</code> <code>else </code><code>'info'</code>

<code>message = </code><code>' '</code><code>.join(sys.argv[</code><code>2:</code><code>]) or </code><code>'Hello World!'</code>

<code>channel.basic_publish(exchange=</code><code>'direct_logs'</code><code>,</code>

<code>    </code><code>routing_key=severity,</code>

<code>print</code> <code>" [x] Sent %r:%r"</code> <code>% (severity, message)</code>

receive_logs_direct.py: 

<code>#!/usr/bin/env python  </code>

<code>import pika  </code>

<code>import sys  </code>

<code>connection = pika.BlockingConnection(pika.ConnectionParameters(  </code>

<code>    </code><code>host=</code><code>'localhost'</code><code>))  </code>

<code>channel = connection.channel()  </code>

<code>severities = sys.argv[</code><code>1:</code><code>]  </code>

<code>if not severities:  </code>

<code>    </code><code>print</code> <code>&gt;&gt; sys.stderr, </code><code>"Usage: %s [info] [warning] [error]"</code> <code>% \  </code>

<code>        </code><code>(sys.argv[</code><code>0</code><code>],)  </code>

<code>    </code><code>sys.exit(</code><code>1</code><code>)  </code>

<code>for severity in severities:      </code>

<code>        </code><code>routing_key=severity)  </code>

<code>print</code> <code>' [*] Waiting for logs. To exit press CTRL+C'</code>  

<code>def callback(ch, method, properties, body):  </code>

<code>    </code><code>print</code> <code>" [x] %r:%r"</code> <code>% (method.routing_key, body,)  </code>

<code>channel.basic_consume(callback,  </code>

<code>    </code><code>queue=queue_name,  </code>

<code>    </code><code>no_ack=True)  </code>

$ python receive_logs_direct.py warning error &gt; logs_from_rabbit.log 

    //把warning和error的log記錄到一個檔案中

$ python receive_logs_direct.py info warning error  

    //列印所有log到螢幕    

1.Topic exchange

Message的routing_key使用限制,不能使任意的。格式是以點号“."分割的字元表。

比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。

    對于routing_key,有兩個特殊字元(在正規表達式裡叫元字元):

    * (星号) 代表任意 一個單詞

    # (hash) 0個或者多個單詞

示例:

Producer發送消息時需要設定routing_key,routing_key包含三個單詞和兩個點号。

    第一個key是描述了celerity(靈巧,靈活),第二個是colour(色彩),第三個是species(物種):"&lt;celerity&gt;.&lt;colour&gt;.&lt;species&gt;"。

在這裡我們建立了兩個綁定: Q1 的binding key 是"*.orange.*"; Q2 是  "*.*.rabbit" 和 "lazy.#":

    Q1 感興趣所有orange顔色的動物

    Q2 感興趣所有的rabbits和所有的lazy的

比如routing_key是 "quick.orange.rabbit"将會發送到Q1和Q2中。消息"lazy.orange.elephant" 也會發送到Q1和Q2。但是"quick.orange.fox" 會發送到Q1;"lazy.brown.fox"會發送到Q2。"lazy.pink.rabbit" 也會發送到Q2,但是盡管兩個routing_key都比對,它也隻是發送一次。"quick.brown.fox" 會被丢棄。

如果發送的單詞不是3個呢? 答案要看情況,因為#是可以比對0個或任意個單詞。比如"orange" or "quick.orange.male.rabbit",它們會被丢棄。如果是lazy那麼就會進入Q2。類似的還有 "lazy.orange.male.rabbit",盡管它包含四個單詞。

Topic exchange和其他exchange

    由于有"*" (star) and "#" (hash), Topic exchange 非常強大并且可以轉化為其他的exchange:

    如果binding_key 是 "#" - 它會接收所有的Message,不管routing_key是什麼,就像是fanout exchange。

    如果 "*" (star) and "#" (hash) 沒有被使用,那麼topic exchange就變成了direct exchange。

2.代碼實作

The code for emit_log_topic.py:

========================================================================

<code>channel.exchange_declare(exchange=</code><code>'topic_logs'</code><code>,</code>

<code>    </code><code>type=</code><code>'topic'</code><code>)</code>

<code>routing_key = sys.argv[</code><code>1</code><code>] if len(sys.argv) &gt; </code><code>1</code> <code>else </code><code>'anonymous.info'</code>

<code>channel.basic_publish(exchange=</code><code>'topic_logs'</code><code>,</code>

<code>    </code><code>routing_key=routing_key,</code>

<code>print</code> <code>" [x] Sent %r:%r"</code> <code>% (routing_key, message)</code>

The code for receive_logs_topic.py:     

========================================================================    

<code>    </code> 

<code>binding_keys = sys.argv[</code><code>1:</code><code>]</code>

<code>if not binding_keys:</code>

<code>    </code><code>print</code> <code>&gt;&gt; sys.stderr, </code><code>"Usage: %s [binding_key]..."</code> <code>% (sys.argv[</code><code>0</code><code>],)</code>

<code>    </code><code>sys.exit(</code><code>1</code><code>)</code>

<code>for binding_key in binding_keys:</code>

<code>    </code><code>channel.queue_bind(exchange=</code><code>'topic_logs'</code><code>,</code>

<code>        </code><code>queue=queue_name,</code>

<code>        </code><code>routing_key=binding_key)</code>

<code>    </code><code>print</code> <code>" [x] %r:%r"</code> <code>% (method.routing_key, body,)</code>

3.運作和結果

    python receive_logs_topic.py "#"  //接收所有的log

    python receive_logs_topic.py "kern.*"  //接收所有kern facility的log

    python receive_logs_topic.py "*.critical"  //僅僅接收critical的log: 

    python receive_logs_topic.py "kern.*" "*.critical"  //可以建立多個綁定: 

    python emit_log_topic.py "kern.critical" "A critical kernel error"  //Producer産生一個log:"kern.critical" type: 

參考:    

http://www.rabbitmq.com/tutorials/tutorial-three-python.html

本文轉自MT_IT51CTO部落格,原文連結:http://blog.51cto.com/hmtk520/2051247,如需轉載請自行聯系原作者