部落格翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程式
RabbitMQ(二):Work Queues、循環分發、消息确認、持久化、公平分發
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調隊列callback queue、關聯辨別correlation id、實作簡單的RPC系統
RabbitMQ(七):常用方法說明 與 學習小結
遠端過程調用(RPC):
在第二篇部落格中,我們學會了如何使用工作隊列将耗時的任務分發給多個工作者。但假如我們想調用遠端電腦上的一個函數(或方法)并等待函數執行的結果,這時候該怎麼辦呢?好吧,這是一個不同的故事。這種模式通常稱為遠端過程調用RPC(
Remote Procedure Call
)。
在今天的教程中,我們将會使用RabbitMQ來建立一個RPC系統:一個用戶端和一個可擴充的RPC服務端。因為我們沒有任何現成的耗時任務,我們将會建立一個假的RPC服務,它将傳回斐波那契數(
Fibonacci numbers
)。
用戶端接口(Client interface):
為了示範如何使用RPC服務,我們将建立一個簡單的用戶端類。它負責暴露一個名為
call
的方法,該方法将發送一個RPC請求并阻塞,直到接收到回答。
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
回調隊列(Callback queue):
使用RabbitMQ來做RPC很容易。用戶端發送一個請求消息,服務端以一個響應消息回應。為了可以接收到響應,需要與請求(消息)一起,發送一個回調的隊列。我們使用預設的隊列(Java獨有的):
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
消息屬性
AMQP 0-9-1協定預定義了消息的14種屬性。大部分屬性都很少用到,除了下面的幾種:
- ①
:标記一個消息是持久的(值為2)還是短暫的(2以外的任何值),你可能還記得我們的第二個教程中用到過這個屬性。
deliveryMode
- ②
:描述編碼的
contentType
(
mime-type
)。比如最常使用
mime-type of the encoding
格式,就可以将該屬性設定為
JSON
。
application/json
- ③
:通常用來命名一個回調隊列。
replyTo
- ④
:用來關聯RPC的響應和請求。
correlationId
我們需要引入一個新的類:
import com.rabbitmq.client.AMQP.BasicProperties;
關聯辨別(Correlation Id):
在上面的方法中,我們為每一個RPC請求都建立了一個新的回調隊列。這樣做顯然很低效,但幸好我們有更好的方式:讓我們為每一個用戶端建立一個回調隊列。
這樣做又引入了一個新的問題,在回調隊列中收到響應後不知道到底是屬于哪個請求的。這時候,
CorrelationId
就可以派上用場了。對每一個請求,我們都建立一個唯一性的值作為
CorrelationId
。之後,當我們從回調隊列中收到消息的時候,就可以查找這個屬性,基于這一點,我們就可以将一個響應和一個請求進行關聯。如果我們看到一個不知道的
CorrelationId
值,我們就可以安全地丢棄該消息,因為它不屬于我們的請求。
你可能會問,為什麼要忽視回調隊列中的不知道的消息,而不是直接以一個錯誤失敗(failing with an error)。這是由于服務端可能存在的競争條件。盡管不會,但這種情況仍有可能發生:RPC服務端在發給我們答案之後就挂掉了,還沒來得及為請求發送一個确認資訊。如果發生這種情況,重新開機後的RPC服務端将會重新處理該請求(因為沒有給RabbitMQ發送确認消息,RabbitMQ會重新發送消息給RPC服務)。這就是為什麼我們要在用戶端優雅地處理重複響應,并且理想情況下,RPC服務要是幂等的。
總結:
我們的RPC系統的工作流程如下:
當用戶端啟動後,它會建立一個異步的獨特的回調隊列。對于一個RPC請求,用戶端将會發送一個配置了兩個屬性的消息:一個是
replyTo
屬性,設定為這個回調隊列;另一個是
correlation id
屬性,每一個請求都會設定為一個具有唯一性的值。這個請求将會發送到
rpc_queue
隊列。
RPC工作者(即圖中的
server
)将會等待
rpc_queue
隊列的請求。當有請求到來時,它就會開始幹活(計算斐波那契數)并将結果通過發送消息來傳回,該傳回消息發送到
replyTo
指定的隊列。
用戶端将等待回調隊列傳回資料。當傳回的消息到達時,它将檢查
correlation id
屬性。如果該屬性值和請求比對,就将響應傳回給程式。
放在一塊:
計算斐波那契數的任務如下:
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
我們定義了斐波那契函數,它假設隻會輸入正整數(不要期望該函數在輸入很大的數的時候可以好好工作,它可能是最慢的遞歸實作)。
RPC服務
RPCServer.java
的代碼如下:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
//模拟的耗時任務,即計算斐波那契數
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
//建立連接配接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
final Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
//一次隻從隊列中取出一個消息
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
//監聽消息(即RPC請求)
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
//收到RPC請求後開始處理
String response = "";
try {
String message = new String(body, "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
//處理完之後,傳回響應(即釋出消息)
System.out.println("[server current time] : " + System.currentTimeMillis());
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
//loop to prevent reaching finally block
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException _ignore) {
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
if (connection != null)
try {
connection.close();
} catch (IOException _ignore) {
}
}
}
}
RPC服務的代碼很直白:
- (1)開始先建立連接配接、通道并聲明隊列。
- (2)我們可能會運作多個服務程序,為了負載均衡我們通過設定
将任務分發給多個服務程序prefetchCount =1
- (3)我們使用了
來連接配接隊列,并通過一個basicConsume
對象提供回調。這個DefaultConsumer
對象将進行工作并傳回響應。DefaultConsumer
我們的RPC用戶端
RPCClient
代碼如下:
package com.maxwell.rabbitdemo;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
//定義一個RPC用戶端
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
}
//真正地請求
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
System.out.println("[client current time] : " + System.currentTimeMillis());
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
//關閉連接配接
public void close() throws IOException {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
//建立一個RPC用戶端
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
//RPC用戶端發送調用請求,并等待影響,直到接收到
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
//關閉RPC客戶的連接配接
fibonacciRpc.close();
} catch (IOException _ignore) {
}
}
}
}
}
用戶端代碼看起來有一些複雜:
- (1)建立連接配接和通道,并聲明了一個獨特的回調隊列。
- (2)訂閱這個回調隊列,是以我們可以接收RPC響應。
- (3)call方法執行RPC請求。在call方法中,我們首先生成一個具有唯一性的
值并存在變量correlationId
中。我們的corrId
中的實作方法DefaultConsumer
會使用這個值來擷取争取的響應。然後,我們釋出了這個請求消息,并設定了handleDelivery
和replyTo
這兩個屬性。好了,現在我們可以坐下來耐心等待響應到來了。correlationId
- (4)由于我們的消費者處理(指
方法)是在子線程進行的,是以我們需要在響應到來之前暫停主線程(否則主線程結束了,子線程接收到了影響傳給誰啊)。使用handleDelivery
是一種解決方案。在這裡我們建立了一個阻塞隊列BlockingQueue
并将它的容量設為1,因為我們隻需要接受一個響應就可以啦。ArrayBlockingQueue
方法所做的很簡單,當有響應來的時候,就檢查是不是和handleDelivery
比對,比對的話就放到阻塞隊列correlationId
中。ArrayBlockingQueue
- 同時,主線程正等待影響。
- (5)最終将影響傳回給使用者了。
現在,可以動手實驗了。首先,執行RPC服務端,讓它等待請求的到來。
[x] Awaiting RPC requests
然後,執行RPC用戶端,即
RPCClient
中的
main
方法,發起請求:
[x] Requesting fib(30)
[client current time] : 1500474305838
[.] Got '832040'
可以看到,用戶端很快就接受到了請求,回頭看RPC服務端的時間:
[.] fib(30)
[server current time] : 1500474305835
上面這種設計并不是RPC服務端的唯一實作,但是它有以下幾個重要的優勢:
- ① 如果RPC服務端很慢,你可以通過運作多個執行個體就可以實作擴充。
- ② 在RPC用戶端,RPC要求發送和接受一個消息。非同步的方法
是必須的。這樣,RPC用戶端隻需要為一個RPC請求隻進行一次網絡往返。queueDeclare
但我們的代碼仍然太簡單,并沒有處理更複雜但也非常重要的問題,像:
- ① 如果沒有服務端在運作,用戶端該怎麼辦
- ② 用戶端應該為一次RPC設定逾時嗎
- ③ 如果服務端發生故障并抛出異常,它還應該傳回給用戶端嗎?
- ④ 在處理消息前,先通過邊界檢查、類型判斷等手段過濾掉無效的消息等
說明:
①與原文略有出入,如有疑問,請參閱原文
②原文均是編譯後通過javacp指令直接運作程式,我是在IDE中進行的,相應的操作做了修改。
③添加了用戶端和服務端執行時間。