一、分发到多Consumer(fanout)
二、Routing路由(Direct)
三、主题路由(Topic)
将同一个Message deliver到多个Consumer中。这个模式也被称为"publish/subscribe"
创建一个日志系统,包含两部分:第一部分发出log(Producer),第二部分接收到并打印(Consumer)。两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。
1.发送消息流程:
1.Producer发送的Message实际上是发到了Exchange中。
2.Exchanges从Producer接收message投递到queue中
3.Prducer发送的消息只是到达了Exchange中,Exchange具有不同的类型实现不同的分发方式
Exchnges的类型:direct、topic和fanout
fanout就是广播模式,会将所有的Message都放到它所知道的queue中
channel.exchange_declare(exchange='logs',
type='fanout') //创建一个名字为logs,类型为fanout的Exchange:
<code>[root@node</code><code>112</code> <code>~]# rabbitmqctl list_exchanges //查看所有的Exchanges</code>
<code>Listing exchanges ...</code>
<code>logs fanout</code>
<code>amq.direct direct</code>
<code>amq.fanout fanout</code>
<code>amq.headers headers</code>
<code>amq.match headers</code>
<code>amq.rabbitmq.log topic</code>
<code>amq.rabbitmq.trace topic</code>
<code>amq.topic topic</code>
<code>...done.</code>
注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。
通过exchange,而不是routing_key来publish Message:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
2.临时队列
截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。
但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
1)每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。方法:
result = channel.queue_declare()
通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
2)当Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。方法:
result = channel.queue_declare(exclusive=True) //每次获取的都是新的,单独使用的
3.Bindings绑定
创建好fanout类型的Exchange和没有名字的queue后(实际上是RabbitMQ帮我们取的名字)Exchange通过bindings把它的Message发送到目标queue
channel.queue_bind(exchange='logs',
queue=result.method.queue)
使用命令rabbitmqctl list_bindings 查看bindings
4.最终代码
拓扑图:
Producer,在这里就是产生log的program,基本上和前几个都差不多。最主要的区别就是publish通过了exchange而不是routing_key。
emit_log.py script:
===========================================================================
<code>#!/usr/bin/env python</code>
<code>import pika</code>
<code>import sys</code>
<code>connection = pika.BlockingConnection(pika.ConnectionParameters(</code>
<code> </code><code>host=</code><code>'localhost'</code><code>))</code>
<code>channel = connection.channel()</code>
<code>channel.exchange_declare(exchange=</code><code>'logs'</code><code>,</code>
<code> </code><code>type=</code><code>'fanout'</code><code>)</code>
<code>message = </code><code>' '</code><code>.join(sys.argv[</code><code>1:</code><code>]) or </code><code>"info: Hello World!"</code>
<code>channel.basic_publish(exchange=</code><code>'logs'</code><code>,</code>
<code> </code><code>routing_key=</code><code>''</code><code>,</code>
<code> </code><code>body=message)</code>
<code>print</code> <code>" [x] Sent %r"</code> <code>% (message,)</code>
<code>connection.close()</code>
还有一点要注意的是我们声明了exchange。publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,log是被丢弃的。
Consumer:receive_logs.py:
<code>result = channel.queue_declare(exclusive=True)</code>
<code>queue_name = result.method.queue</code>
<code>channel.queue_bind(exchange=</code><code>'logs'</code><code>,</code>
<code> </code><code>queue=queue_name)</code>
<code>print</code> <code>' [*] Waiting for logs. To exit press CTRL+C'</code>
<code>def callback(ch, method, properties, body):</code>
<code> </code><code>print</code> <code>" [x] %r"</code> <code>% (body,)</code>
<code>channel.basic_consume(callback,</code>
<code> </code><code>queue=queue_name,</code>
<code> </code><code>no_ack=True)</code>
<code>channel.start_consuming()</code>
试运行:
Consumer1:$ python receive_logs.py > logs_from_rabbit.log //追加到文件
Consumer2:python receive_logs.py //输出到屏幕
Producer:python emit_log.py
也可通过修改callback自己写文件
输出结果如图:
对于上一个日志系统改进。能够使用不同的severity来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。
1.Bindings绑定
之前的绑定
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。
实际上,绑定可以带routing_key 这个参数。其实这个参数的名称和basic_publish 的参数名是相同了。为了避免混淆,我们把它成为binding key。
使用一个key来创建binding :
queue=queue_name,
routing_key='black')
对于fanout的exchange来说,这个参数是被忽略的。
2.Direct Exchange
通过Bindings key完全匹配
图Direct路由模型
exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。
3.多重绑定(Multiple Bindings)
多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver到Q1和Q2。其余的Message都会被丢弃。
图muliti-bindings
4.生产者和消费者
生产者:
<code>channel.exchange_declare(exchange=</code><code>'direct_logs'</code><code>, </code>
<code> </code><code>type=</code><code>'direct'</code><code>) </code>
<code>//创建一个direct的exchange。使用log的severity作为routing key,这样Consumer可以针对不同severity的log进行不同的处理。</code>
<code>publish:</code>
<code>channel.basic_publish(exchange=</code><code>'direct_logs'</code><code>, </code>
<code> </code><code>routing_key=severity, </code>
<code> </code><code>body=message) </code>
<code>//涉及三种severity:</code><code>'info'</code><code>, </code><code>'warning'</code><code>, </code><code>'error'</code><code>.</code>
消费者:
<code>result = channel.queue_declare(exclusive=True) </code>
<code>queue_name = result.method.queue </code>
<code>for severity in severities: </code>
<code> </code><code>channel.queue_bind(exchange=</code><code>'direct_logs'</code><code>, </code>
<code> </code><code>queue=queue_name, </code>
<code> </code><code>routing_key=severity) </code>
<code>//queue需要绑定severity</code>
5.最终版本
图:direct_2
emit_log_direct.py
<code>channel.exchange_declare(exchange=</code><code>'direct_logs'</code><code>,</code>
<code> </code><code>type=</code><code>'direct'</code><code>)</code>
<code>severity = sys.argv[</code><code>1</code><code>] if len(sys.argv) > </code><code>1</code> <code>else </code><code>'info'</code>
<code>message = </code><code>' '</code><code>.join(sys.argv[</code><code>2:</code><code>]) or </code><code>'Hello World!'</code>
<code>channel.basic_publish(exchange=</code><code>'direct_logs'</code><code>,</code>
<code> </code><code>routing_key=severity,</code>
<code>print</code> <code>" [x] Sent %r:%r"</code> <code>% (severity, message)</code>
receive_logs_direct.py:
<code>#!/usr/bin/env python </code>
<code>import pika </code>
<code>import sys </code>
<code>connection = pika.BlockingConnection(pika.ConnectionParameters( </code>
<code> </code><code>host=</code><code>'localhost'</code><code>)) </code>
<code>channel = connection.channel() </code>
<code>severities = sys.argv[</code><code>1:</code><code>] </code>
<code>if not severities: </code>
<code> </code><code>print</code> <code>>> sys.stderr, </code><code>"Usage: %s [info] [warning] [error]"</code> <code>% \ </code>
<code> </code><code>(sys.argv[</code><code>0</code><code>],) </code>
<code> </code><code>sys.exit(</code><code>1</code><code>) </code>
<code>for severity in severities: </code>
<code> </code><code>routing_key=severity) </code>
<code>print</code> <code>' [*] Waiting for logs. To exit press CTRL+C'</code>
<code>def callback(ch, method, properties, body): </code>
<code> </code><code>print</code> <code>" [x] %r:%r"</code> <code>% (method.routing_key, body,) </code>
<code>channel.basic_consume(callback, </code>
<code> </code><code>queue=queue_name, </code>
<code> </code><code>no_ack=True) </code>
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
//把warning和error的log记录到一个文件中
$ python receive_logs_direct.py info warning error
//打印所有log到屏幕
1.Topic exchange
Message的routing_key使用限制,不能使任意的。格式是以点号“."分割的字符表。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。
对于routing_key,有两个特殊字符(在正则表达式里叫元字符):
* (星号) 代表任意 一个单词
# (hash) 0个或者多个单词
示例:
Producer发送消息时需要设置routing_key,routing_key包含三个单词和两个点号。
第一个key是描述了celerity(灵巧,敏捷),第二个是colour(色彩),第三个是species(物种):"<celerity>.<colour>.<species>"。
在这里我们创建了两个绑定: Q1 的binding key 是"*.orange.*"; Q2 是 "*.*.rabbit" 和 "lazy.#":
Q1 感兴趣所有orange颜色的动物
Q2 感兴趣所有的rabbits和所有的lazy的
比如routing_key是 "quick.orange.rabbit"将会发送到Q1和Q2中。消息"lazy.orange.elephant" 也会发送到Q1和Q2。但是"quick.orange.fox" 会发送到Q1;"lazy.brown.fox"会发送到Q2。"lazy.pink.rabbit" 也会发送到Q2,但是尽管两个routing_key都匹配,它也只是发送一次。"quick.brown.fox" 会被丢弃。
如果发送的单词不是3个呢? 答案要看情况,因为#是可以匹配0个或任意个单词。比如"orange" or "quick.orange.male.rabbit",它们会被丢弃。如果是lazy那么就会进入Q2。类似的还有 "lazy.orange.male.rabbit",尽管它包含四个单词。
Topic exchange和其他exchange
由于有"*" (star) and "#" (hash), Topic exchange 非常强大并且可以转化为其他的exchange:
如果binding_key 是 "#" - 它会接收所有的Message,不管routing_key是什么,就像是fanout exchange。
如果 "*" (star) and "#" (hash) 没有被使用,那么topic exchange就变成了direct exchange。
2.代码实现
The code for emit_log_topic.py:
========================================================================
<code>channel.exchange_declare(exchange=</code><code>'topic_logs'</code><code>,</code>
<code> </code><code>type=</code><code>'topic'</code><code>)</code>
<code>routing_key = sys.argv[</code><code>1</code><code>] if len(sys.argv) > </code><code>1</code> <code>else </code><code>'anonymous.info'</code>
<code>channel.basic_publish(exchange=</code><code>'topic_logs'</code><code>,</code>
<code> </code><code>routing_key=routing_key,</code>
<code>print</code> <code>" [x] Sent %r:%r"</code> <code>% (routing_key, message)</code>
The code for receive_logs_topic.py:
========================================================================
<code> </code>
<code>binding_keys = sys.argv[</code><code>1:</code><code>]</code>
<code>if not binding_keys:</code>
<code> </code><code>print</code> <code>>> sys.stderr, </code><code>"Usage: %s [binding_key]..."</code> <code>% (sys.argv[</code><code>0</code><code>],)</code>
<code> </code><code>sys.exit(</code><code>1</code><code>)</code>
<code>for binding_key in binding_keys:</code>
<code> </code><code>channel.queue_bind(exchange=</code><code>'topic_logs'</code><code>,</code>
<code> </code><code>queue=queue_name,</code>
<code> </code><code>routing_key=binding_key)</code>
<code> </code><code>print</code> <code>" [x] %r:%r"</code> <code>% (method.routing_key, body,)</code>
3.运行和结果
python receive_logs_topic.py "#" //接收所有的log
python receive_logs_topic.py "kern.*" //接收所有kern facility的log
python receive_logs_topic.py "*.critical" //仅仅接收critical的log:
python receive_logs_topic.py "kern.*" "*.critical" //可以创建多个绑定:
python emit_log_topic.py "kern.critical" "A critical kernel error" //Producer产生一个log:"kern.critical" type:
参考:
http://www.rabbitmq.com/tutorials/tutorial-three-python.html
本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051247,如需转载请自行联系原作者