天天看点

RabbitMQ详解(三)

一、分发到多Consumer(fanout)

二、Routing路由(Direct)

三、主题路由(Topic)

将同一个Message deliver到多个Consumer中。这个模式也被称为"publish/subscribe"

创建一个日志系统,包含两部分:第一部分发出log(Producer),第二部分接收到并打印(Consumer)。两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。

1.发送消息流程:

    1.Producer发送的Message实际上是发到了Exchange中。

    2.Exchanges从Producer接收message投递到queue中

    3.Prducer发送的消息只是到达了Exchange中,Exchange具有不同的类型实现不同的分发方式

Exchnges的类型:direct、topic和fanout

fanout就是广播模式,会将所有的Message都放到它所知道的queue中

channel.exchange_declare(exchange='logs',  

    type='fanout')   //创建一个名字为logs,类型为fanout的Exchange:

<code>[root@node</code><code>112</code> <code>~]# rabbitmqctl list_exchanges //查看所有的Exchanges</code>

<code>Listing exchanges ...</code>

<code>logs  fanout</code>

<code>amq.direct    direct</code>

<code>amq.fanout    fanout</code>

<code>amq.headers    headers</code>

<code>amq.match    headers</code>

<code>amq.rabbitmq.log    topic</code>

<code>amq.rabbitmq.trace    topic</code>

<code>amq.topic    topic</code>

<code>...done.</code>

注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。 

通过exchange,而不是routing_key来publish Message:

channel.basic_publish(exchange='logs',  

    routing_key='',  

    body=message)  

2.临时队列

截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。

但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:

    1)每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。方法:

    result = channel.queue_declare() 

    通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

    2)当Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。方法:

    result = channel.queue_declare(exclusive=True)   //每次获取的都是新的,单独使用的

3.Bindings绑定

    创建好fanout类型的Exchange和没有名字的queue后(实际上是RabbitMQ帮我们取的名字)Exchange通过bindings把它的Message发送到目标queue

    channel.queue_bind(exchange='logs',  

        queue=result.method.queue)      

    使用命令rabbitmqctl list_bindings 查看bindings

4.最终代码

拓扑图:

Producer,在这里就是产生log的program,基本上和前几个都差不多。最主要的区别就是publish通过了exchange而不是routing_key。

emit_log.py script:

===========================================================================

<code>#!/usr/bin/env python</code>

<code>import pika</code>

<code>import sys</code>

<code>connection = pika.BlockingConnection(pika.ConnectionParameters(</code>

<code>    </code><code>host=</code><code>'localhost'</code><code>))</code>

<code>channel = connection.channel()</code>

<code>channel.exchange_declare(exchange=</code><code>'logs'</code><code>,</code>

<code>    </code><code>type=</code><code>'fanout'</code><code>)</code>

<code>message = </code><code>' '</code><code>.join(sys.argv[</code><code>1:</code><code>]) or </code><code>"info: Hello World!"</code>

<code>channel.basic_publish(exchange=</code><code>'logs'</code><code>,</code>

<code>    </code><code>routing_key=</code><code>''</code><code>,</code>

<code>    </code><code>body=message)</code>

<code>print</code> <code>" [x] Sent %r"</code> <code>% (message,)</code>

<code>connection.close()</code>

还有一点要注意的是我们声明了exchange。publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,log是被丢弃的。

Consumer:receive_logs.py:

<code>result = channel.queue_declare(exclusive=True)</code>

<code>queue_name = result.method.queue</code>

<code>channel.queue_bind(exchange=</code><code>'logs'</code><code>,</code>

<code>    </code><code>queue=queue_name)</code>

<code>print</code> <code>' [*] Waiting for logs. To exit press CTRL+C'</code>

<code>def callback(ch, method, properties, body):</code>

<code>    </code><code>print</code> <code>" [x] %r"</code> <code>% (body,)</code>

<code>channel.basic_consume(callback,</code>

<code>    </code><code>queue=queue_name,</code>

<code>    </code><code>no_ack=True)</code>

<code>channel.start_consuming()</code>

试运行:

    Consumer1:$ python receive_logs.py &gt; logs_from_rabbit.log  //追加到文件

    Consumer2:python receive_logs.py //输出到屏幕

    Producer:python emit_log.py

