天天看點

python rabitmq_python操作RabbitMQ

RabbitMQ

RabbitMQ是一個在AMQP(Advanced Message Queuing Protocol)基礎之上的完整、可複用的企業級消息系統。他遵循Mozilla Public License開源協定。

MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通信方法。應用程式通過讀寫出入隊列的消息(針對應用程式的資料)來通信,而無需專用連接配接來連結它們。消 息傳遞指的是程式之間通過在消息中發送資料進行通信,而不是通過直接調用彼此來通信,直接調用通常是用于諸如遠端過程調用的技術。排隊指的是應用程式通過 隊列來通信。隊列的使用除去了接收和發送應用程式同時執行的要求。

RabbitMQ安裝

安裝配置epel源

$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm

安裝erlang

$ yum -y install erlang

安裝RabbitMQ

$ yum -y install rabbitmq-server

注意:service rabbitmq-server start/stop

安裝API

pip install pika

or

easy_install pika

or

源碼

https://pypi.python.org/pypi/pika

使用API操作RabbitMQ

基于Queue實作生産者消費者模型

import Queue

import threading

message = Queue.Queue(10)

def producer(i):

while True:

message.put(i)

def consumer(i):

while True:

msg = message.get()

for i in range(12):

t = threading.Thread(target=producer, args=(i,))

t.start()

for i in range(10):

t = threading.Thread(target=consumer, args=(i,))

t.start()

對于RabbitMQ來說,生産和消費不再針對記憶體裡的一個Queue對象,而是某台伺服器上的RabbitMQ Server實作的消息隊列。

import pika

# ######################### 生産者 #########################

connection = pika.BlockingConnection(pika.ConnectionParameters(

host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',

routing_key='hello',

body='Hello World!')

print("[x] Sent 'Hello World!'")

connection.close()

import pika

# ########################## 消費者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(

host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

channel.basic_consume(callback,

queue='hello',

no_ack=True)

print('[*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

1、acknowledgment 消息不丢失

no-ack = False,如果消費者遇到情況( its channel is closed, connection is closed, or TCP connection is lost )挂掉了,那麼,RabbitMQ會重新将該任務添加到隊列中。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

import time

time.sleep(10)

print 'ok'

ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,

queue='hello',

no_ack=False)

print('[*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

2、durable 消息不丢失

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

channel = connection.channel()

# make message persistent

channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange='',

routing_key='hello',

body='Hello World!',

properties=pika.BasicProperties(

delivery_mode=2, # make message persistent

))

print("[x] Sent 'Hello World!'")

connection.close()

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

channel = connection.channel()

# make message persistent

channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

import time

time.sleep(10)

print 'ok'

ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,

queue='hello',

no_ack=False)

print('[*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

3、消息擷取順序

預設消息隊列裡的資料是按照順序被消費者拿走,例如:消費者1 去隊列中擷取 奇數 序列的任務,消費者1去隊列中擷取 偶數 序列的任務。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

channel = connection.channel()

# make message persistent

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):

print("[x] Received %r" % body)

import time

time.sleep(10)

print 'ok'

ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,

queue='hello',

no_ack=False)

print('[*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

4、釋出訂閱

python rabitmq_python操作RabbitMQ

釋出訂閱和簡單的消息隊列差別在于,釋出訂閱會将消息發送給所有的訂閱者,而消息隊列中的資料被消費一次便消失。是以,RabbitMQ實作釋出和訂閱時,會為每一個訂閱者建立一個隊列,而釋出者釋出消息時,會将消息放置在所有相關隊列中。

exchange type = fanout

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs',

type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"

channel.basic_publish(exchange='logs',

routing_key='',

body=message)

print("[x] Sent %r" % message)

connection.close()

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs',

type='fanout')

result = channel.queue_declare(exclusive=True)

queue_name = result.method.queue

channel.queue_bind(exchange='logs',

queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):

print("[x] %r" % body)

channel.basic_consume(callback,

queue=queue_name,

no_ack=True)

channel.start_consuming()

5、關鍵字發送

python rabitmq_python操作RabbitMQ

exchange type = direct

之前事例,發送消息時明确指定某個隊列并向其中發送消息,RabbitMQ還支援根據關鍵字發送,即:隊列綁定關鍵字,發送者将資料根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該将資料發送至指定隊列。

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',

type='direct')

result = channel.queue_declare(exclusive=True)

queue_name = result.method.queue

severities = sys.argv[1:]

if not severities:

sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])

sys.exit(1)

for severity in severities:

channel.queue_bind(exchange='direct_logs',

queue=queue_name,

routing_key=severity)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):

print("[x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,

queue=queue_name,

no_ack=True)

channel.start_consuming()

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',

type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='direct_logs',

routing_key=severity,

body=message)

print("[x] Sent %r:%r" % (severity, message))

connection.close()

6、模糊比對

python rabitmq_python操作RabbitMQ

exchange type = topic

在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之後發送者将資料發送到exchange,exchange将傳入”路由值“和 ”關鍵字“進行比對,比對成功,則将資料發送到指定隊列。

# 表示可以比對 0 個 或 多個 單詞

* 表示隻能比對 一個 單詞

發送者路由值 隊列中

old.boy.python old.* -- 不比對

old.boy.python old.# -- 比對

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',

type='topic')

result = channel.queue_declare(exclusive=True)

queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:

sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])

sys.exit(1)

for binding_key in binding_keys:

channel.queue_bind(exchange='topic_logs',

queue=queue_name,

routing_key=binding_key)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):

print("[x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,

queue=queue_name,

no_ack=True)

channel.start_consuming()

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',

type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='topic_logs',

routing_key=routing_key,

body=message)

print("[x] Sent %r:%r" % (routing_key, message))

connection.close()

注意:

sudo rabbitmqctl add_user alex 123

# 設定使用者為administrator角色

sudo rabbitmqctl set_user_tags alex administrator

# 設定權限

sudo rabbitmqctl set_permissions -p "/" alex '.''.''.'

# 然後重新開機rabbiMQ服務

sudo /etc/init.d/rabbitmq-server restart

# 然後可以使用剛才的使用者遠端連接配接rabbitmq server了。

------------------------------

credentials = pika.PlainCredentials("alex","123")

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.14.47',credentials=credentials))