一、分發到多Consumer(fanout)
二、Routing路由(Direct)
三、主題路由(Topic)
将同一個Message deliver到多個Consumer中。這個模式也被稱為"publish/subscribe"
建立一個日志系統,包含兩部分:第一部分發出log(Producer),第二部分接收到并列印(Consumer)。兩個Consumer,第一個将log寫到實體磁盤上;第二個将log輸出的螢幕。
1.發送消息流程:
1.Producer發送的Message實際上是發到了Exchange中。
2.Exchanges從Producer接收message投遞到queue中
3.Prducer發送的消息隻是到達了Exchange中,Exchange具有不同的類型實作不同的分發方式
Exchnges的類型:direct、topic和fanout
fanout就是廣播模式,會将所有的Message都放到它所知道的queue中
channel.exchange_declare(exchange='logs',
type='fanout') //建立一個名字為logs,類型為fanout的Exchange:
<code>[root@node</code><code>112</code> <code>~]# rabbitmqctl list_exchanges //檢視所有的Exchanges</code>
<code>Listing exchanges ...</code>
<code>logs fanout</code>
<code>amq.direct direct</code>
<code>amq.fanout fanout</code>
<code>amq.headers headers</code>
<code>amq.match headers</code>
<code>amq.rabbitmq.log topic</code>
<code>amq.rabbitmq.trace topic</code>
<code>amq.topic topic</code>
<code>...done.</code>
注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ預設建立的。
通過exchange,而不是routing_key來publish Message:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
2.臨時隊列
截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。
但是對于我們将要建構的日志系統,并不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們隻對目前的log感興趣。為了實作這個目标,我們需要兩件事情:
1)每當Consumer連接配接時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在聲明queue時不指定名字,那麼RabbitMQ會随機為我們選擇這個名字。方法:
result = channel.queue_declare()
通過result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
2)當Consumer關閉連接配接時,這個queue要被deleted。可以加個exclusive的參數。方法:
result = channel.queue_declare(exclusive=True) //每次擷取的都是新的,單獨使用的
3.Bindings綁定
建立好fanout類型的Exchange和沒有名字的queue後(實際上是RabbitMQ幫我們取的名字)Exchange通過bindings把它的Message發送到目标queue
channel.queue_bind(exchange='logs',
queue=result.method.queue)
使用指令rabbitmqctl list_bindings 檢視bindings
4.最終代碼
拓撲圖:
Producer,在這裡就是産生log的program,基本上和前幾個都差不多。最主要的差別就是publish通過了exchange而不是routing_key。
emit_log.py script:
===========================================================================
<code>#!/usr/bin/env python</code>
<code>import pika</code>
<code>import sys</code>
<code>connection = pika.BlockingConnection(pika.ConnectionParameters(</code>
<code> </code><code>host=</code><code>'localhost'</code><code>))</code>
<code>channel = connection.channel()</code>
<code>channel.exchange_declare(exchange=</code><code>'logs'</code><code>,</code>
<code> </code><code>type=</code><code>'fanout'</code><code>)</code>
<code>message = </code><code>' '</code><code>.join(sys.argv[</code><code>1:</code><code>]) or </code><code>"info: Hello World!"</code>
<code>channel.basic_publish(exchange=</code><code>'logs'</code><code>,</code>
<code> </code><code>routing_key=</code><code>''</code><code>,</code>
<code> </code><code>body=message)</code>
<code>print</code> <code>" [x] Sent %r"</code> <code>% (message,)</code>
<code>connection.close()</code>
還有一點要注意的是我們聲明了exchange。publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,log是被丢棄的。
Consumer:receive_logs.py:
<code>result = channel.queue_declare(exclusive=True)</code>
<code>queue_name = result.method.queue</code>
<code>channel.queue_bind(exchange=</code><code>'logs'</code><code>,</code>
<code> </code><code>queue=queue_name)</code>
<code>print</code> <code>' [*] Waiting for logs. To exit press CTRL+C'</code>
<code>def callback(ch, method, properties, body):</code>
<code> </code><code>print</code> <code>" [x] %r"</code> <code>% (body,)</code>
<code>channel.basic_consume(callback,</code>
<code> </code><code>queue=queue_name,</code>
<code> </code><code>no_ack=True)</code>
<code>channel.start_consuming()</code>
試運作:
Consumer1:$ python receive_logs.py > logs_from_rabbit.log //追加到檔案
Consumer2:python receive_logs.py //輸出到螢幕
Producer:python emit_log.py
也可通過修改callback自己寫檔案
輸出結果如圖:
對于上一個日志系統改進。能夠使用不同的severity來監聽不同等級的log。比如我們希望隻有error的log才儲存到磁盤上。
1.Bindings綁定
之前的綁定
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
綁定其實就是關聯了exchange和queue。或者這麼說:queue對exchagne的内容感興趣,exchange要把它的Message deliver到queue中。
實際上,綁定可以帶routing_key 這個參數。其實這個參數的名稱和basic_publish 的參數名是相同了。為了避免混淆,我們把它成為binding key。
使用一個key來建立binding :
queue=queue_name,
routing_key='black')
對于fanout的exchange來說,這個參數是被忽略的。
2.Direct Exchange
通過Bindings key完全比對
圖Direct路由模型
exchange X和兩個queue綁定在一起。Q1的binding key是orange。Q2的binding key是black和green。
當P publish key是orange時,exchange會把它放到Q1。如果是black或者green那麼就會到Q2。其餘的Message都會被丢棄。
3.多重綁定(Multiple Bindings)
多個queue綁定同一個key是可以的。對于下圖的例子,Q1和Q2都綁定了black。也就是說,對于routing key是black的Message,會被deliver到Q1和Q2。其餘的Message都會被丢棄。
圖muliti-bindings
4.生産者和消費者
生産者:
<code>channel.exchange_declare(exchange=</code><code>'direct_logs'</code><code>, </code>
<code> </code><code>type=</code><code>'direct'</code><code>) </code>
<code>//建立一個direct的exchange。使用log的severity作為routing key,這樣Consumer可以針對不同severity的log進行不同的處理。</code>
<code>publish:</code>
<code>channel.basic_publish(exchange=</code><code>'direct_logs'</code><code>, </code>
<code> </code><code>routing_key=severity, </code>
<code> </code><code>body=message) </code>
<code>//涉及三種severity:</code><code>'info'</code><code>, </code><code>'warning'</code><code>, </code><code>'error'</code><code>.</code>
消費者:
<code>result = channel.queue_declare(exclusive=True) </code>
<code>queue_name = result.method.queue </code>
<code>for severity in severities: </code>
<code> </code><code>channel.queue_bind(exchange=</code><code>'direct_logs'</code><code>, </code>
<code> </code><code>queue=queue_name, </code>
<code> </code><code>routing_key=severity) </code>
<code>//queue需要綁定severity</code>
5.最終版本
圖:direct_2
emit_log_direct.py
<code>channel.exchange_declare(exchange=</code><code>'direct_logs'</code><code>,</code>
<code> </code><code>type=</code><code>'direct'</code><code>)</code>
<code>severity = sys.argv[</code><code>1</code><code>] if len(sys.argv) > </code><code>1</code> <code>else </code><code>'info'</code>
<code>message = </code><code>' '</code><code>.join(sys.argv[</code><code>2:</code><code>]) or </code><code>'Hello World!'</code>
<code>channel.basic_publish(exchange=</code><code>'direct_logs'</code><code>,</code>
<code> </code><code>routing_key=severity,</code>
<code>print</code> <code>" [x] Sent %r:%r"</code> <code>% (severity, message)</code>
receive_logs_direct.py:
<code>#!/usr/bin/env python </code>
<code>import pika </code>
<code>import sys </code>
<code>connection = pika.BlockingConnection(pika.ConnectionParameters( </code>
<code> </code><code>host=</code><code>'localhost'</code><code>)) </code>
<code>channel = connection.channel() </code>
<code>severities = sys.argv[</code><code>1:</code><code>] </code>
<code>if not severities: </code>
<code> </code><code>print</code> <code>>> sys.stderr, </code><code>"Usage: %s [info] [warning] [error]"</code> <code>% \ </code>
<code> </code><code>(sys.argv[</code><code>0</code><code>],) </code>
<code> </code><code>sys.exit(</code><code>1</code><code>) </code>
<code>for severity in severities: </code>
<code> </code><code>routing_key=severity) </code>
<code>print</code> <code>' [*] Waiting for logs. To exit press CTRL+C'</code>
<code>def callback(ch, method, properties, body): </code>
<code> </code><code>print</code> <code>" [x] %r:%r"</code> <code>% (method.routing_key, body,) </code>
<code>channel.basic_consume(callback, </code>
<code> </code><code>queue=queue_name, </code>
<code> </code><code>no_ack=True) </code>
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
//把warning和error的log記錄到一個檔案中
$ python receive_logs_direct.py info warning error
//列印所有log到螢幕
1.Topic exchange
Message的routing_key使用限制,不能使任意的。格式是以點号“."分割的字元表。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。
對于routing_key,有兩個特殊字元(在正規表達式裡叫元字元):
* (星号) 代表任意 一個單詞
# (hash) 0個或者多個單詞
示例:
Producer發送消息時需要設定routing_key,routing_key包含三個單詞和兩個點号。
第一個key是描述了celerity(靈巧,靈活),第二個是colour(色彩),第三個是species(物種):"<celerity>.<colour>.<species>"。
在這裡我們建立了兩個綁定: Q1 的binding key 是"*.orange.*"; Q2 是 "*.*.rabbit" 和 "lazy.#":
Q1 感興趣所有orange顔色的動物
Q2 感興趣所有的rabbits和所有的lazy的
比如routing_key是 "quick.orange.rabbit"将會發送到Q1和Q2中。消息"lazy.orange.elephant" 也會發送到Q1和Q2。但是"quick.orange.fox" 會發送到Q1;"lazy.brown.fox"會發送到Q2。"lazy.pink.rabbit" 也會發送到Q2,但是盡管兩個routing_key都比對,它也隻是發送一次。"quick.brown.fox" 會被丢棄。
如果發送的單詞不是3個呢? 答案要看情況,因為#是可以比對0個或任意個單詞。比如"orange" or "quick.orange.male.rabbit",它們會被丢棄。如果是lazy那麼就會進入Q2。類似的還有 "lazy.orange.male.rabbit",盡管它包含四個單詞。
Topic exchange和其他exchange
由于有"*" (star) and "#" (hash), Topic exchange 非常強大并且可以轉化為其他的exchange:
如果binding_key 是 "#" - 它會接收所有的Message,不管routing_key是什麼,就像是fanout exchange。
如果 "*" (star) and "#" (hash) 沒有被使用,那麼topic exchange就變成了direct exchange。
2.代碼實作
The code for emit_log_topic.py:
========================================================================
<code>channel.exchange_declare(exchange=</code><code>'topic_logs'</code><code>,</code>
<code> </code><code>type=</code><code>'topic'</code><code>)</code>
<code>routing_key = sys.argv[</code><code>1</code><code>] if len(sys.argv) > </code><code>1</code> <code>else </code><code>'anonymous.info'</code>
<code>channel.basic_publish(exchange=</code><code>'topic_logs'</code><code>,</code>
<code> </code><code>routing_key=routing_key,</code>
<code>print</code> <code>" [x] Sent %r:%r"</code> <code>% (routing_key, message)</code>
The code for receive_logs_topic.py:
========================================================================
<code> </code>
<code>binding_keys = sys.argv[</code><code>1:</code><code>]</code>
<code>if not binding_keys:</code>
<code> </code><code>print</code> <code>>> sys.stderr, </code><code>"Usage: %s [binding_key]..."</code> <code>% (sys.argv[</code><code>0</code><code>],)</code>
<code> </code><code>sys.exit(</code><code>1</code><code>)</code>
<code>for binding_key in binding_keys:</code>
<code> </code><code>channel.queue_bind(exchange=</code><code>'topic_logs'</code><code>,</code>
<code> </code><code>queue=queue_name,</code>
<code> </code><code>routing_key=binding_key)</code>
<code> </code><code>print</code> <code>" [x] %r:%r"</code> <code>% (method.routing_key, body,)</code>
3.運作和結果
python receive_logs_topic.py "#" //接收所有的log
python receive_logs_topic.py "kern.*" //接收所有kern facility的log
python receive_logs_topic.py "*.critical" //僅僅接收critical的log:
python receive_logs_topic.py "kern.*" "*.critical" //可以建立多個綁定:
python emit_log_topic.py "kern.critical" "A critical kernel error" //Producer産生一個log:"kern.critical" type:
參考:
http://www.rabbitmq.com/tutorials/tutorial-three-python.html
本文轉自MT_IT51CTO部落格,原文連結:http://blog.51cto.com/hmtk520/2051247,如需轉載請自行聯系原作者