也可通过修改callback自己写文件

输出结果如图:

对于上一个日志系统改进。能够使用不同的severity来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。

1.Bindings绑定

之前的绑定

channel.queue_bind(exchange=exchange_name,  

    queue=queue_name)  

绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。

实际上,绑定可以带routing_key 这个参数。其实这个参数的名称和basic_publish 的参数名是相同了。为了避免混淆,我们把它成为binding key。

    使用一个key来创建binding :

    queue=queue_name,  

    routing_key='black') 

对于fanout的exchange来说,这个参数是被忽略的。

2.Direct Exchange

通过Bindings key完全匹配

图Direct路由模型

exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。

当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。

3.多重绑定(Multiple Bindings)

多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver到Q1和Q2。其余的Message都会被丢弃。

图muliti-bindings

4.生产者和消费者

生产者:

<code>channel.exchange_declare(exchange=</code><code>'direct_logs'</code><code>,  </code>

<code>    </code><code>type=</code><code>'direct'</code><code>)  </code>

<code>//创建一个direct的exchange。使用log的severity作为routing key,这样Consumer可以针对不同severity的log进行不同的处理。</code>

<code>publish:</code>

<code>channel.basic_publish(exchange=</code><code>'direct_logs'</code><code>,  </code>

<code>    </code><code>routing_key=severity, </code>

<code>    </code><code>body=message)  </code>

<code>//涉及三种severity:</code><code>'info'</code><code>, </code><code>'warning'</code><code>, </code><code>'error'</code><code>.</code>

消费者:

<code>result = channel.queue_declare(exclusive=True)  </code>

<code>queue_name = result.method.queue  </code>

<code>for severity in severities:  </code>

<code>    </code><code>channel.queue_bind(exchange=</code><code>'direct_logs'</code><code>,  </code>

<code>        </code><code>queue=queue_name,  </code>

<code>        </code><code>routing_key=severity) </code>

<code>//queue需要绑定severity</code>

5.最终版本

图:direct_2

emit_log_direct.py 

<code>channel.exchange_declare(exchange=</code><code>'direct_logs'</code><code>,</code>

<code>    </code><code>type=</code><code>'direct'</code><code>)</code>

<code>severity = sys.argv[</code><code>1</code><code>] if len(sys.argv) &gt; </code><code>1</code> <code>else </code><code>'info'</code>

<code>message = </code><code>' '</code><code>.join(sys.argv[</code><code>2:</code><code>]) or </code><code>'Hello World!'</code>

<code>channel.basic_publish(exchange=</code><code>'direct_logs'</code><code>,</code>

<code>    </code><code>routing_key=severity,</code>

<code>print</code> <code>" [x] Sent %r:%r"</code> <code>% (severity, message)</code>

receive_logs_direct.py: 

<code>#!/usr/bin/env python  </code>

<code>import pika  </code>

<code>import sys  </code>

<code>connection = pika.BlockingConnection(pika.ConnectionParameters(  </code>

<code>    </code><code>host=</code><code>'localhost'</code><code>))  </code>

<code>channel = connection.channel()  </code>

<code>severities = sys.argv[</code><code>1:</code><code>]  </code>

<code>if not severities:  </code>

<code>    </code><code>print</code> <code>&gt;&gt; sys.stderr, </code><code>"Usage: %s [info] [warning] [error]"</code> <code>% \  </code>

<code>        </code><code>(sys.argv[</code><code>0</code><code>],)  </code>

<code>    </code><code>sys.exit(</code><code>1</code><code>)  </code>

<code>for severity in severities:      </code>

<code>        </code><code>routing_key=severity)  </code>

<code>print</code> <code>' [*] Waiting for logs. To exit press CTRL+C'</code>  

<code>def callback(ch, method, properties, body):  </code>

<code>    </code><code>print</code> <code>" [x] %r:%r"</code> <code>% (method.routing_key, body,)  </code>

<code>channel.basic_consume(callback,  </code>

<code>    </code><code>queue=queue_name,  </code>

<code>    </code><code>no_ack=True)  </code>

$ python receive_logs_direct.py warning error &gt; logs_from_rabbit.log 

    //把warning和error的log记录到一个文件中

