天天看點

RabbitMQ詳解(二)

一、RabbitMQ架構淺析

二、收發“hello world”

三、任務分發機制

1.MQ架構圖  

RabbitMQ Server: 也叫broker server,是一種傳輸服務,維護一條從Producer到Consumer的路線,保證資料能夠按照指定的方式進行傳輸。

但是這個保證也不是100%的保證,但是對于普通的應用來說這已經足夠了。當然對于商業系統來說,可以再做一層資料一緻性的guard,就可以徹底保證系統的一緻性了。

Client A & B: 也叫Producer,資料的發送方。createmessages and publish (send) them to a broker server (RabbitMQ).

一個Message有兩個部分:payload(有效載荷)和label(标簽)。payload顧名思義就是傳輸的資料。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

Client 1,2,3:也叫Consumer,資料的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱後,RabbitMQ把它發送給它的某個訂閱者即Consumer。當然可能會把同一個Message發送給很多的Consumer。在這個Message中,隻有payload,label已經被删掉了。對于Consumer來說,它是不知道誰發送的這個資訊的。就是協定本身不支援。但是當然了如果Producer發送的payload包含了Producer的資訊就另當别論了。

2.channle和connection

Connection: 就是一個TCP的連接配接。Producer和Consumer都是通過TCP連接配接到RabbitMQ Server的。程式的起始處就是建立這個TCP連接配接。

Channels: 虛拟連接配接。它建立在上述的TCP連接配接中。資料流動都是在Channel中進行的。也就是說,一般情況是程式起始建立TCP連接配接,第二步就是建立這個Channel。

對于OS來說,建立和關閉TCP連接配接是有代價的,頻繁的建立關閉TCP連接配接對于系統的性能有很大的影響,而且TCP的連接配接數也有限制,這也限制了系統處理高并發的能力。但是,在TCP連接配接中建立Channel是沒有上述代價的。

對于Producer或者Consumer來說,可以并發的使用多個Channel進行Publish或者Receive。 

3.ack确認機制

如果Message被某個consumer消費了,那麼該Message就會被從queue中移除。//當然也可以讓同一到個Message發送到很多Consumer

如果沒有被任何consumer消費,那麼這個Message會被Cache,不會被丢棄。資料被consumer正确的Consumer收到時,資料就會被從queue中删除

正确的收到:使用ack機制實作//可以顯式在程式中去ack,也可以自動的ack。如果資料沒有被ack:rabbitmq server會把該消息傳輸到下一個consumer

如果這個app忘記了ack。那麼rabbitmq server不會再發送資料給它。因為server認為這個consumer的處理能力有限

使用ack也可以起到一定的限流的作用:在consumer處理完成資料後發送ack,甚至在額外的延時後發送ack,将有效的balance consumer的load

當然對于實際的例子,比如我們可能會對某些資料進行merge,比如merge 4s内的資料,然後sleep 4s後再擷取資料。特别是在監聽系統的state,我們不希望所有的state實時的傳遞上去,而是希望有一定的延時。這樣可以減少某些IO,而且終端使用者也不會感覺到。

4.Reject a message 

有兩種方式,第一種的Reject可以讓RabbitMQ Server将該Message 發送到下一個Consumer。第二種是從queue中立即删除該Message。

5.Creating a queue

Consumer和Procuder都可以通過 queue.declare 建立queue。對于某個Channel來說,Consumer不能declare一個queue,卻訂閱其他的queue。當然也可以建立私有的queue。這樣隻有app本身才可以使用這個queue。queue也可以自動删除,被标為auto-delete的queue在最後一個Consumer unsubscribe後就會被自動删除。那麼如果是建立一個已經存在的queue呢?那麼不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次建立如果參數和第一次不一樣,那麼該操作雖然成功,但是queue的屬性并不會被修改。

那麼誰應該負責建立這個queue呢?是Consumer,還是Producer?

如果queue不存在,當然Consumer不會得到任何的Message。但是如果queue不存在,那麼Producer Publish的Message會被丢棄。是以,還是為了資料不丢失,Consumer和Producer都try to create the queue!反正不管怎麼樣,這個接口都不會出問題。

queue對load balance的處理是完美的。對于多個Consumer來說,RabbitMQ 使用循環的方式(round-robin)的方式均衡的發送給不同的Consumer。

6.Exchanges

從架構圖可以看出,Procuder Publish的Message進入了Exchange。接着通過“routing keys”, RabbitMQ會找到應該把這個Message放到哪個queue裡。queue也是通過這個routing keys來做的綁定。

有三種類型的Exchanges:direct, fanout,topic。 每個實作了不同的路由算法(routing algorithm)。

Direct exchange: 如果 routing key 比對, 那麼Message就會被傳遞到相應的queue中。其實在queue建立時,它會自動的以queue的名字作為routing key來綁定那個exchange。

Fanout exchange: 會向響應的queue廣播。

Topic exchange: 對key進行模式比對,比如ab*可以傳遞到所有ab*的queue。

