天天看點

RabbitMQ(四)

一、RabbitMQ的RPC實作

二、格式化資料ProtoBuf

三、Publisher的消息确認機制

在雲計算環境中,很多時候需要用它其他機器的計算資源,我們有可能會在接收到Message進行處理時,會把一部分計算任務配置設定到其他節點來完成。那麼,RabbitMQ如何使用RPC呢?在本篇文章中,我們将會通過其它節點求來斐波納契完成示例。

1.Client interface

為了展示一個RPC服務是如何使用的,我們将建立一段很簡單的用戶端class。 它将會向外提供名字為call的函數,這個call會發送RPC請求并且阻塞直到收到RPC運算的結果。代碼如下:

fibonacci_rpc = FibonacciRpcClient()

result = fibonacci_rpc.call(4)

print "fib(4) is %r" % (result,)

2.回調函數callback queue 

在RabbitMQ進行RPC遠端調用是比較容易的。client發送請求的Message然後server傳回響應結果。為了收到響應client在publish message時需要提供一個”callback“(回調)的queue位址。code如下:

<code>result = channel.queue_declare(exclusive=True)</code>

<code>callback_queue = result.method.queue</code>

<code>channel.basic_publish(exchange=</code><code>''</code><code>,</code>

<code>    </code><code>routing_key=</code><code>'rpc_queue'</code><code>,</code>

<code>    </code><code>properties=pika.BasicProperties(</code>

<code>        </code><code>reply_to = callback_queue,</code>

<code>        </code><code>),</code>

<code>    </code><code>body=request)</code>

<code># ... and some </code><code>code</code> <code>to read a response message from the callback_queue ...</code>

3.Message properties

AMQP 預定義了14個屬性。它們中的絕大多很少會用到。以下幾個是平時用的比較多的:

    delivery_mode: 持久化一個Message(通過設定值為2)。其他任意值都是非持久化。請移步RabbitMQ消息隊列(三):任務分發機制

    content_type: 描述mime-type 的encoding。比如設定為JSON編碼:設定該property為application/json。

    reply_to: 一般用來指明用于回調的queue(Commonly used to name a callback queue)。

    correlation_id: 在請求中關聯處理RPC響應(correlate RPC responses with requests)。

4.相關id //Correlation id

在上個小節裡,實作方法是對每個RPC請求都會建立一個callback queue。這是不高效的。幸運的是,在這裡有一個解決方法:為每個client建立唯一的callback queue。

這又有其他問題了:收到響應後它無法确定是否是它的,因為所有的響應都寫到同一個queue了。上一小節的correlation_id在這種情況下就派上用場了:對于每個request,都設定唯一的一個值,在收到響應後,通過這個值就可以判斷是否是自己的響應。如果不是自己的響應,就不去處理。    

5.總結

工作流程:

    當用戶端啟動時,它建立了匿名的exclusive callback queue.//獨占的exclusive

    用戶端的RPC請求時将同時設定兩個properties: reply_to設定為callback queue;correlation_id設定為每個request一個獨一無二的值.//correlation相關

    請求将被發送到an rpc_queue queue.

    RPC端或者說server一直在等待那個queue的請求。當請求到達時,它将通過在reply_to指定的queue回複一個message給client。

    client一直等待callback queue的資料。當message到達時,它将檢查correlation_id的值,如果值和它request發送時的一緻那麼就将傳回響應。

6.最終代碼

The code for rpc_server.py:

==================================================================================

<code>#!/usr/bin/env python</code>

<code>import pika</code>

<code>connection = pika.BlockingConnection(pika.ConnectionParameters(</code>

<code>        </code><code>host=</code><code>'localhost'</code><code>))</code>

<code>channel = connection.channel()</code>

<code>channel.queue_declare(queue=</code><code>'rpc_queue'</code><code>)</code>

<code>def fib(n):</code>

<code>    </code><code>if n == </code><code>0:</code>

<code>        </code><code>return </code><code>0</code>

<code>    </code><code>elif n == </code><code>1:</code>

<code>        </code><code>return </code><code>1</code>

<code>    </code><code>else:</code>

<code>        </code><code>return fib(n</code><code>-1</code><code>) + fib(n</code><code>-2</code><code>)</code>

<code>def on_request(ch, method, props, body):</code>

<code>    </code><code>n = int(body)</code>

<code>    </code><code>print</code> <code>" [.] fib(%s)"</code>  <code>% (n,)</code>

<code>    </code><code>response = fib(n)</code>

<code>    </code><code>ch.basic_publish(exchange=</code><code>''</code><code>,</code>

<code>                     </code><code>routing_key=props.reply_to,</code>

<code>                     </code><code>properties=pika.BasicProperties(correlation_id = \</code>

<code>                                                     </code><code>props.correlation_id),</code>

<code>                     </code><code>body=str(response))</code>

<code>    </code><code>ch.basic_ack(delivery_tag = method.delivery_tag)</code>

<code>channel.basic_qos(prefetch_count=</code><code>1</code><code>)</code>

<code>channel.basic_consume(on_request, queue=</code><code>'rpc_queue'</code><code>)</code>

