天天看点

RabbitMQ详解(二)

一、RabbitMQ架构浅析

二、收发“hello world”

三、任务分发机制

1.MQ架构图  

RabbitMQ Server: 也叫broker server,是一种传输服务,维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。

但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard,就可以彻底保证系统的一致性了。

Client A & B: 也叫Producer,数据的发送方。createmessages and publish (send) them to a broker server (RabbitMQ).

一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。

Client 1,2,3:也叫Consumer,数据的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。当然可能会把同一个Message发送给很多的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来说,它是不知道谁发送的这个信息的。就是协议本身不支持。但是当然了如果Producer发送的payload包含了Producer的信息就另当别论了。

2.channle和connection

Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。程序的起始处就是建立这个TCP连接。

Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。

对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。 

3.ack确认机制

如果Message被某个consumer消费了,那么该Message就会被从queue中移除。//当然也可以让同一到个Message发送到很多Consumer

如果没有被任何consumer消费,那么这个Message会被Cache,不会被丢弃。数据被consumer正确的Consumer收到时,数据就会被从queue中删除

正确的收到:使用ack机制实现//可以显式在程序中去ack,也可以自动的ack。如果数据没有被ack:rabbitmq server会把该消息传输到下一个consumer

如果这个app忘记了ack。那么rabbitmq server不会再发送数据给它。因为server认为这个consumer的处理能力有限

使用ack也可以起到一定的限流的作用:在consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance consumer的load

当然对于实际的例子,比如我们可能会对某些数据进行merge,比如merge 4s内的数据,然后sleep 4s后再获取数据。特别是在监听系统的state,我们不希望所有的state实时的传递上去,而是希望有一定的延时。这样可以减少某些IO,而且终端用户也不会感觉到。

4.Reject a message 

有两种方式,第一种的Reject可以让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从queue中立即删除该Message。

5.Creating a queue

Consumer和Procuder都可以通过 queue.declare 创建queue。对于某个Channel来说,Consumer不能declare一个queue,却订阅其他的queue。当然也可以创建私有的queue。这样只有app本身才可以使用这个queue。queue也可以自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么如果是创建一个已经存在的queue呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

那么谁应该负责创建这个queue呢?是Consumer,还是Producer?

如果queue不存在,当然Consumer不会得到任何的Message。但是如果queue不存在,那么Producer Publish的Message会被丢弃。所以,还是为了数据不丢失,Consumer和Producer都try to create the queue!反正不管怎么样,这个接口都不会出问题。

queue对load balance的处理是完美的。对于多个Consumer来说,RabbitMQ 使用循环的方式(round-robin)的方式均衡的发送给不同的Consumer。

6.Exchanges

从架构图可以看出,Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。

有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。

Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。

Fanout exchange: 会向响应的queue广播。

Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。

7.Virtual hosts

每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bings rule等等。这保证了你可以在多个不同的application中使用RabbitMQ。

python --version //用python2的 安装python2-pika 

1.发送消息

<code>#!/usr/bin/env python</code>

<code>import</code> <code>pika</code>

<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(</code>

<code>host</code><code>=</code><code>'localhost'</code><code>))</code>

<code>channel </code><code>=</code> <code>connection.channel()</code>

<code>channel.queue_declare(queue</code><code>=</code><code>'hello'</code><code>)</code>

<code>channel.basic_publish(exchange</code><code>=</code><code>'',</code>

<code>routing_key</code><code>=</code><code>'hello'</code><code>,</code>

<code>body</code><code>=</code><code>'Hello World!'</code><code>)</code>

<code>print</code> <code>" [x] Sent 'Hello World!'"</code>

<code>connection.close()</code>

=====================================================================

建立连接-&gt;创建channel-&gt;创建名字为hello的队列-&gt;发送消息-&gt;关闭连接

从架构图可以看出,Producer只能发送到exchange,它是不能直接发送到queue的。现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。routing_key就是指定的queue名字。

关闭连接

[root@node112 test]# rabbitmqctl list_queues //查看已经发送的队列

Listing queues ...

Hello 1 //被消费后,会变成0

...done.

2.接受消息

<code>print</code> <code>' [*] Waiting for messages. To exit press CTRL+C'</code>

<code>def</code> <code>callback(ch, method, properties, body):</code>

<code>print</code> <code>" [x] Received %r"</code> <code>%</code> <code>(body,)</code>

<code>channel.basic_consume(callback,</code>

<code>queue</code><code>=</code><code>'hello'</code><code>,</code>

<code>no_ack</code><code>=</code><code>True</code><code>)</code>

<code>channel.start_consuming()</code>

建立连接-&gt;创建channel-&gt;创建名字为hello的队列-&gt;消费消息-&gt;关闭连接

subscribe了。在这之前,需要声明一个回调函数来处理接收到的数据。

3.运行测试

$ python send.py  

[x] Sent 'Hello World!'

send.py 每次运行完都会停止。注意:现在数据已经存到queue里了。接收它:

$ python receive.py  

[*] Waiting for messages. To exit press CTRL+C  

[x] Received 'Hello World!'  

RabbitMQ Server将queue的Message发送给不同的Consumer以处理计算密集型的任务

1.任务分发机制

new_task.py //发送者

====================================================================

<code>#!/usr/bin/env python  </code>

<code>import</code> <code>pika  </code>

<code>import</code> <code>sys  </code>

<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(  </code>

<code>host</code><code>=</code><code>'localhost'</code><code>))  </code>

<code>channel </code><code>=</code> <code>connection.channel()  </code>

