天天看點

python rabitmq_RabbitMQ:施用python釋出/訂閱消息

RabbitMQ:使用python釋出/訂閱消息

在上一章節中,我們學習了RabbitMQ:基礎,運作和管理,在本節中我們來學習一下如何釋出和訂閱rabbitmq消息,我們使用python來開發應用程式。

我們先來看一下釋出/訂閱rabbitmq消息的流程。先來看消息生産者Publisher如何釋出消息流程:

引用

1、打開RabbitMQ連接配接;

2、建立Channel通道;

3、聲名一個exchange交換機;

4、生成一條消息;

5、釋出消息;

6、關閉Channel通道;

7、關閉RabbitMQ連接配接。

然後我們再來看一下Consumer訂閱流程和消費消息:

引用

1、打開RabbitMQ連接配接;

2、建立Channel通道;

3、聲名一個exchange交換機;

4、聲名一個queue隊列;

5、将queue隊列綁定到exchange交換機;

6、消費消息;

7、關閉Channel通道;

8、關閉RabbitMQ連接配接。

在寫代碼之前,我們需要先安裝一下使用python連結RabbitMQ的依賴庫——pika。我們使用easy_install來安裝(easy_install的具體安裝過程請參看Centos 6 上安裝easy_instal):

引用

# ./easy_install pika

好了,準備工作已經好了,現在我們開始寫代碼:

引用

# mkdir -p /data/rabbitmq-pika

# cd /data/rabbitmq-pika

# mkdir c2

# cd c2

在本節中,我們主要來開發一個帶Publisher confirms的消息釋出和訂閱程式。我們先來開發消息生産者hello_world_producer_with_comfirms.py:

引用

# touch hello_world_producer_with_comfirms.py

# chmod +x hello_world_producer_with_comfirms.py

# vi hello_world_producer_with_comfirms.py

hello_world_producer_with_comfirms.py代碼如下:

#coding=utf-8

import pika,sys

from pika import spec

#在"/"虛拟主機vhost上通過使用者guest建立channel通道

user_name = 'guest'

user_passwd = 'guest'

target_host = 'localhost'

vhost = '/'

cred = pika.PlainCredentials(user_name,user_passwd)

conn_params = pika.ConnectionParameters(target_host,

virtual_host = vhost,

credentials = cred)

conn_broker = pika.BlockingConnection(conn_params)

channel = conn_broker.channel()

#定義消息釋出後publisher接受到的确認資訊處理函數

def confirm_handler(frame):

if type(frame.method) == spec.Confirm.SelectOk:

"""生産者建立的channel處于‘publisher comfirms’模式"""

print 'Channel in "confirm" mode!'

elif type(frame.method) == spec.Basic.Nack:

"""生産者接受到消息發送失敗并且消息丢失的消息"""

print 'Message lost!'

elif type(frame.method) == spec.Basic.ack:

if frame.method.delivery_tag in msg_ids:

"""生産者接受到成功釋出的消息"""

print 'Confirm received!'

msg_ids.remove(frame.method.delivery_tag)

#将生産者建立的channel處于"publisher confirms"模式

channel.confirm_delivery(callback = confirm_handler)

#建立一個direct類型的、持久化的、沒有consumer時隊列是否自動删除的exchage交換機

channel.exchange_declare(exchange = 'hello-exch',

type = 'direct',

passive = False,

durable = True,

auto_delete = False)

#使用接收到的資訊建立消息

#使用接收到的資訊建立消息

msg = sys.argv[1]

msg_props = pika.BasicProperties()

msg_props.content_type = 'text/plain'

#持久化消息

msg_props.delivery_mode = 2

msg_ids = []

print 'ready to publish...'

#釋出消息

channel.basic_publish(body = msg,

exchange = 'hello-exch',

properties = msg_props,

routing_key = 'hala')

print 'published!'

msg_ids.append(len(msg_ids) + 1)

print len(msg_ids)

channel.close()

conn_broker.close()

接下來,我們先來開發一個帶消息确認資訊的消費者hello_world_consumer_with_ack.py:

引用

# hello_world_consumer_with_ack.py

# chmod +x hello_world_consumer_with_ack.py

# vi hello_world_consumer_with_ack.py

hello_world_consumer_with_ack.py代碼如下:#coding=utf-8

import pika

#在"/"虛拟主機vhost上通過使用者guest建立channel通道

user_name = 'guest'

user_passwd = 'guest'

target_host = 'localhost'

vhost = '/'

cred = pika.PlainCredentials(user_name,user_passwd)

conn_params = pika.ConnectionParameters(target_host,

virtual_host = vhost,

credentials = cred)

conn_broker = pika.BlockingConnection(conn_params)

conn_channel = conn_broker.channel()

#建立一個direct類型的、持久化的、沒有consumer時,隊列是否自動删除exchage交換機

conn_channel.exchange_declare(exchange = 'hello-exch',

type = 'direct',

passive = False,

durable = True,

auto_delete = False)

#建立一個持久化的、沒有consumer時隊列是否自動删除的名為“hell-queue”

conn_channel.queue_declare(queue = 'hello-queue',

durable = True,

auto_delete = False)

#将“hello-queue”隊列通過routing_key綁定到“hello-exch”交換機

conn_channel.queue_bind(queue = 'hello-queue',

exchange = 'hello-exch',

routing_key = 'hala')

#定義一個消息确認函數,消費者成功處理完消息後會給隊列發送一個确認資訊,然後該消息會被删除

def ack_info_handler(channel,method,header,body):

"""ack_info_handler """

print 'ack_info_handler() called!'

if body == 'quit':

channel.basic_cancel(consumer_tag = 'hello-hala')

channel.stop_sonsuming()

else:

print body

channel.basic_ack(delivery_tag = method.delivery_tag)

conn_channel.basic_consume(ack_info_handler,

queue = 'hello-queue',

no_ack = False,

consumer_tag = 'hello-hala')

print 'ready to consume msg...'

conn_channel.start_consuming()

測試:

引用

# /opt/mq/rabbitmq/sbin/rabbitmq-server start &

# python ./hello_world_consumer_with_ack.py

# python ./hello_world_producer_with_comfirms.py 'hello-world'

對于上面的hello_world_producer_with_comfirms.py,本人調試了很長時間,期間總是在:

引用

#将生産者建立的channel處于"publisher confirms"模式

channel.confirm_delivery(callback = confirm_handler)

報錯,最後在重新下載下傳pika-0.9.13.tar.gz并安裝後才成功執行上述代碼:

引用

# wget https://pypi.python.org/packages/source/p/pika/pika-0.9.13.tar.gz --no-check-certificate

# chmod +x pika-0.9.13.tar.gz

# tar xzvf pika-0.9.13.tar.gz

# cd pika-0.9.13

# python setup.py install