<code>print</code> <code>" [x] Awaiting RPC requests"</code>

<code>channel.start_consuming()</code>

(4) As usual we start by establishing the connection and declaring the queue.

(11) We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, it's probably the slowest recursive implementation possible).

(19) We declare a callback for basic_consume, the core of the RPC server. It's executed when the request is received. It does the work and sends the response back.

(32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set theprefetch_count setting.

The code for rpc_client.py:

<code>import uuid</code>

<code>class FibonacciRpcClient(object):</code>

<code>    </code><code>def __init__(self):</code>

<code>        </code><code>self.connection = pika.BlockingConnection(pika.ConnectionParameters(</code>

<code>                </code><code>host=</code><code>'localhost'</code><code>))</code>

<code>        </code><code>self.channel = self.connection.channel()</code>

<code>        </code><code>result = self.channel.queue_declare(exclusive=True)</code>

<code>        </code><code>self.callback_queue = result.method.queue</code>

<code>        </code><code>self.channel.basic_consume(self.on_response, no_ack=True,</code>

<code>                                   </code><code>queue=self.callback_queue)</code>

<code>    </code><code>def on_response(self, ch, method, props, body):</code>

<code>        </code><code>if self.corr_id == props.correlation_id:</code>

<code>            </code><code>self.response = body</code>

<code>    </code><code>def call(self, n):</code>

<code>        </code><code>self.response = None</code>

<code>        </code><code>self.corr_id = str(uuid.uuid</code><code>4</code><code>())</code>

<code>        </code><code>self.channel.basic_publish(exchange=</code><code>''</code><code>,</code>

<code>                                   </code><code>routing_key=</code><code>'rpc_queue'</code><code>,</code>

<code>                                   </code><code>properties=pika.BasicProperties(</code>

<code>                                         </code><code>reply_to = self.callback_queue,</code>

<code>                                         </code><code>correlation_id = self.corr_id,</code>

<code>                                         </code><code>),</code>

<code>                                   </code><code>body=str(n))</code>

<code>        </code><code>while self.response is None:</code>

<code>            </code><code>self.connection.process_data_events()</code>

<code>        </code><code>return int(self.response)</code>

<code>fibonacci_rpc = FibonacciRpcClient()</code>

<code>print</code> <code>" [x] Requesting fib(30)"</code>

<code>response = fibonacci_rpc.call(</code><code>30</code><code>)</code>

<code>print</code> <code>" [.] Got %r"</code> <code>% (response,)</code>

The client code is slightly more involved:

    (7) We establish a connection, channel and declare an exclusive 'callback' queue for replies.

    (16) We subscribe to the 'callback' queue, so that we can receive RPC responses.

    (18) The 'on_response' callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id is the one we're looking for. If so, it saves the response inself.response and breaks the consuming loop.

    (23) Next, we define our main call method - it does the actual RPC request.

    (24) In this method, first we generate a unique correlation_id number and save it - the 'on_response' callback function will use this value to catch the appropriate response.

    (25) Next, we publish the request message, with two properties: reply_to and correlation_id.

    (32) At this point we can sit back and wait until the proper response arrives.

    (33) And finally we return the response back to the user.

7.測試運作

$ python rpc_server.py  

 [x] Awaiting RPC requests  

通過client來請求fibonacci數: 

$ python rpc_client.py  

 [x] Requesting fib(30)   

該方案優勢:

    如果RPC server太慢,可以啟動另外一個RPC server進行擴充。

    在client端, 無鎖進行加鎖能同步操作,他所作的就是發送請求等待響應。

思考的問題:

    如果沒有server在運作,client需要怎麼做?

    RPC應該設定逾時機制嗎?

    如果server運作出錯并且抛出了異常,需要将這個問題轉發到client嗎?

    需要邊界檢查嗎?    

二、ProtoBuf

什麼是ProtoBuf?

    一種輕便高效的結構化資料存儲格式,可以用于結構化資料串行化,或者說序列化。它很适合做資料存儲或 RPC 資料交換格式。可用于通訊協定、資料存儲等領域的語言無關、平台無關、可擴充的序列化結構資料格式。目前提供了 C++、Java、Python 三種語言的 API。

    它可以作為RabbitMQ的Message的資料格式進行傳輸,由于是結構化的資料,這樣就極大的友善了Consumer的資料高效處理。當然了你可能說使用XML不也可以嗎?與XML相比,ProtoBuf有以下優勢:

    1.簡單

    2.size小了3-10倍

    3.速度快樂20-100倍

    4.易于程式設計

    5.減小了語義的歧義

當然了,的确還有很多類似的技術,比如JSON,Thrift等等,和他們相比,ProtoBuf的優勢或者劣勢在哪裡?簡單說來,ProtoBuf就是簡單,快。

由此可見,ProtoBuf具有速度和空間的優勢,使得它現在應用非常廣泛。比如Hadoop就使用了它。

更多資訊,請閱 http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/。

    之前說到了queue和consumer之間的消息确認機制:通過設定ack。那麼Publisher能不到知道他post的Message有沒有到達queue,甚至更近一步,是否被某個Consumer處理呢?畢竟對于一些非常重要的資料,可能Publisher需要确認某個消息已經被正确處理。

    在我們的系統中,我們沒有是實作這種确認,也就是說,不管Message是否被Consume了,Publisher不會去care。他隻是将自己的狀态publish給上層,由上層的邏輯去處理。如果Message沒有被正确處理,可能會導緻某些狀态丢失。但是由于提供了其他強制重新整理全部狀态的機制,是以這種異常情況的影響也就可以忽略不計了。

    對于某些異步操作,比如用戶端需要建立一個FileSystem,這個可能需要比較長的時間,甚至要數秒鐘。這時候通過RPC可以解決這個問題。是以也就不存在Publisher端的确認機制了。

1.事務和Publisher Confirm

    如果采用标準的 AMQP 協定,則唯一能夠保證消息不會丢失的方式是利用事務機制 -- 令 channel 處于 transactional 模式、向其 publish 消息、執行 commit 動作。在這種方式下,事務機制會帶來大量的多餘開銷,并會導緻吞吐量下降 250% 。為了補救事務帶來的問題,引入了 confirmation 機制(即 Publisher Confirm)。

    為了使能 confirm 機制,client 首先要發送 confirm.select 方法幀。取決于是否設定了 no-wait 屬性,broker 會相應的判定是否以 confirm.select-ok 進行應答。一旦在 channel 上使用 confirm.select方法,channel 就将處于 confirm 模式。處于 transactional 模式的 channel 不能再被設定成 confirm 模式,反之亦然。

    一旦 channel 處于 confirm 模式,broker 和 client 都将啟動消息計數(以 confirm.select 為基礎從 1 開始計數)。broker 會在處理完消息後,在目前 channel 上通過發送 basic.ack 的方式對其進行 confirm 。delivery-tag 域的值辨別了被 confirm 消息的序列号。broker 也可以通過設定 basic.ack 中的 multiple 域來表明到指定序列号為止的所有消息都已被 broker 正确的處理了。

    在異常情況中,broker 将無法成功處理相應的消息,此時 broker 将發送 basic.nack 來代替 basic.ack 。在這個情形下,basic.nack 中各域值的含義與 basic.ack 中相應各域含義是相同的,同時 requeue 域的值應該被忽略。通過 nack 一或多條消息,broker 表明自身無法對相應消息完成處理,并拒絕為這些消息的處理負責。在這種情況下,client 可以選擇将消息 re-publish 。

    在 channel 被設定成 confirm 模式之後,所有被 publish 的後續消息都将被 confirm(即 ack) 或者被 nack 一次。但是沒有對消息被 confirm 的快慢做任何保證,并且同一條消息不會既被 confirm 又被 nack 。

2.消息在什麼時候确認

broker 将在下面的情況中對消息進行 confirm :

    broker 發現目前消息無法被路由到指定的 queues 中(如果設定了 mandatory 屬性,則 broker 會先發送 basic.return)

    非持久屬性的消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中)

    持久消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中),并被持久化到了磁盤(被 fsync)

    持久消息從其所在的所有 queue 中被 consume 了(如果必要則會被 acknowledge)

