RabbitMQ入门(三)订阅模式
在之前的文章RabbitMQ入门(二)工作队列中,我们创建了一个工作队列。工作队列背后的假设是每一项任务都被准确地传送至一个worker。在本文中,我们将会做一些不同的事情——我们将会把一个消息发送至许多消费者中。这种模式被称为
订阅模式(publish/subscribe)
。
为了解释这种模式,我们将会构建一个简单的日志系统。它包含两个程序——第一个将会产生消息,第二个将会接收并输出这些消息。
在我们的日志系统中,每一个正在运行的接收程序都会收到消息。在这种方式下,我们可以运行一个接收程序来接收并将日志保存至硬盘;同时,我们还能运行另一个接收程序,在屏幕上观察到日志的输出。
特别地,发送的这些消息都会被广播到所有的接收程序。
交换(Exchanges)
在之前的文章中,我们向队列发送消息,从队列中接受消息。现在是时候介绍RabbitMQ中的全部消息转发模式。
让我们快速地浏览下之前文章中讲了些什么:
- 一个生产者(Producer)是用于产生消息的用户应用程序;
- 一个队列(Queue)是缓存区,用于储存消息;
- 一个消费者(Consumer)是用于接收消息的用户应用程序。
RabbitMQ中消息传输模式的核心思想是生产者绝不会直接向队列发送任何消息。实际上,通常情况下生产者甚至都不会知道消息是否会被发送至队列。
生产者会将消息发送至
交换(exchange)
。
交换
并不复杂。一方面它从生产者中接受消息,另一方面将消息推送至队列。
交换
必须知道,当它接受一个消息时,它该怎么做。是否这个消息会附加至一个特殊的队列?是否它会附加至许多队列?或者它会被丢弃。这个规则用
交换类型(exchange type)
来定义。
有一些可用的
交换类型
:
直接分发(direct)
,
通配分发(topic)
,
headers
和
复制分发(fanout)
。我们将会集中讲最后一个——fanout。我们创建一个
交换
,类型为fanout,并取名为logs:
channel.exchange_declare(exchange=\'logs\',
exchange_type=\'fanout\')
fanout交换非常简单。顾名思义,它会将所有它知道的接收队列的消息都广播出去。而这也正是我们的日志系统所需要的。
现在,我们可以发布已经命名好的队列了:
channel.basic_publish(exchange=\'logs\',
routing_key=\'\',
body=message)
临时队列
你也许还记得在之前的文章中,我们需要给队列取名。但是呢,给队列命名太麻烦了——我们需要将workers指定到同一个队列。当你需要在生产者和消费者之间共享队列的时候,给队列命名又是很重要的。
这种情形并不适合我们的日志系统。我们想要监听所有的消息,而不是部分消息。同时,我们仅对当前的流动消息感兴趣,而不是之前的消息。为了解决这个问题,我们需要做两件事情。
首先,无论何时我们连接到RabbitMQ,我们需要一个新的空队列。为此,我们创建一个随机命名的队列,或者更好的是,让RabbitMQ Server来给我们创建一个随机命名的队列。因此,我们可以利用
queue_declare
命令,设置
queuq
参数为空:
result = channel.queue_declare(queue=\'\')
此时,
result.method.queue
会包含一个随机命名的队列,比如说,它会和
amq.gen-JzTY20BRgKO-HjmUJj0wLg
类似。
其次,一旦消息者的连接关闭,我们需要删除队列。这可以用
exclusive
参数搞定:
result = channel.queue_declare(queue=\'\', exclusive=True)
绑定(Bindings)
我们已经创建了一个fanout 交换和队列。现在我们需要告诉交换,将消息发送至队列。交换与队列之间的关系叫做
绑定(Bindings)
。
channel.queue_bind(exchange=\'logs\',
queue=result.method.queue)
从现在开始,
logs
交换将会在我们的队列后追加消息。
代码
生产者代码(emit_log.py):
# -*- coding: utf-8 -*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
channel.exchange_declare(exchange=\'logs\', exchange_type=\'fanout\')
message = \' \'.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange=\'logs\', routing_key=\'\', body=message)
print(" [x] Sent %r" % message)
connection.close()
消费者代码(receive_log.py):
# -*- coding: utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
channel.exchange_declare(exchange=\'logs\', exchange_type=\'fanout\')
result = channel.queue_declare(queue=\'\', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange=\'logs\', queue=queue_name)
print(\' [*] Waiting for logs. To exit press CTRL+C\')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
开启四个终端,其中一个用于保存日志:
python3 receive_log.py > logs_from_rabbit.log
另一个用于观察日志输出:
python3 receive_log.py
日志产生:
python3 emit_log.py
监听绑定:
sudo rabbitmqctl list_bindings
运行截图如下:
本次分享到此结束,感谢大家阅读~