天天看點

RabbitMQ消息隊列(七):适用于雲計算叢集的遠端調用(RPC)1. 用戶端接口 Client interface2. 回調函數隊列 Callback queue3. 相關id Correlation id4. 總結 5. 最終實作

        在雲計算環境中,很多時候需要用它其他機器的計算資源,我們有可能會在接收到message進行處理時,會把一部分計算任務配置設定到其他節點來完成。那麼,rabbitmq如何使用rpc呢?在本篇文章中,我們将會通過其它節點求來斐波納契完成示例。

        為了展示一個rpc服務是如何使用的,我們将建立一段很簡單的用戶端class。 它将會向外提供名字為call的函數,這個call會發送rpc請求并且阻塞知道收到rpc運算的結果。代碼如下:

        總體來說,在rabbitmq進行rpc遠端調用是比較容易的。client發送請求的message然後server傳回響應結果。為了收到響應client在publish message時需要提供一個”callback“(回調)的queue位址。code如下:

amqp 預定義了14個屬性。它們中的絕大多很少會用到。以下幾個是平時用的比較多的: delivery_mode: 持久化一個message(通過設定值為2)。其他任意值都是非持久化。請移步rabbitmq消息隊列(三):任務分發機制 content_type: 描述mime-type 的encoding。比如設定為json編碼:設定該property為application/json。 reply_to: 一般用來指明用于回調的queue(commonly used to name a callback queue)。 correlation_id: 在請求中關聯處理rpc響應(correlate rpc responses with requests)。

       在上個小節裡,實作方法是對每個rpc請求都會建立一個callback queue。這是不高效的。幸運的是,在這裡有一個解決方法:為每個client建立唯一的callback queue。

       這又有其他問題了:收到響應後它無法确定是否是它的,因為所有的響應都寫到同一個queue了。上一小節的correlation_id在這種情況下就派上用場了:對于每個request,都設定唯一的一個值,在收到響應後,通過這個值就可以判斷是否是自己的響應。如果不是自己的響應,就不去處理。

RabbitMQ消息隊列(七):适用于雲計算叢集的遠端調用(RPC)1. 用戶端接口 Client interface2. 回調函數隊列 Callback queue3. 相關id Correlation id4. 總結 5. 最終實作

     工作流程:

當用戶端啟動時,它建立了匿名的exclusive callback queue.

用戶端的rpc請求時将同時設定兩個properties: reply_to設定為callback queue;correlation_id設定為每個request一個獨一無二的值.

請求将被發送到an rpc_queue queue.

rpc端或者說server一直在等待那個queue的請求。當請求到達時,它将通過在reply_to指定的queue回複一個message給client。

client一直等待callback queue的資料。當message到達時,它将檢查correlation_id的值,如果值和它request發送時的一緻那麼就将傳回響應。

the code for rpc_server.py:

the server code is rather straightforward:

(4) as usual we start by establishing the connection and declaring the queue.

(11) we declare our fibonacci function. it assumes only valid positive integer input. (don't expect this one to work for big numbers, it's probably the slowest recursive implementation possible).

(19) we declare a callback for basic_consume, the core of the rpc server. it's executed when the request is received. it does the work and sends the response back.

(32) we might want to run more than one server process. in order to spread the load equally over multiple servers we need to set theprefetch_count setting.

the code for rpc_client.py:

the client code is slightly more involved:

(7) we establish a connection, channel and declare an exclusive 'callback' queue for replies.

(16) we subscribe to the 'callback' queue, so that we can receive rpc responses.

(18) the 'on_response' callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id is the one we're looking for. if so, it saves the response inself.response

and breaks the consuming loop.

(23) next, we define our main call method - it does the actual rpc request.

(24) in this method, first we generate a unique correlation_id number and save it - the 'on_response' callback function will use this value to catch the appropriate response.

(25) next, we publish the request message, with two properties:

reply_to and correlation_id.

(32) at this point we can sit back and wait until the proper response arrives.

(33) and finally we return the response back to the user.

開始rpc_server.py:

通過client來請求fibonacci數:

      現在這個設計并不是唯一的,但是這個實作有以下優勢:

如何rpc server太慢,你可以擴充它:啟動另外一個rpc server。

在client端, 無所進行加鎖能同步操作,他所作的就是發送請求等待響應。

      我們的code還是挺簡單的,并沒有嘗試去解決更複雜和重要的問題,比如:

如果沒有server在運作,client需要怎麼做?

rpc應該設定逾時機制嗎?

如果server運作出錯并且抛出了異常,需要将這個問題轉發到client嗎?

需要邊界檢查嗎?

尊重原創,轉載請注明出處 anzhsoft: http://blog.csdn.net/anzhsoft/article/details/19633107

參考資料:

1. http://www.rabbitmq.com/tutorials/tutorial-six-python.html