7.Virtual hosts

每個virtual host本質上都是一個RabbitMQ Server,擁有它自己的queue,exchagne,和bings rule等等。這保證了你可以在多個不同的application中使用RabbitMQ。

python --version //用python2的 安裝python2-pika 

1.發送消息

<code>#!/usr/bin/env python</code>

<code>import</code> <code>pika</code>

<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(</code>

<code>host</code><code>=</code><code>'localhost'</code><code>))</code>

<code>channel </code><code>=</code> <code>connection.channel()</code>

<code>channel.queue_declare(queue</code><code>=</code><code>'hello'</code><code>)</code>

<code>channel.basic_publish(exchange</code><code>=</code><code>'',</code>

<code>routing_key</code><code>=</code><code>'hello'</code><code>,</code>

<code>body</code><code>=</code><code>'Hello World!'</code><code>)</code>

<code>print</code> <code>" [x] Sent 'Hello World!'"</code>

<code>connection.close()</code>

=====================================================================

建立連接配接-&gt;建立channel-&gt;建立名字為hello的隊列-&gt;發送消息-&gt;關閉連接配接

從架構圖可以看出,Producer隻能發送到exchange,它是不能直接發送到queue的。現在我們使用預設的exchange(名字是空字元)。這個預設的exchange允許我們發送給指定的queue。routing_key就是指定的queue名字。

關閉連接配接

[root@node112 test]# rabbitmqctl list_queues //檢視已經發送的隊列

Listing queues ...

Hello 1 //被消費後,會變成0

...done.

2.接受消息

<code>print</code> <code>' [*] Waiting for messages. To exit press CTRL+C'</code>

<code>def</code> <code>callback(ch, method, properties, body):</code>

<code>print</code> <code>" [x] Received %r"</code> <code>%</code> <code>(body,)</code>

<code>channel.basic_consume(callback,</code>

<code>queue</code><code>=</code><code>'hello'</code><code>,</code>

<code>no_ack</code><code>=</code><code>True</code><code>)</code>

<code>channel.start_consuming()</code>

建立連接配接-&gt;建立channel-&gt;建立名字為hello的隊列-&gt;消費消息-&gt;關閉連接配接

subscribe了。在這之前,需要聲明一個回調函數來處理接收到的資料。

3.運作測試

$ python send.py  

[x] Sent 'Hello World!'

send.py 每次運作完都會停止。注意:現在資料已經存到queue裡了。接收它:

$ python receive.py  

[*] Waiting for messages. To exit press CTRL+C  

[x] Received 'Hello World!'  

RabbitMQ Server将queue的Message發送給不同的Consumer以處理計算密集型的任務

1.任務分發機制

new_task.py //發送者

====================================================================

<code>#!/usr/bin/env python  </code>

<code>import</code> <code>pika  </code>

<code>import</code> <code>sys  </code>

<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(  </code>

<code>host</code><code>=</code><code>'localhost'</code><code>))  </code>

<code>channel </code><code>=</code> <code>connection.channel()  </code>

<code>channel.queue_declare(queue</code><code>=</code><code>'task_queue'</code><code>, durable</code><code>=</code><code>True</code><code>)    </code>

<code>message </code><code>=</code> <code>' '</code><code>.join(sys.argv[</code><code>1</code><code>:]) </code><code>or</code> <code>"Hello World!"</code>  

<code>channel.basic_publish(exchange</code><code>=</code><code>'',  </code>

<code>routing_key</code><code>=</code><code>'task_queue'</code><code>,  </code>

<code>body</code><code>=</code><code>message,  </code>

<code>properties</code><code>=</code><code>pika.BasicProperties(  </code>

<code>delivery_mode </code><code>=</code> <code>2</code><code>, </code><code># make message persistent  </code>

<code>))  </code>

<code>print</code> <code>" [x] Sent %r"</code> <code>%</code> <code>(message,)  </code>

worker.py //收集者

===================================================================

<code>import</code> <code>time    </code>

<code>channel.queue_declare(queue</code><code>=</code><code>'task_queue'</code><code>, durable</code><code>=</code><code>True</code><code>)  </code>

<code>print</code> <code>' [*] Waiting for messages. To exit press CTRL+C'</code>    

<code>def</code> <code>callback(ch, method, properties, body):  </code>

<code>print</code> <code>" [x] Received %r"</code> <code>%</code> <code>(body,)  </code>

<code>time.sleep( body.count(</code><code>'.'</code><code>) )  </code>

<code>print</code> <code>" [x] Done"</code>  

<code>ch.basic_ack(delivery_tag </code><code>=</code> <code>method.delivery_tag)    </code>

<code>channel.basic_qos(prefetch_count</code><code>=</code><code>1</code><code>)  </code>

<code>channel.basic_consume(callback,  </code>

<code>queue</code><code>=</code><code>'task_queue'</code><code>)</code>

2.Round-robin循環分發

