天天看点

3Python全栈之路系列之RabbitMQPython全栈之路系列之RabbitMQ

安装EPEL源

<code>[root@anshengme ~]</code><code># yum -y install epel-release</code>

安装erlang

<code>[root@anshengme ~]</code><code># yum -y install erlang</code>

安装RabbitMQ

<code>[root@anshengme ~]</code><code># yum -y install rabbitmq-server</code>

启动并设置开机器启动

在启动<code>RabbitMQ</code>之前需要hostname的解析,要不然启动不起来

<code>[root@anshengme ~]</code><code># cat /etc/hosts</code>

<code>127.0</code><code>.</code><code>0.1</code>   <code>localhost localhost.localdomain localhost4 localhost4.localdomain4 anshengme</code>

<code>::</code><code>1</code>         <code>localhost localhost.localdomain localhost6 localhost6.localdomain6</code>

<code>[root@anshengme ~]</code><code># systemctl start rabbitmq-server</code>

<code>[root@anshengme ~]</code><code># systemctl enable rabbitmq-server</code>

<code>Created symlink </code><code>from</code> <code>/</code><code>etc</code><code>/</code><code>systemd</code><code>/</code><code>system</code><code>/</code><code>multi</code><code>-</code><code>user.target.wants</code><code>/</code><code>rabbitmq</code><code>-</code><code>server.service to </code><code>/</code><code>usr</code><code>/</code><code>lib</code><code>/</code><code>systemd</code><code>/</code><code>system</code><code>/</code><code>rabbitmq</code><code>-</code><code>server.service.</code>

查看启动状态

<code>[root@anshengme ~]</code><code># netstat -tulnp |grep 5672</code>

<code>tcp        </code><code>0</code>      <code>0</code> <code>0.0</code><code>.</code><code>0.0</code><code>:</code><code>25672</code>           <code>0.0</code><code>.</code><code>0.0</code><code>:</code><code>*</code>               <code>LISTEN      </code><code>37507</code><code>/</code><code>beam.smp      </code>

<code>tcp6       </code><code>0</code>      <code>0</code> <code>:::</code><code>5672</code>                 <code>:::</code><code>*</code>                    <code>LISTEN      </code><code>37507</code><code>/</code><code>beam.smp</code>

<code>pika</code>模块是官方认可的操作<code>RabbitMQ</code>的API接口。

安装pika

测试

<code>&gt;&gt;&gt; </code><code>import</code> <code>pika</code>

如果你启动了多个消费者,那么生产者生产的任务会根据顺序的依次让消费者来执行,这就是<code>Work Queues</code>模式

<a href="https://s1.51cto.com/wyfs02/M02/95/6D/wKiom1kVKWmS6h7vAABZtJSB-Ow435.png" target="_blank"></a>

生产者代码

<code>#!/usr/bin/env python</code>

<code># _*_ codin:utf-8 _*_</code>

<code>import</code> <code>pika</code>

<code># 连接到RabbitMQ 这是一个阻塞的连接</code>

<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(</code><code>'192.168.56.100'</code><code>))</code>

<code># 生成一个管道</code>

<code>channel </code><code>=</code> <code>connection.channel()</code>

<code># 通过管道创建一个队列</code>

<code>channel.queue_declare(queue</code><code>=</code><code>'hello'</code><code>)</code>

<code># 在队列内发送数据,body内容,routing_key队列,exchange交换器,通过交换器往hello队列内发送Hello World!数据</code>

<code>channel.basic_publish(exchange</code><code>=</code><code>'</code><code>', routing_key='</code><code>hello</code><code>', body='</code><code>Hello World!')</code>

<code># 关闭连接</code>

<code>connection.close()</code>

消费者代码

<code>#!/usr/bin/env python</code>

<code># 如果消费者连接到这个队列的时候,队列没有生成,那么消费者就生成这个队列,如果这个队列已经生成了,那么就忽略它</code>

<code># 回调函数</code>

<code>def</code> <code>callback(ch, method, properties, body):</code>

<code>    </code><code>print</code><code>(</code><code>" [x] Received %r"</code> <code>%</code> <code>body)</code>

<code>    </code> 

