说明
通过前面五篇教程,我们已经学习了RabbitMQ的基本使用方法,了解了RabbitMQ的消息派发方式,消息的持久化,消息确认机制… 通过简单的程序,对RabbitMQ的消息传递模型有了初步了解,学习了Fanout,Direct,Topic三种类型交换机的使用,明白了消息路由的实现机制。本篇教程将继续翻译官方的第六篇教程RPC,学习如何通过RabbitMQ实现远程过程调用。
正文
在第二篇教程中,我们学习了如何使用任务队列在多个消费者中派发耗时任务。
但是,如果我们需要在远程计算上运行一个函数并且等待返回结果呢?这是一种完全不同的函数调用方式,通常这种模式被称为远程过程调用或RPC。
在本节教程中,我们将使用RabbitMQ构建一个RPC系统:包含一个客户端和一个可扩展的RPC服务。这里我们因为没有真实的耗时任务可以发送,所有创建一个返回斐波那契数列的虚拟RPC服务。
Client interface
我们将创建一个简单的客户端类来说明如何使用一个RPC服务,它将暴露一个名为call的方法,这个方法发送一个RPC请求并会一直阻塞直到接收到答案。
RPCClient rpcClient = new RPCClient();
String response = rpcClient.call(4);
System.out.println(" [.] Got '" + response + "'");
RPC注意事项
虽然RPC是计算机中一种常见的模式,但是它一直备受批评。当一个程序员不清楚函数调用是发生在本地还是一个缓慢的RPC调用时,就会出现问题。导致一个不能完全掌控的系统,并增加一些不必要的调试复杂性。错误的使用RPC不但不能简化软件,还会导致一些无法维护的意大利面条式的代码。
为避免这些,可以考虑以下建议:
- 确保一个函数调用发生在本地或远程调用是明显的。
- 系统文档化,使得各个组件间的依赖关系清晰化。
- 处理错误情况,当一个RPC服务长时间宕机时,客户端如何处理?
如果有疑问时,尽量避免RPC。如果可能,你应该使用一个异步的管道代替阻塞式的PRC,结果将被异步地推送到下一个计算阶段。
Callback queue
使用RabbitMQ实现RPC是容易的。一个客户端发送一个请求消息,服务端回复一个应答消息。为了接收到应答消息,我们需要在发送请求时携带一个回调队列的地址。我们可以使用默认队列(在java客户端中它是被独占的)。
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8"));
Message properties
AMQP 0-9-1协议预先为消息制定了一组包含14个属性,除了以下列出的属性,其他属性都很少使用:
- deliveryMode: 标记消息是否持久化,在第二个教程中我们使用过这个属性。
- contentType: 用来描述编码的mime类型。例如 我们经常使用的JSON编码,我们可以设置这个属性值为application/json。
- replyTo: 通常被用来设置回调队列的名称。
- correlationId: 用来将RPC应答消息和请求一一对应。
Correlation Id
在上面的方法中我们建议为每个RPC请求创建一个回调队列。但这种方式十分低效,幸好我们有一个更好的方式-----为每个客户端创建一个回调队列。
但这引发了一个新的问题,当我们在队列中接收到一个应答消息时,并不清楚这个消息是属于哪个请求的。这时应该使用correlationId这个属性值,我们可以为每个请求设置一个唯一值,之后当我们从回调队列接收到一个消息时,就可以查询这个属性值,基于它我们可以将以应答消息和请求相匹配。当接收到一个不认识的correlationId时,我们可以安全地丢弃这个消息,因为他不属于我们的请求。
你可能会问我们为什么要忽略回调队列中不认识的消息,而不是错误失败?这是因为在服务端存在一个竞争的可能性。虽然不太可能,但是仍有可能RPC在只发送答案而没有发送确认消息前宕机,如果发生,在RPC服务器重启时它将会重新处理这个请求。这就是为什么在客户端我们必须优雅地处理重复的应答消息,而且RPC在理想的情况下应该是幂等的。
Summary

我们的PRC服务的工作流程:
- 对于一个RPC请求,客户端发送一个携带两个属性的消息:replyTo用来为每个请求创建一个匿名的独占队列,correlationId用来为每个请求设置一个唯一的标识。
- 请求被发送到一个名为rpc_queue的队列。
- PRC服务端一直在该队列等待请求,当请求出现时,它将处理请求并发送一个消息到replyTo指定的队列,作为结果返回给客户端。
- 客户端在回复队列上等待数据,当一个消息出现时,它将检查correlationId这个属性,如果与请求的值相匹配,就会把这个应答消息返回给应用程序。
Putting it all together
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
我们创建了一个斐波那契函数,假定只有正整数为有效参数。不要期望它能处理很大的数字,因为使用的是递归实现,可能会很慢。
RPCServer.java
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties()
.builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
服务端的代码十分简单:
- 通常我们先创建连接,通道,声明队列。
- 我们可能希望运行更多的服务端程序。为了在多个服务程序间负载均衡,我们需要通过channel.basicQos来设置prefetchCount参数值。
- 我们使用basicConsume访问队列,在这个方法中我们提供一个对象(DeliverCallback)形式的回调,它将处理消息并返回应答消息。
RPCClient.java
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] args) {
try (RPCClient rpcClient = new RPCClient()) {
String number = "4";
System.out.println(" [x] Requesting fib (" + number + ")");
String response = rpcClient.call(number);
System.out.println(" [.] Got '" + response + "'");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
private String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
@Override
public void close() throws Exception {
connection.close();
}
}
客户端程序稍微复杂些:
- 我们创建一个连接和通道
- 我们的call方法创建一个真正的RPC请求
- 这里,我们首先创建一个唯一的correlationId并保存它,我们的消费者回调时将使用这个值匹配相应的应答消息
- 然后,我们创建一个专门用来回复的具有独占性的队列,并订阅这个队列
- 接下来,我们发送一个请求,携带两个参数:replyTo和correlationId
- 这时我们可以一直等待直到相应的回应消息出现
- 因为我们的消息者在另一个线程中传递处理消息,所有需在应答消息到达前阻塞主线程。使用阻塞队列是一个可行的解决方案,这里我们创建一个ArrayBlockingQueue,设置它的容量为1,因为我们只需要等待一个应答消息
- 消费者正在做一个简单的工作,对于每一个消费的应答消息,都会检查它的correlationId的值,看它是否是我们正在寻找的,若是,就将这个应答消息放到阻塞队列
- 与此同时,主线程正在等待从阻塞队列中取出应答消息
- 最后我们返回应答消息给用户
源码地址:https://github.com/Edenwds/rabbitmq_study/tree/master/rpc