摘要 實作遠端調用
RabbitMQ RabbitMQ入門
目錄[-]
- 遠端過程調用(RPC)
- (使用Java用戶端)
- 使用者接口
- 回收隊列
- 相關性ID (原:Correlation Id)
- 摘要
- 把所有的放在一起
在指南的第二部分,我們學習了如何使用工作隊列将耗時的任務分布到多個工作者中。
但是假如我們需要調用遠端計算機的函數,等待結果呢?好吧,這又是另一個故事了。這模式通常被稱為遠端過程調用或RPC。
在這部分,我們将會使用RabbitMQ建構一個RPC系統:一個用戶端和一個可擴充的RPC伺服器。由于我們還沒有值得分散的耗時任務,我們将會建立一個虛拟的RPC服務,用來傳回Fibonacci(斐波納契數列)。
為了說明RPC服務如何使用,我們将會建立一個簡單德用戶端類。它會暴露一個叫
call
的方法,用來發送一個RPC請求,在響應回複之前都會一直阻塞:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
RPC方面的注意
雖然RPC在電腦運算方面是一個十分普通的模式,但是它依舊常常受批判的。
如果一個程式員沒有意識到函數
是本地的還是一個遲鈍的RPC。這結果是不可預知的很讓你困惑的,并且會增加不必要的複雜調試。與簡化軟體相反,誤用RPC會導緻不可維護的意大利面條代碼(譯者注:原文是spaghetti code可能形容代碼很長很亂)。
call
思想中煎熬,考慮下接下來的建議:
確定明顯區分哪個是函數
call
是本地調用的,哪個是遠端調用的。
給你的系統加上文檔,讓元件之間的依賴項清晰可見的。
處理錯誤事件。當RPC伺服器很久沒有響應了,用戶端應該如何響應?
當關于RPC的所有疑問消除,在你可以的情況下,你應該使用一個異步的管道,代替RPC中阻塞,結果會異步的放入接下來的計算平台。
一般來說在RabbitMQ上做RPC是容易的。一個用戶端發送一個請求消息,一個伺服器傳回響應消息。為了接受到響應,我們需要再請求中帶上一個
callback
隊列的位址。我們可以使用預設隊列(那個在Java用戶端上市獨占的)。讓我們試一下:
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
消息屬性
這AMQP協定預先确定了消息中的14個屬性。他們大多數屬性很少使用,除了下面這些例外:
:将一個消息标記為持久化(值為2)或者瞬态的(其他值)。你可能從第二部分中記起這個屬性。
deliveryMode
:用來描述媒體類型的編碼。例如常常使用的JSON編碼,這是一個好的慣例,設定這個屬性為:
contentType
。
application/json
:通常來命名回收隊列的名字。
replyTo
:對RPC加速響應請求是很有用的。
correlationId
我們需要這個新的引用:
import com.rabbitmq.client.AMQP.BasicProperties;
在目前方法中我們建議為每一個RPC請求建立一個回收隊列。這個效率十分低下的,但幸運的是有一個更好的方式- 讓我們為每一個用戶端建立一個單一的回收隊列。
這樣又出現了新的問題,沒有清晰的判斷隊列中的響應是屬于哪個請求的。這個時候
coorrelationId
屬性發揮了作用。我們将每個請求的這個屬性設定為唯一值。以後當我們在回收隊列中接收消息時,我們将會檢視這個屬性,依據這個屬性值,我們是能将每個響應比對的對應的請求上。如果我們遇見個未知的
correlationId
值,我們可以安全的丢棄這個消息-因為它不屬于任何一個我們的請求。
你可能會問,為什麼我們要忽略哪些在回收隊列中未知的消息,而不是以一個錯誤結束?因為在伺服器竟态條件下,這種情況是可能的。RPC伺服器發送給我們答應之後,在發送一個确認消息之前,就死掉了,雖然這種可能性不大,但是它依舊存在可能。如果這事情發生了,RPC伺服器重新開機之後,将會再一次處理請求。這就是為什麼我們要溫和地處理重複的響應,這RPC理想情況下是幂等的。

我們的RPC将會像這樣工作:
當用戶端啟動,它會建立一個匿名的獨占的回收隊列。
對于一個RPC請求,用戶端會發送一個消息中有兩個屬性:
,要發送的的回收隊列和
replyTo
correlationId
,對于每一個請求都是唯一值。
這請求發送到
rpc_queue
隊列中。
這RPC工作者(亦稱:伺服器)等候隊列中的請求。當請求出現,它處理這工作并發送攜帶結果的資訊到用戶端,使用的隊列是消息屬性
replTo
中的那個。
用戶端等待回收隊列中的資料。當一個消息出現,它會檢查
屬性。如果它符合請求中的值,它會傳回這響應給應用程式。
correlationId
斐波那契任務:
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
我們聲明我們的斐波那契函數。它假定一個合法的正整數做為輸入參數。(不要期望這個可以處理大量數字,它可能是最慢的遞歸實作了)。
我們的RPC伺服器
RPCServer.java
的代碼:
private static final String RPC_QUEUE_NAME = "rpc_queue";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
這伺服器代碼是相當簡單明了的:
如往常一樣,我們開始建立連接配接,通道和聲明隊列。
我們可能想運作不止一個伺服器程序。為了均衡的負載到多個伺服器上,我們需要設定
channel.basicQos
中的
prefetchCount
屬性。
我們使用
basicConsume
通路隊列。然後進入
while
循環,我們等待請求消息,處理工作,發送響應。
我們RPC用戶端
RPCClient.java
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery =consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
The client code is slightly more involved:
這用戶端代碼是更加清晰:
我們建立一個連接配接和通道并且聲明一個獨占的
callback
隊列用來等待答複。
我們訂閱這個
callback
隊列,以便于我們可以接收到RPC響應。
我們的
call
方法做這真正的RPC請求。
接着,我們首次生成一個唯一的
correlationId
數字并且儲存它,在循環中使用這個值找到合适的響應。
接下來,我們釋出請求消息,帶着兩個屬性:
replyTo
和
correlationId
這時候,我們可以坐下來,等着合适的響應抵達。
這循環中做了個簡單德工作,檢查每一個響應消息中
correlationId
值,是否是它要尋找的。如果是,它會儲存這響應。
最終,我們把響應傳回給使用者。
制造用戶端請求:
RPCClient fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibonacciRpc.close();
現在是時候讓我們回顧下我們RPCClient.java和RPCServer.java中的全部例子的源碼(包含基本的異常處理)。
編譯和如往常一樣建立類路徑(看指南的第一部分)
$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java
我們的RPC服務現在準備好了,我們啟動着伺服器:
$ java -cp $CP RPCServer
[x] Awaiting RPC requests
為了請求一個斐波那契數字,運作用戶端:
$ java -cp $CP RPCClient
[x] Requesting fib(30)
現在的設計不僅僅可以實作一個RPC服務,并且它還有幾項重要的優勢:
如果RPC伺服器反應太遲緩,你可以通過運作另一個程式來擴充。試着通過一個新的控制平台來運作第二個RPC伺服器。在用戶端這邊,RPC要求僅發送和接收一個消息。像
queueDeclare
非同步調用是被要求的。是以,RPC用戶端僅僅需要一個網絡循環的單一RPC請求。