<code># 消费,当收到hello队列的消息的时候就,就调用callback函数,no_ack消费者在处理任务的时候要不需要确认任务已经处理完成,改为False则要确认</code>

<code>channel.basic_consume(callback, queue</code><code>=</code><code>'hello'</code><code>, no_ack</code><code>=</code><code>True</code><code>)</code>

<code># 开始接受任务,阻塞</code>

<code>channel.start_consuming()</code>

队列持久化

试想,如果我们的消费者在执行任务执行到一半时,突然down掉了,我们可以更改<code>no_ack=False</code>来让消费者每次执行完成完成之后确认执行完毕了再把这个任务在队列中移除移除掉,但是如果RabbitMQ的服务器停止我们的任务仍然会丢失。

首先,我们需要确保的<code>RabbitMQ</code>永远不会在我们的队列中失去,为了做到这一点,我们需要把<code>durable=True</code>,声明一个新名称的队列,为<code>task_queue</code>:

<code>channel.queue_declare(queue</code><code>=</code><code>'task_queue'</code><code>, durable</code><code>=</code><code>True</code><code>)</code>

<code>durable</code>需要在生产者和消费者上面都需要写上,且<code>durable</code>只会让我们的队列持久化,并不能够让消息持久化。

消息持久化

消息持久化只需要在添加消息的时候添加一个<code>delivery_mode=2</code>

<code>channel.basic_publish(exchange</code><code>=</code><code>'',</code>

<code>                      </code><code>routing_key</code><code>=</code><code>'world'</code><code>,</code>

<code>                      </code><code>body</code><code>=</code><code>'Hello World!'</code><code>,</code>

<code>                      </code><code>properties</code><code>=</code><code>pika.BasicProperties(</code>

<code>                          </code><code># 2=消息持久化</code>

<code>                          </code><code>delivery_mode</code><code>=</code><code>2</code><code>,</code>

<code>                      </code><code>))</code>

在消费者的callback函数内添加以下代码:

<code>ch.basic_ack(delivery_tag </code><code>=</code> <code>method.delivery_tag)</code>

每一个消费者同时只处理一个任务,比如说现在有三个消费者,刚开始来了三个任务,平均分配给了三个消费者,那么这三个消费者目前都在同时执行任务,当第四个任务到来的时候依旧会分配给第一个消费者,第五个任务到来的时候会分配给第二个消费者,以此类推。

那么以上的状况有什么不妥呢?譬如说不同的消费者执行任务的时间不同,我们现在需要的时候,当三个消费者都在执行任务的时候,比如说第二个消费者任务执行完了,其他消费者都还在执行任务,当第四个任务到来的时候希望交给第二个消费者,若要实现此功能,只需要在消费者加上一下代码即可:

<code>channel.basic_qos(prefetch_count</code><code>=</code><code>1</code><code>)</code>

完整的代码如下

<code>import</code> <code>time</code>

<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(</code>

<code>    </code><code>host</code><code>=</code><code>'192.168.56.100'</code><code>))</code>

<code>print</code><code>(</code><code>' [*] Waiting for messages. To exit press CTRL+C'</code><code>)</code>

<code>    </code><code>time.sleep(</code><code>10</code><code>)</code>

<code>    </code><code>print</code><code>(</code><code>" [x] Done"</code><code>)</code>

<code>    </code><code>ch.basic_ack(delivery_tag</code><code>=</code><code>method.delivery_tag)</code>

<code>channel.basic_consume(callback,</code>

<code>                      </code><code>queue</code><code>=</code><code>'task_queue'</code><code>)</code>

<code>                      </code> 

<code>import</code> <code>sys</code>

<code>for</code> <code>n </code><code>in</code> <code>range</code><code>(</code><code>10</code><code>):</code>

<code>    </code><code>message </code><code>=</code> <code>"Hello World! %s"</code> <code>%</code> <code>(n </code><code>+</code> <code>1</code><code>)</code>

<code>    </code><code>channel.basic_publish(exchange</code><code>=</code><code>'',</code>

<code>                          </code><code>routing_key</code><code>=</code><code>'task_queue'</code><code>,</code>

<code>                          </code><code>body</code><code>=</code><code>message,</code>

<code>                          </code><code>properties</code><code>=</code><code>pika.BasicProperties(</code>

<code>                              </code><code>delivery_mode</code><code>=</code><code>2</code><code>,  </code><code># make message persistent</code>

<code>                          </code><code>))</code>

