RabbitMQ:使用python釋出/訂閱消息
在上一章節中,我們學習了RabbitMQ:基礎,運作和管理,在本節中我們來學習一下如何釋出和訂閱rabbitmq消息,我們使用python來開發應用程式。
我們先來看一下釋出/訂閱rabbitmq消息的流程。先來看消息生産者Publisher如何釋出消息流程:
引用
1、打開RabbitMQ連接配接;
2、建立Channel通道;
3、聲名一個exchange交換機;
4、生成一條消息;
5、釋出消息;
6、關閉Channel通道;
7、關閉RabbitMQ連接配接。
然後我們再來看一下Consumer訂閱流程和消費消息:
引用
1、打開RabbitMQ連接配接;
2、建立Channel通道;
3、聲名一個exchange交換機;
4、聲名一個queue隊列;
5、将queue隊列綁定到exchange交換機;
6、消費消息;
7、關閉Channel通道;
8、關閉RabbitMQ連接配接。
在寫代碼之前,我們需要先安裝一下使用python連結RabbitMQ的依賴庫——pika。我們使用easy_install來安裝(easy_install的具體安裝過程請參看Centos 6 上安裝easy_instal):
引用
# ./easy_install pika
好了,準備工作已經好了,現在我們開始寫代碼:
引用
# mkdir -p /data/rabbitmq-pika
# cd /data/rabbitmq-pika
# mkdir c2
# cd c2
在本節中,我們主要來開發一個帶Publisher confirms的消息釋出和訂閱程式。我們先來開發消息生産者hello_world_producer_with_comfirms.py:
引用
# touch hello_world_producer_with_comfirms.py
# chmod +x hello_world_producer_with_comfirms.py
# vi hello_world_producer_with_comfirms.py
hello_world_producer_with_comfirms.py代碼如下:
#coding=utf-8
import pika,sys
from pika import spec
#在"/"虛拟主機vhost上通過使用者guest建立channel通道
user_name = 'guest'
user_passwd = 'guest'
target_host = 'localhost'
vhost = '/'
cred = pika.PlainCredentials(user_name,user_passwd)
conn_params = pika.ConnectionParameters(target_host,
virtual_host = vhost,
credentials = cred)
conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()
#定義消息釋出後publisher接受到的确認資訊處理函數
def confirm_handler(frame):
if type(frame.method) == spec.Confirm.SelectOk:
"""生産者建立的channel處于‘publisher comfirms’模式"""
print 'Channel in "confirm" mode!'
elif type(frame.method) == spec.Basic.Nack:
"""生産者接受到消息發送失敗并且消息丢失的消息"""
print 'Message lost!'
elif type(frame.method) == spec.Basic.ack:
if frame.method.delivery_tag in msg_ids:
"""生産者接受到成功釋出的消息"""
print 'Confirm received!'
msg_ids.remove(frame.method.delivery_tag)
#将生産者建立的channel處于"publisher confirms"模式
channel.confirm_delivery(callback = confirm_handler)
#建立一個direct類型的、持久化的、沒有consumer時隊列是否自動删除的exchage交換機
channel.exchange_declare(exchange = 'hello-exch',
type = 'direct',
passive = False,
durable = True,
auto_delete = False)
#使用接收到的資訊建立消息
#使用接收到的資訊建立消息
msg = sys.argv[1]
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'
#持久化消息
msg_props.delivery_mode = 2
msg_ids = []
print 'ready to publish...'
#釋出消息
channel.basic_publish(body = msg,
exchange = 'hello-exch',
properties = msg_props,
routing_key = 'hala')
print 'published!'
msg_ids.append(len(msg_ids) + 1)
print len(msg_ids)
channel.close()
conn_broker.close()
接下來,我們先來開發一個帶消息确認資訊的消費者hello_world_consumer_with_ack.py:
引用
# hello_world_consumer_with_ack.py
# chmod +x hello_world_consumer_with_ack.py
# vi hello_world_consumer_with_ack.py
hello_world_consumer_with_ack.py代碼如下:#coding=utf-8
import pika
#在"/"虛拟主機vhost上通過使用者guest建立channel通道
user_name = 'guest'
user_passwd = 'guest'
target_host = 'localhost'
vhost = '/'
cred = pika.PlainCredentials(user_name,user_passwd)
conn_params = pika.ConnectionParameters(target_host,
virtual_host = vhost,
credentials = cred)
conn_broker = pika.BlockingConnection(conn_params)
conn_channel = conn_broker.channel()
#建立一個direct類型的、持久化的、沒有consumer時,隊列是否自動删除exchage交換機
conn_channel.exchange_declare(exchange = 'hello-exch',
type = 'direct',
passive = False,
durable = True,
auto_delete = False)
#建立一個持久化的、沒有consumer時隊列是否自動删除的名為“hell-queue”
conn_channel.queue_declare(queue = 'hello-queue',
durable = True,
auto_delete = False)
#将“hello-queue”隊列通過routing_key綁定到“hello-exch”交換機
conn_channel.queue_bind(queue = 'hello-queue',
exchange = 'hello-exch',
routing_key = 'hala')
#定義一個消息确認函數,消費者成功處理完消息後會給隊列發送一個确認資訊,然後該消息會被删除
def ack_info_handler(channel,method,header,body):
"""ack_info_handler """
print 'ack_info_handler() called!'
if body == 'quit':
channel.basic_cancel(consumer_tag = 'hello-hala')
channel.stop_sonsuming()
else:
print body
channel.basic_ack(delivery_tag = method.delivery_tag)
conn_channel.basic_consume(ack_info_handler,
queue = 'hello-queue',
no_ack = False,
consumer_tag = 'hello-hala')
print 'ready to consume msg...'
conn_channel.start_consuming()
測試:
引用
# /opt/mq/rabbitmq/sbin/rabbitmq-server start &
# python ./hello_world_consumer_with_ack.py
# python ./hello_world_producer_with_comfirms.py 'hello-world'
對于上面的hello_world_producer_with_comfirms.py,本人調試了很長時間,期間總是在:
引用
#将生産者建立的channel處于"publisher confirms"模式
channel.confirm_delivery(callback = confirm_handler)
報錯,最後在重新下載下傳pika-0.9.13.tar.gz并安裝後才成功執行上述代碼:
引用
# wget https://pypi.python.org/packages/source/p/pika/pika-0.9.13.tar.gz --no-check-certificate
# chmod +x pika-0.9.13.tar.gz
# tar xzvf pika-0.9.13.tar.gz
# cd pika-0.9.13
# python setup.py install