$ python receive_logs_direct.py info warning error  

    //打印所有log到屏幕    

1.Topic exchange

Message的routing_key使用限制,不能使任意的。格式是以点号“."分割的字符表。

比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。

    对于routing_key,有两个特殊字符(在正则表达式里叫元字符):

    * (星号) 代表任意 一个单词

    # (hash) 0个或者多个单词

示例:

Producer发送消息时需要设置routing_key,routing_key包含三个单词和两个点号。

    第一个key是描述了celerity(灵巧,敏捷),第二个是colour(色彩),第三个是species(物种):"&lt;celerity&gt;.&lt;colour&gt;.&lt;species&gt;"。

在这里我们创建了两个绑定: Q1 的binding key 是"*.orange.*"; Q2 是  "*.*.rabbit" 和 "lazy.#":

    Q1 感兴趣所有orange颜色的动物

    Q2 感兴趣所有的rabbits和所有的lazy的

比如routing_key是 "quick.orange.rabbit"将会发送到Q1和Q2中。消息"lazy.orange.elephant" 也会发送到Q1和Q2。但是"quick.orange.fox" 会发送到Q1;"lazy.brown.fox"会发送到Q2。"lazy.pink.rabbit" 也会发送到Q2,但是尽管两个routing_key都匹配,它也只是发送一次。"quick.brown.fox" 会被丢弃。

如果发送的单词不是3个呢? 答案要看情况,因为#是可以匹配0个或任意个单词。比如"orange" or "quick.orange.male.rabbit",它们会被丢弃。如果是lazy那么就会进入Q2。类似的还有 "lazy.orange.male.rabbit",尽管它包含四个单词。

Topic exchange和其他exchange

    由于有"*" (star) and "#" (hash), Topic exchange 非常强大并且可以转化为其他的exchange:

    如果binding_key 是 "#" - 它会接收所有的Message,不管routing_key是什么,就像是fanout exchange。

    如果 "*" (star) and "#" (hash) 没有被使用,那么topic exchange就变成了direct exchange。

2.代码实现

The code for emit_log_topic.py:

========================================================================

<code>channel.exchange_declare(exchange=</code><code>'topic_logs'</code><code>,</code>

<code>    </code><code>type=</code><code>'topic'</code><code>)</code>

<code>routing_key = sys.argv[</code><code>1</code><code>] if len(sys.argv) &gt; </code><code>1</code> <code>else </code><code>'anonymous.info'</code>

<code>channel.basic_publish(exchange=</code><code>'topic_logs'</code><code>,</code>

<code>    </code><code>routing_key=routing_key,</code>

<code>print</code> <code>" [x] Sent %r:%r"</code> <code>% (routing_key, message)</code>

The code for receive_logs_topic.py:     

========================================================================    

<code>    </code> 

<code>binding_keys = sys.argv[</code><code>1:</code><code>]</code>

<code>if not binding_keys:</code>

<code>    </code><code>print</code> <code>&gt;&gt; sys.stderr, </code><code>"Usage: %s [binding_key]..."</code> <code>% (sys.argv[</code><code>0</code><code>],)</code>

<code>    </code><code>sys.exit(</code><code>1</code><code>)</code>

<code>for binding_key in binding_keys:</code>

<code>    </code><code>channel.queue_bind(exchange=</code><code>'topic_logs'</code><code>,</code>

<code>        </code><code>queue=queue_name,</code>

<code>        </code><code>routing_key=binding_key)</code>

<code>    </code><code>print</code> <code>" [x] %r:%r"</code> <code>% (method.routing_key, body,)</code>

3.运行和结果

    python receive_logs_topic.py "#"  //接收所有的log

    python receive_logs_topic.py "kern.*"  //接收所有kern facility的log

    python receive_logs_topic.py "*.critical"  //仅仅接收critical的log: 

    python receive_logs_topic.py "kern.*" "*.critical"  //可以创建多个绑定: 

    python emit_log_topic.py "kern.critical" "A critical kernel error"  //Producer产生一个log:"kern.critical" type: 

参考:    

http://www.rabbitmq.com/tutorials/tutorial-three-python.html

本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051247,如需转载请自行联系原作者