<code>    </code><code>print</code><code>(</code><code>" [x] Sent %r"</code> <code>%</code> <code>message)</code>

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

属性

描述

<code>fanout</code>

所有bind到此exchange的queue都可以接收消息

<code>direct</code>

通过routingKey和exchange决定的那个唯一的queue可以接收消息

<code>topic</code>

所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

fanout(发布订阅)

只要有消费者,那么我生产者发布一条消息的时候所有的消费者都会被收到

<a href="https://s1.51cto.com/wyfs02/M01/95/6E/wKioL1kVKmbR6t5hAAA2MyN-t_g055.png" target="_blank"></a>

<code># 消费者</code>

<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(host</code><code>=</code><code>'192.168.56.100'</code><code>))</code>

<code>channel.exchange_declare(exchange</code><code>=</code><code>'logs'</code><code>, </code><code>type</code><code>=</code><code>'fanout'</code><code>)</code>

<code># 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除</code>

<code>result </code><code>=</code> <code>channel.queue_declare(exclusive</code><code>=</code><code>True</code><code>)</code>

<code># 获取queue的name</code>

<code>queue_name </code><code>=</code> <code>result.method.queue</code>

<code># 把queue绑定到exchange</code>

<code>channel.queue_bind(exchange</code><code>=</code><code>'logs'</code><code>, queue</code><code>=</code><code>queue_name)</code>

<code>    </code><code>print</code><code>(</code><code>" [x] %r"</code> <code>%</code> <code>body)</code>

<code>channel.basic_consume(callback,queue</code><code>=</code><code>queue_name,no_ack</code><code>=</code><code>True</code><code>)</code>

<code># 生产者</code>

<code># fanout发送给所有人</code>

<code>channel.basic_publish(exchange</code><code>=</code><code>'logs'</code><code>, routing_key</code><code>=</code><code>'', body</code><code>=</code><code>"Hello World!"</code><code>)</code>

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

<a href="https://s4.51cto.com/wyfs02/M00/95/6E/wKiom1kVKprTftxpAABNvwKE9II350.png" target="_blank"></a>

<code>channel.exchange_declare(exchange</code><code>=</code><code>'direct_logs'</code><code>,</code>

<code>                         </code><code>type</code><code>=</code><code>'direct'</code><code>)</code>

<code>                         </code> 

<code>severity </code><code>=</code> <code>sys.argv[</code><code>1</code><code>] </code><code>if</code> <code>len</code><code>(sys.argv) &gt; </code><code>1</code> <code>else</code> <code>'info'</code>

<code>message </code><code>=</code> <code>' '</code><code>.join(sys.argv[</code><code>2</code><code>:]) </code><code>or</code> <code>'Hello World!'</code>

<code>channel.basic_publish(exchange</code><code>=</code><code>'direct_logs'</code><code>,</code>

<code>                      </code><code>routing_key</code><code>=</code><code>severity,</code>

<code>                      </code><code>body</code><code>=</code><code>message)</code>

<code>print</code><code>(</code><code>" [x] Sent %r:%r"</code> <code>%</code> <code>(severity, message))</code>

<code>        </code><code>host</code><code>=</code><code>'192.168.56.100'</code><code>))</code>

<code>severities </code><code>=</code> <code>sys.argv[</code><code>1</code><code>:]</code>

<code>if</code> <code>not</code> <code>severities:</code>

<code>    </code><code>sys.stderr.write(</code><code>"Usage: %s [info] [warning] [error]\n"</code> <code>%</code> <code>sys.argv[</code><code>0</code><code>])</code>

<code>    </code><code>sys.exit(</code><code>1</code><code>)</code>

<code>for</code> <code>severity </code><code>in</code> <code>severities:</code>

<code>    </code><code>channel.queue_bind(exchange</code><code>=</code><code>'direct_logs'</code><code>,</code>

<code>                       </code><code>queue</code><code>=</code><code>queue_name,</code>

<code>                       </code><code>routing_key</code><code>=</code><code>severity)</code>

<code>                       </code> 

<code>print</code><code>(</code><code>' [*] Waiting for logs. To exit press CTRL+C'</code><code>)</code>

