不同程序之间通讯
1.socket
2.disk硬盘文件
3.broker中间代理
python中:
threading Queue 线程之间通讯,不能跨进程
multiprocessing Queue 父进程与子进程进行交互,或同一个父进程下的多个子进程
RabbitMQ 消息队列
MQ全称为Message Queue,一种应用程序对应用程序的通信方法
RabbitMQ 官方地址:
http://www.rabbitmq.com/安装:
erlang
http://www.erlang.org/ rabbitmq参考:
《Windows下RabbitMQ安装及入门》
http://blog.csdn.net/hzw19920329/article/details/53156015启动服务(管理员模式):rabbitmq-service start 或者:services.msc
访问管理后台:
http://localhost:15672 用户名:guest,密码:guest
查看队列:rabbitmqctl list_queues
查看状态:rabbitmqctl status
rabbitMQ轮询分发
将消息依次分发给每个消费者
生产者 –> 队列 –> 消费者1, 消费者2, 消费者3
rabbitMQ消息持久化
服务器关闭,队列消失,可以设置持久化队列名,持久化消息
消费者控制接收数量
消费者可以按需获取信息,实现能者多劳
发布者订阅者
是即时发送接收
发布者 –> 交换机 –> 队列1, 队列2, 队列3 –> 订阅者1, 订阅者2, 订阅者3
广播模式
fanout广播模式 无选择接收
direct广播模式 有选择接收 队列绑定关键字
topic广播模式 消息过滤
遇到的问题
问题:无法访问Web管理页面
解决:
启动管理模块:
rabbitmqctl start_app
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl stop
《RabbitMQ无法访问Web管理页面》
http://blog.csdn.net/u011642663/article/details/54691788问题:Error: unable to perform an operation
C:\Windows\System32\config\systemprofile.erlang.cookie
拷贝.erlang.cookie文件覆盖
C:\User\username.erlang.cookie
《Authentication failed》
http://blog.csdn.net/j_shine/article/details/78833456代码实例
安装第三方库:
pip install pika
生产者消费者
# 发送端
import pika
queue_name = "hello2" # 队列名称
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
# 声明queue
channel.queue_declare(queue=queue_name) # durable=True消息名称持久化
# 发送消息,需要通过路由器
channel.basic_publish(exchange="",
routing_key=queue_name,
body="hello, world!",
# 消息持久化 make message persistent
#properties=pika.BasicProperties(delivery_mode=2)
)
print("send hello")
connection.close()
# 持久化之后,服务器重启不消失
# 接收端
import pika
import time
queue_name = "hello2" # 队列名称
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
# 不知道客户端还是服务端先启动,为了确保这个队列存在,两端都需要声明
channel.queue_declare(queue=queue_name)
channel.basic_qos(prefetch_count=1) # 最多处理一个信息,处理完再接收
def callback(ch, method, properties, body): # 回调函数
print("ch:", ch)
print("method:", method)
print("properties:", properties)
print("接收到信息:", body)
time.sleep(30)
# ch.basic_ask(delivery_tag=method.delivery_tag) # 消息确认
channel.basic_consume(callback, # 如果收到消息就调用函数处理消息
queue=queue_name,
no_ack=True) #acknowledgement确认
print("waiting for message, ctrl+c break")
# 启动接收
channel.start_consuming()
1对多的发送广播
广播模式:fanout
# 发布者
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.exchange_declare(exchange="logs", exchange_type="fanout")
message = "hello world"
channel.basic_publish(exchange="logs",
routing_key="",
body=message)
print("send ok")
connection.close()
# 订阅者
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.exchange_declare(exchange="logs", exchange_type="fanout")
result = channel.queue_declare(exclusive=True) # 随机分配队列名
queue_name = result.method.queue
channel.queue_bind(exchange="logs", queue=queue_name)
print("waiting for logs")
def callback(ch, method, properties, body):
print("body:", body)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
消息过滤
有选择的接收消息,广播模式:direct
# 实现1对多的发送
# 发布者
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
severity = sys.argv[1] if len(sys.argv)>1 else "info" # severity严重程度
message = " ".join(sys.argv[:2] or "hello world!")
channel.basic_publish(exchange="direct_logs",
routing_key=severity,
body=message)
print("send ok")
connection.close()
# 订阅者
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
result = channel.queue_declare(exclusive=True) # 随机分配队列名
queue_name = result.method.queue
# 获取参数
severities = sys.argv[1:]
if not severities:
sys.stderr.write("usage: %s [info] [warning] [error]" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange="direct_logs",
routing_key=severity,
queue=queue_name)
print("waiting...")
def callback(ch, method, properties, body):
print("routing_key:", method.routing_key)
print("body:", body)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
通配符消息过滤
表达式符号说明:#代表一个或多个字符,*代表任何字符
广播模式:topic
# 实现1对多的发送
# 发布者
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.exchange_declare(exchange="topic_logs", exchange_type="topic")
severity = sys.argv[1] if len(sys.argv)>1 else "info" # severity严重程度
message = " ".join(sys.argv[:2] or "hello world!")
channel.basic_publish(exchange="topic_logs",
routing_key=severity,
body=message)
print("send ok")
connection.close()
# 订阅者
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.exchange_declare(exchange="topic_logs", exchange_type="topic")
result = channel.queue_declare(exclusive=True) # 随机分配队列名
queue_name = result.method.queue
# 获取参数
severities = sys.argv[1:]
if not severities:
sys.stderr.write("usage: %s [info] [warning] [error]" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange="topic_logs",
routing_key=severity,
queue=queue_name)
print("waiting...")
def callback(ch, method, properties, body):
print("routing_key:", method.routing_key)
print("body:", body)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
Remote procedure call (RPC)
远程程序调用
# 服务器端
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n): # 被调用函数
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(
correlation_id= props.correlation_id
),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
# 客户端
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
self.channel = self.connection.channel()
# 生成随机队列
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response,
no_ack=True,
queue=self.callback_queue
)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4()) # 用于区分发送指令和接收结果的一致性
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
参考文章:
《Python之路,Day9 - 异步IO\数据库\队列\缓存》
http://www.cnblogs.com/alex3714/articles/5248247.html