如果我們需要在遠端計算機上運作一個函數并等待結果,這種模式通常被稱為遠端過程調用或RPC。
在本教程中,我們将使用RabbitMQ建構一個RPC系統:一個用戶端和一個RPC伺服器。我們将建立一個傳回斐波那契數字的模拟RPC服務。
整個過程示意圖如下:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcugXaz1ibvhGd5B3LcNHbhlmcvRXd09CXn1WavwVbvNmLx1GdpJmYhJnL3d3dvw1LcpDc0RHaiojIsJye.png)
用戶端将請求發送至rpc_queue(我們定義的消息隊列),然後等待響應;服務端擷取請求,并處理請求,然後将請求結果傳回給隊列,用戶端得知請求被響應後擷取結果。
在結果被響應之前,用戶端是被阻塞的,主線程會等待RPC響應
如果每個RPC請求都建立一個回調隊列。這是非常低效,我們建立一個單一的用戶端回調隊列。
這引發了一個新的問題,在該隊列中收到回複時,不清楚回複屬于哪個請求。這就需要用到 correlationId屬性。我們為沒有請求設定唯一的correlationId值。然後,當我們在回調隊列中收到一條消息時,我們将擷取這個值,将響應與請求的進行correlationId比對。如果我們一緻就是我們需要的結果,否則就不是。
用戶端代RPCClient 碼如下:
package com.adtec.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException {
//建立一個連接配接和一個通道,并為回調聲明一個唯一的'回調'隊列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//定義一個臨時變量的接受隊列名
replyQueueName = channel.queueDeclare().getQueue();
}
//發送RPC請求
public String call(String message) throws IOException, InterruptedException {
//生成一個唯一的字元串作為回調隊列的編号
String corrId = UUID.randomUUID().toString();
//發送請求消息,消息使用了兩個屬性:replyto和correlationId
//服務端根據replyto傳回結果,用戶端根據correlationId判斷響應是不是給自己的
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
.build();
//釋出一個消息,requestQueueName路由規則
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//由于我們的消費者交易處理是在單獨的線程中進行的,是以我們需要在響應到達之前暫停主線程。
//這裡我們建立的 容量為1的阻塞隊列ArrayBlockingQueue,因為我們隻需要等待一個響應。
final BlockingQueue<String> response = new ArrayBlockingQueue<String>();
// String basicConsume(String queue, boolean autoAck, Consumer callback)
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
//檢查它的correlationId是否是我們所要找的那個
if (properties.getCorrelationId().equals(corrId)) {
//如果是,則響應BlockingQueue
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
public void close() throws IOException {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (IOException _ignore) {
}
}
}
}
}
上面的代碼中用到了阻塞隊列ArrayBlockingQueue,欲知其原理可以移步:http://www.infoq.com/cn/articles/java-blocking-queue/我覺得這個解釋還不錯。
服務端代RPCServer 碼如下:
package rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
//具體處理方法
private static int fib(int n) {
if (n == )
return ;
if (n == )
return ;
return fib(n - ) + fib(n - );
}
public static void main(String[] argv) {
//建立連接配接、通道,并聲明隊列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos();
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId()).build();
String response = "";
try {
String message = new String(body, "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
// 傳回處理結果隊列
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
// 确認消息,已經收到後面參數 multiple:是否批量.true:将一次性确認所有小于envelope.getDeliveryTag()的消息。
channel.basicAck(envelope.getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC
// server owner thread
synchronized (this) {
this.notify();
}
}
}
};
//取消自動确認
boolean autoAck = false ;
channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
if (connection != null)
try {
connection.close();
} catch (IOException _ignore) {
}
}
}
}
測試時先運作服務端,再運作用戶端
為了友善觀察結果,最好将用戶端和服務端在不同workspace實作