RabbitMQ對于load較大的情況,可以通過增加consumer和多建立VirtualHost解決

<code>Consumer</code><code>1:</code><code># python worker.py </code>

<code>Consumer</code><code>2:</code><code># python worker.py </code>

<code>Producer:#[root@node</code><code>112</code> <code>test]# for i in First Second Third Fourth Fifth ; do python new_task.py $i messages  ; done</code>

<code> </code><code>[x] Sent </code><code>'First messages'</code>

<code> </code><code>[x] Sent </code><code>'Second messages'</code>

<code> </code><code>[x] Sent </code><code>'Third messages'</code>

<code> </code><code>[x] Sent </code><code>'Fourth messages'</code>

<code> </code><code>[x] Sent </code><code>'Fifth messages'</code>

<code>驗證:</code>

<code>Consumer</code><code>1:</code>

<code>[root@node</code><code>112</code> <code>test]# python worker.py </code>

<code>[*] Waiting for messages. To exit press CTRL+C</code>

<code>[x] Received </code><code>'Second messages'</code>

<code>[x] Done</code>

<code>[x] Received </code><code>'Fourth messages'</code>

<code>Consumer</code><code>2:</code>

<code>[x] Received </code><code>'First messages'</code>

<code>[x] Received </code><code>'Third messages'</code>

<code>[x] Received </code><code>'Fifth messages'</code>

預設情況下,RabbitMQ 會順序的分發每個Message。當每個收到ack後,會将該Message删除,然後将下一個Message分發到下一個Consumer。這種分發方式叫做round-robin。

3.消息确認

no-ack:Consumer收到消息後,RabbitMQ Server會立即把這個message标記為完成,然後從queue中退出 //

ack:資料被接收并且被處理後(RabbitMQ Server收到ACK)才會去安全的删除資料

如果Consumer退出了但是沒有發送ack,RabbitMQ會把這個Message發送到下一個Consumer。保證在Consumer異常退出的情況下資料不會丢失。

這裡并沒有用到逾時機制。RabbitMQ僅僅通過Consumer的連接配接中斷來确認該Message并沒有被正确處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做資料處理。

預設情況下,消息确認是打開的(enabled)。

<code>ch.basic_ack(delivery_tag </code><code>=</code> <code>method.delivery_tag)  </code>

<code>queue</code><code>=</code><code>'hello'</code><code>)</code>

這樣即使你通過Ctr-C中斷了worker.py,那麼Message也不會丢失了,它會被分發到下一個Consumer。

如果忘記了ack,那麼後果很嚴重。當Consumer退出時,Message會重新分發。然後RabbitMQ會占用越來越多的記憶體,由于RabbitMQ會長時間運作,是以這個“記憶體洩漏”是緻命的。去調試這種錯誤,可以通過一下指令列印un-acked Messages:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged  

4.消息持久化

将queue和Message持久化

隊列持久化:channel.queue_declare(queue='hello', durable=True)  

再次強調,Producer和Consumer都應該去建立這個queue,盡管隻有一個地方的建立是真正起作用的:

接下來,需要持久化Message,即在Publish的時候指定一個properties,方式如下:

<code>routing_key</code><code>=</code><code>"task_queue"</code><code>,  </code>

<code>))</code>

防止資料丢失:

1.Consumer在資料處理結束後發送ack,這樣RabbitMQ Server會認為Message Deliver 成功。

2.持久化queue,可以防止RabbitMQ Server 重新開機或者crash引起的資料丢失。

3.持久化Message,理由同上。

但是資料依然存在丢失的風險。//例如在存儲到磁盤的時間過程中

RabbitMQ并不是為每個Message都做fsync:它可能僅僅是把它儲存到Cache裡,還沒來得及儲存到實體磁盤上。

方案:把每次的publish放到一個transaction中。這個transaction的實作需要user defined codes。

或者在{系統panic/異常重新開機/斷電}時,給各個應用留出時間去flash cache,保證每個應用都能exit gracefully。

5.公平分發

預設狀态下,RabbitMQ将第n個Message分發給第n個Consumer。當然n是取餘後的。它不管Consumer是否還有unacked Message,隻是按照這個預設機制進行分發。

那麼如果有個Consumer工作比較重,那麼就會導緻有的Consumer基本沒事可做,有的Consumer卻是毫無休息的機會。那麼,RabbitMQ是如何處理這種問題呢?

通過 basic.qos 方法設定prefetch_count=1 。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會将新的Message分發給它。 設定方法如下:

channel.basic_qos(prefetch_count=1)  

注意:這種方法可能會導緻queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者建立更多的virtualHost來細化你的設計。

轉自:http://blog.csdn.net/column/details/rabbitmq.html

官網:http://www.rabbitmq.com

http://blog.csdn.net/anzhsoft/article/details/19563091

本文轉自MT_IT51CTO部落格,原文連結:http://blog.51cto.com/hmtk520/2051211,如需轉載請自行聯系原作者