<code>    </code><code>print</code><code>(</code><code>" [x] %r:%r"</code> <code>%</code> <code>(method.routing_key, body))</code>

<code>                      </code><code>queue</code><code>=</code><code>queue_name,</code>

<code>                      </code><code>no_ack</code><code>=</code><code>True</code><code>)</code>

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

表达式符号说明:

符号

<code>#</code>

表示可以匹配<code>0个</code>或<code>多个</code>单词

<code>*</code>

表示只能匹配<code>一个</code>单词

发送者路由值

队列中

是否匹配

ansheng.me

ansheng.*

不匹配

ansheng.#

匹配

<code>channel.exchange_declare(exchange</code><code>=</code><code>'topic_logs'</code><code>,</code>

<code>                         </code><code>type</code><code>=</code><code>'topic'</code><code>)</code>

<code>binding_keys </code><code>=</code> <code>sys.argv[</code><code>1</code><code>:]</code>

<code>if</code> <code>not</code> <code>binding_keys:</code>

<code>    </code><code>sys.stderr.write(</code><code>"Usage: %s [binding_key]...\n"</code> <code>%</code> <code>sys.argv[</code><code>0</code><code>])</code>

<code>for</code> <code>binding_key </code><code>in</code> <code>binding_keys:</code>

<code>    </code><code>channel.queue_bind(exchange</code><code>=</code><code>'topic_logs'</code><code>,</code>

<code>                       </code><code>routing_key</code><code>=</code><code>binding_key)</code>

<code>routing_key </code><code>=</code> <code>sys.argv[</code><code>1</code><code>] </code><code>if</code> <code>len</code><code>(sys.argv) &gt; </code><code>1</code> <code>else</code> <code>'anonymous.info'</code>

<code>channel.basic_publish(exchange</code><code>=</code><code>'topic_logs'</code><code>,</code>

<code>                      </code><code>routing_key</code><code>=</code><code>routing_key,</code>

<code>print</code><code>(</code><code>" [x] Sent %r:%r"</code> <code>%</code> <code>(routing_key, message))</code>

客户端发送一个任务到服务端,服务端把任务的执行结果再返回给客户端

<a href="https://s4.51cto.com/wyfs02/M00/95/6E/wKiom1kVK2rB1RUhAACEQPOtmes668.png" target="_blank"></a>

RPC Server

<code># _*_coding:utf-8_*_</code>

<code># 声明一个RPC QUEUE</code>

<code>channel.queue_declare(queue</code><code>=</code><code>'rpc_queue'</code><code>)</code>

<code>def</code> <code>fib(n):</code>

<code>    </code><code>if</code> <code>n </code><code>=</code><code>=</code> <code>0</code><code>:</code>

<code>        </code><code>return</code> <code>0</code>

<code>    </code><code>elif</code> <code>n </code><code>=</code><code>=</code> <code>1</code><code>:</code>

<code>        </code><code>return</code> <code>1</code>

<code>    </code><code>else</code><code>:</code>

<code>        </code><code>return</code> <code>fib(n </code><code>-</code> <code>1</code><code>) </code><code>+</code> <code>fib(n </code><code>-</code> <code>2</code><code>)</code>

<code>        </code> 

<code>def</code> <code>on_request(ch, method, props, body):</code>

<code>    </code><code># 接受传过来的值</code>

<code>    </code><code>n </code><code>=</code> <code>int</code><code>(body)</code>

<code>    </code><code>print</code><code>(</code><code>" [.] fib(%s)"</code> <code>%</code> <code>n)</code>

<code>    </code><code># 交给fib函数进行斐波那契处理</code>

<code>    </code><code>response </code><code>=</code> <code>fib(n)</code>

<code>    </code><code># 把结果发回去,此时消费者变成生产者</code>

<code>    </code><code>ch.basic_publish(exchange</code><code>=</code><code>'',</code>

<code>                     </code><code>routing_key</code><code>=</code><code>props.reply_to,</code>

<code>                     </code><code># 客户端传过来的UUID顺便发回去</code>

<code>                     </code><code>properties</code><code>=</code><code>pika.BasicProperties(correlation_id</code><code>=</code><code>props.correlation_id),</code>

<code>                     </code><code>body</code><code>=</code><code>str</code><code>(response))</code>

<code>    </code><code># 持久化</code>

<code># 同时只处理一个任务</code>

<code>channel.basic_consume(on_request, queue</code><code>=</code><code>'rpc_queue'</code><code>)</code>

<code>print</code><code>(</code><code>" [x] Awaiting RPC requests"</code><code>)</code>

RPC Client

<code>import</code> <code>uuid</code>

<code>class</code> <code>FibonacciRpcClient(</code><code>object</code><code>):</code>

<code>    </code><code>def</code> <code>__init__(</code><code>self</code><code>):</code>

<code>        </code><code>self</code><code>.connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(</code>

<code>            </code><code>host</code><code>=</code><code>'192.168.56.100'</code><code>))</code>