<code>channel.queue_declare(queue</code><code>=</code><code>'task_queue'</code><code>, durable</code><code>=</code><code>True</code><code>)    </code>

<code>message </code><code>=</code> <code>' '</code><code>.join(sys.argv[</code><code>1</code><code>:]) </code><code>or</code> <code>"Hello World!"</code>  

<code>channel.basic_publish(exchange</code><code>=</code><code>'',  </code>

<code>routing_key</code><code>=</code><code>'task_queue'</code><code>,  </code>

<code>body</code><code>=</code><code>message,  </code>

<code>properties</code><code>=</code><code>pika.BasicProperties(  </code>

<code>delivery_mode </code><code>=</code> <code>2</code><code>, </code><code># make message persistent  </code>

<code>))  </code>

<code>print</code> <code>" [x] Sent %r"</code> <code>%</code> <code>(message,)  </code>

worker.py //收集者

===================================================================

<code>import</code> <code>time    </code>

<code>channel.queue_declare(queue</code><code>=</code><code>'task_queue'</code><code>, durable</code><code>=</code><code>True</code><code>)  </code>

<code>print</code> <code>' [*] Waiting for messages. To exit press CTRL+C'</code>    

<code>def</code> <code>callback(ch, method, properties, body):  </code>

<code>print</code> <code>" [x] Received %r"</code> <code>%</code> <code>(body,)  </code>

<code>time.sleep( body.count(</code><code>'.'</code><code>) )  </code>

<code>print</code> <code>" [x] Done"</code>  

<code>ch.basic_ack(delivery_tag </code><code>=</code> <code>method.delivery_tag)    </code>

<code>channel.basic_qos(prefetch_count</code><code>=</code><code>1</code><code>)  </code>

<code>channel.basic_consume(callback,  </code>

<code>queue</code><code>=</code><code>'task_queue'</code><code>)</code>

2.Round-robin循环分发

RabbitMQ对于load较大的情况,可以通过增加consumer和多创建VirtualHost解决

<code>Consumer</code><code>1:</code><code># python worker.py </code>

<code>Consumer</code><code>2:</code><code># python worker.py </code>

<code>Producer:#[root@node</code><code>112</code> <code>test]# for i in First Second Third Fourth Fifth ; do python new_task.py $i messages  ; done</code>

<code> </code><code>[x] Sent </code><code>'First messages'</code>

<code> </code><code>[x] Sent </code><code>'Second messages'</code>

<code> </code><code>[x] Sent </code><code>'Third messages'</code>

<code> </code><code>[x] Sent </code><code>'Fourth messages'</code>

<code> </code><code>[x] Sent </code><code>'Fifth messages'</code>

<code>验证:</code>

<code>Consumer</code><code>1:</code>

<code>[root@node</code><code>112</code> <code>test]# python worker.py </code>

<code>[*] Waiting for messages. To exit press CTRL+C</code>

<code>[x] Received </code><code>'Second messages'</code>

<code>[x] Done</code>

<code>[x] Received </code><code>'Fourth messages'</code>

<code>Consumer</code><code>2:</code>

<code>[x] Received </code><code>'First messages'</code>

<code>[x] Received </code><code>'Third messages'</code>

<code>[x] Received </code><code>'Fifth messages'</code>

默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin。

3.消息确认

no-ack:Consumer收到消息后,RabbitMQ Server会立即把这个message标记为完成,然后从queue中退出 //

ack:数据被接收并且被处理后(RabbitMQ Server收到ACK)才会去安全的删除数据

如果Consumer退出了但是没有发送ack,RabbitMQ会把这个Message发送到下一个Consumer。保证在Consumer异常退出的情况下数据不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

默认情况下,消息确认是打开的(enabled)。

<code>ch.basic_ack(delivery_tag </code><code>=</code> <code>method.delivery_tag)  </code>

<code>queue</code><code>=</code><code>'hello'</code><code>)</code>

这样即使你通过Ctr-C中断了worker.py,那么Message也不会丢失了,它会被分发到下一个Consumer。

如果忘记了ack,那么后果很严重。当Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。去调试这种错误,可以通过一下命令打印un-acked Messages:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged  

4.消息持久化

将queue和Message持久化

队列持久化:channel.queue_declare(queue='hello', durable=True)  

再次强调,Producer和Consumer都应该去创建这个queue,尽管只有一个地方的创建是真正起作用的:

接下来,需要持久化Message,即在Publish的时候指定一个properties,方式如下:

<code>routing_key</code><code>=</code><code>"task_queue"</code><code>,  </code>

<code>))</code>

防止数据丢失:

1.Consumer在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。

2.持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。

3.持久化Message,理由同上。

但是数据依然存在丢失的风险。//例如在存储到磁盘的时间过程中

RabbitMQ并不是为每个Message都做fsync:它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。

方案:把每次的publish放到一个transaction中。这个transaction的实现需要user defined codes。

或者在{系统panic/异常重启/断电}时,给各个应用留出时间去flash cache,保证每个应用都能exit gracefully。

5.公平分发

默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。当然n是取余后的。它不管Consumer是否还有unacked Message,只是按照这个默认机制进行分发。

那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?

通过 basic.qos 方法设置prefetch_count=1 。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。 设置方法如下:

channel.basic_qos(prefetch_count=1)  

注意:这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

转自:http://blog.csdn.net/column/details/rabbitmq.html

官网:http://www.rabbitmq.com

http://blog.csdn.net/anzhsoft/article/details/19563091

本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051211,如需转载请自行联系原作者