broker 會丢失持久化消息,如果 broker 在将上述消息寫入磁盤前異常。在一定條件下,這種情況會導緻 broker 以一種奇怪的方式運作。例如,考慮下述情景:

    1.  一個 client 将持久消息 publish 到持久 queue 中

    2.  另一個 client 從 queue 中 consume 消息(注意:該消息具有持久屬性,并且 queue 是持久化的),當尚未對其進行 ack

    3.  broker 異常重新開機

    4.  client 重連并開始 consume 消息

    在上述情景下,client 有理由認為消息需要被(broker)重新 deliver 。但這并非事實:重新開機(有可能)會令 broker 丢失消息。為了確定持久性,client 應該使用 confirm 機制。如果 publisher 使用的 channel 被設定為 confirm 模式,publisher 将不會收到已丢失消息的 ack(這是因為 consumer 沒有對消息進行 ack ,同時該消息也未被寫入磁盤)。

3.程式設計實作

首先要差別AMQP協定mandatory和immediate标志位的作用。

    mandatory和immediate是AMQP協定中basic.pulish方法中的兩個标志位,它們都有當消息傳遞過程中不可達目的地時将消息傳回給生産者的功能。具體差別在于:

1. mandatory标志位

當mandatory标志位設定為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那麼會調用basic.return方法将消息返還給生産者;當mandatory設為false時,出現上述情形broker會直接将消息扔掉。

2. immediate标志位

當immediate标志位設定為true時,如果exchange在将消息route到queue(s)時發現對應的queue上沒有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生産者。

參考:

    http://www.rabbitmq.com/tutorials/tutorial-six-python.html

    http://blog.csdn.net/anzhsoft/article/details/19771671

    http://blog.csdn.net/jiao_fuyou/article/details/21594205

    http://blog.csdn.net/jiao_fuyou/article/details/21594947

    http://my.oschina.net/moooofly/blog/142095

本文轉自MT_IT51CTO部落格,原文連結:http://blog.51cto.com/hmtk520/2051267,如需轉載請自行聯系原作者