天天看点

RabbitMQ学习(六):Remote procedure call说明正文

说明

通过前面五篇教程,我们已经学习了RabbitMQ的基本使用方法,了解了RabbitMQ的消息派发方式,消息的持久化,消息确认机制… 通过简单的程序,对RabbitMQ的消息传递模型有了初步了解,学习了Fanout,Direct,Topic三种类型交换机的使用,明白了消息路由的实现机制。本篇教程将继续翻译官方的第六篇教程RPC,学习如何通过RabbitMQ实现远程过程调用。

正文

在第二篇教程中,我们学习了如何使用任务队列在多个消费者中派发耗时任务。

但是,如果我们需要在远程计算上运行一个函数并且等待返回结果呢?这是一种完全不同的函数调用方式,通常这种模式被称为远程过程调用或RPC。

在本节教程中,我们将使用RabbitMQ构建一个RPC系统:包含一个客户端和一个可扩展的RPC服务。这里我们因为没有真实的耗时任务可以发送,所有创建一个返回斐波那契数列的虚拟RPC服务。

Client interface

我们将创建一个简单的客户端类来说明如何使用一个RPC服务,它将暴露一个名为call的方法,这个方法发送一个RPC请求并会一直阻塞直到接收到答案。

RPCClient rpcClient = new RPCClient();
String response = rpcClient.call(4);
System.out.println(" [.] Got '" + response + "'");
           

RPC注意事项

虽然RPC是计算机中一种常见的模式,但是它一直备受批评。当一个程序员不清楚函数调用是发生在本地还是一个缓慢的RPC调用时,就会出现问题。导致一个不能完全掌控的系统,并增加一些不必要的调试复杂性。错误的使用RPC不但不能简化软件,还会导致一些无法维护的意大利面条式的代码。

为避免这些,可以考虑以下建议:

  • 确保一个函数调用发生在本地或远程调用是明显的。
  • 系统文档化,使得各个组件间的依赖关系清晰化。
  • 处理错误情况,当一个RPC服务长时间宕机时,客户端如何处理?

如果有疑问时,尽量避免RPC。如果可能,你应该使用一个异步的管道代替阻塞式的PRC,结果将被异步地推送到下一个计算阶段。

Callback queue

使用RabbitMQ实现RPC是容易的。一个客户端发送一个请求消息,服务端回复一个应答消息。为了接收到应答消息,我们需要在发送请求时携带一个回调队列的地址。我们可以使用默认队列(在java客户端中它是被独占的)。

String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties properties = new AMQP.BasicProperties()
        .builder()
        .correlationId(corrId)
        .replyTo(replyQueueName)
        .build();

channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8"));
           

Message properties

AMQP 0-9-1协议预先为消息制定了一组包含14个属性,除了以下列出的属性,其他属性都很少使用:

  • deliveryMode: 标记消息是否持久化,在第二个教程中我们使用过这个属性。
  • contentType: 用来描述编码的mime类型。例如 我们经常使用的JSON编码,我们可以设置这个属性值为application/json。
  • replyTo: 通常被用来设置回调队列的名称。
  • correlationId: 用来将RPC应答消息和请求一一对应。

Correlation Id

在上面的方法中我们建议为每个RPC请求创建一个回调队列。但这种方式十分低效,幸好我们有一个更好的方式-----为每个客户端创建一个回调队列。

但这引发了一个新的问题,当我们在队列中接收到一个应答消息时,并不清楚这个消息是属于哪个请求的。这时应该使用correlationId这个属性值,我们可以为每个请求设置一个唯一值,之后当我们从回调队列接收到一个消息时,就可以查询这个属性值,基于它我们可以将以应答消息和请求相匹配。当接收到一个不认识的correlationId时,我们可以安全地丢弃这个消息,因为他不属于我们的请求。

你可能会问我们为什么要忽略回调队列中不认识的消息,而不是错误失败?这是因为在服务端存在一个竞争的可能性。虽然不太可能,但是仍有可能RPC在只发送答案而没有发送确认消息前宕机,如果发生,在RPC服务器重启时它将会重新处理这个请求。这就是为什么在客户端我们必须优雅地处理重复的应答消息,而且RPC在理想的情况下应该是幂等的。

Summary

RabbitMQ学习(六):Remote procedure call说明正文

我们的PRC服务的工作流程:

  • 对于一个RPC请求,客户端发送一个携带两个属性的消息:replyTo用来为每个请求创建一个匿名的独占队列,correlationId用来为每个请求设置一个唯一的标识。
  • 请求被发送到一个名为rpc_queue的队列。
  • PRC服务端一直在该队列等待请求,当请求出现时,它将处理请求并发送一个消息到replyTo指定的队列,作为结果返回给客户端。
  • 客户端在回复队列上等待数据,当一个消息出现时,它将检查correlationId这个属性,如果与请求的值相匹配,就会把这个应答消息返回给应用程序。

Putting it all together

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n - 1) + fib(n - 2);
}
           

我们创建了一个斐波那契函数,假定只有正整数为有效参数。不要期望它能处理很大的数字,因为使用的是递归实现,可能会很慢。

RPCServer.java

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties()
                        .builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }

            };
            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
           

服务端的代码十分简单:

  • 通常我们先创建连接,通道,声明队列。
  • 我们可能希望运行更多的服务端程序。为了在多个服务程序间负载均衡,我们需要通过channel.basicQos来设置prefetchCount参数值。
  • 我们使用basicConsume访问队列,在这个方法中我们提供一个对象(DeliverCallback)形式的回调,它将处理消息并返回应答消息。

RPCClient.java

public class RPCClient implements AutoCloseable {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] args) {
        try (RPCClient rpcClient = new RPCClient()) {
            String number = "4";
            System.out.println(" [x] Requesting fib (" + number + ")");
            String response = rpcClient.call(number);
            System.out.println(" [.] Got '" + response + "'");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {});

        String result = response.take();
        channel.basicCancel(ctag);
        return result;

    }

    @Override
    public void close() throws Exception {
        connection.close();
    }
}

           

客户端程序稍微复杂些:

  • 我们创建一个连接和通道
  • 我们的call方法创建一个真正的RPC请求
  • 这里,我们首先创建一个唯一的correlationId并保存它,我们的消费者回调时将使用这个值匹配相应的应答消息
  • 然后,我们创建一个专门用来回复的具有独占性的队列,并订阅这个队列
  • 接下来,我们发送一个请求,携带两个参数:replyTo和correlationId
  • 这时我们可以一直等待直到相应的回应消息出现
  • 因为我们的消息者在另一个线程中传递处理消息,所有需在应答消息到达前阻塞主线程。使用阻塞队列是一个可行的解决方案,这里我们创建一个ArrayBlockingQueue,设置它的容量为1,因为我们只需要等待一个应答消息
  • 消费者正在做一个简单的工作,对于每一个消费的应答消息,都会检查它的correlationId的值,看它是否是我们正在寻找的,若是,就将这个应答消息放到阻塞队列
  • 与此同时,主线程正在等待从阻塞队列中取出应答消息
  • 最后我们返回应答消息给用户

源码地址:https://github.com/Edenwds/rabbitmq_study/tree/master/rpc