<code>            </code> 

<code>        </code><code>self</code><code>.channel </code><code>=</code> <code>self</code><code>.connection.channel()</code>

<code>        </code><code>result </code><code>=</code> <code>self</code><code>.channel.queue_declare(exclusive</code><code>=</code><code>True</code><code>)</code>

<code>        </code><code># 服务端返回处理完毕的数据新Queue名称</code>

<code>        </code><code>self</code><code>.callback_queue </code><code>=</code> <code>result.method.queue</code>

<code>        </code><code>self</code><code>.channel.basic_consume(</code><code>self</code><code>.on_response, no_ack</code><code>=</code><code>True</code><code>,</code>

<code>                                   </code><code>queue</code><code>=</code><code>self</code><code>.callback_queue)</code>

<code>                                   </code> 

<code>    </code><code>def</code> <code>on_response(</code><code>self</code><code>, ch, method, props, body):</code>

<code>        </code><code># corr_id等于刚刚发送过去的ID,就代表这条消息是我的</code>

<code>        </code><code>if</code> <code>self</code><code>.corr_id </code><code>=</code><code>=</code> <code>props.correlation_id:</code>

<code>            </code><code>self</code><code>.response </code><code>=</code> <code>body</code>

<code>    </code><code>def</code> <code>call(</code><code>self</code><code>, n):</code>

<code>        </code><code>self</code><code>.response </code><code>=</code> <code>None</code>

<code>        </code><code># 生成一个唯一ID,相当于每个任务的ID</code>

<code>        </code><code>self</code><code>.corr_id </code><code>=</code> <code>str</code><code>(uuid.uuid4())</code>

<code>        </code><code>self</code><code>.channel.basic_publish(exchange</code><code>=</code><code>'',</code>

<code>                                   </code><code>routing_key</code><code>=</code><code>'rpc_queue'</code><code>,</code>

<code>                                   </code><code>properties</code><code>=</code><code>pika.BasicProperties(</code>

<code>                                       </code><code># 让服务端处理完成之后把数据放到这个Queue里面</code>

<code>                                       </code><code>reply_to</code><code>=</code><code>self</code><code>.callback_queue,</code>

<code>                                       </code><code># 加上一个任务ID</code>

<code>                                       </code><code>correlation_id</code><code>=</code><code>self</code><code>.corr_id,</code>

<code>                                   </code><code>),</code>

<code>                                   </code><code>body</code><code>=</code><code>str</code><code>(n))</code>

<code>        </code><code>while</code> <code>self</code><code>.response </code><code>is</code> <code>None</code><code>:</code>

<code>            </code><code># 不断地去Queue接受消息,但不是阻塞的,而是一直循环的去取</code>

<code>            </code><code>self</code><code>.connection.process_data_events()</code>

<code>        </code><code>return</code> <code>int</code><code>(</code><code>self</code><code>.response)</code>

<code>fibonacci_rpc </code><code>=</code> <code>FibonacciRpcClient()</code>

<code>print</code><code>(</code><code>" [x] Requesting fib(30)"</code><code>)</code>

<code>response </code><code>=</code> <code>fibonacci_rpc.call(</code><code>30</code><code>)</code>

<code>print</code><code>(</code><code>" [.] Got %r"</code> <code>%</code> <code>response)</code>

本文转自 Edenwy  51CTO博客,原文链接:http://blog.51cto.com/edeny/1924932,如需转载请自行联系原作者