天天看點

RabbitMQ實作RPC(java)

如果我們需要在遠端計算機上運作一個函數并等待結果,這種模式通常被稱為遠端過程調用或RPC。

在本教程中,我們将使用RabbitMQ建構一個RPC系統:一個用戶端和一個RPC伺服器。我們将建立一個傳回斐波那契數字的模拟RPC服務。

整個過程示意圖如下:

RabbitMQ實作RPC(java)

用戶端将請求發送至rpc_queue(我們定義的消息隊列),然後等待響應;服務端擷取請求,并處理請求,然後将請求結果傳回給隊列,用戶端得知請求被響應後擷取結果。

在結果被響應之前,用戶端是被阻塞的,主線程會等待RPC響應

如果每個RPC請求都建立一個回調隊列。這是非常低效,我們建立一個單一的用戶端回調隊列。

這引發了一個新的問題,在該隊列中收到回複時,不清楚回複屬于哪個請求。這就需要用到 correlationId屬性。我們為沒有請求設定唯一的correlationId值。然後,當我們在回調隊列中收到一條消息時,我們将擷取這個值,将響應與請求的進行correlationId比對。如果我們一緻就是我們需要的結果,否則就不是。

用戶端代RPCClient 碼如下:

package com.adtec.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;

    public RPCClient() throws IOException, TimeoutException {
        //建立一個連接配接和一個通道,并為回調聲明一個唯一的'回調'隊列
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
        //定義一個臨時變量的接受隊列名    
        replyQueueName = channel.queueDeclare().getQueue();
    }
    //發送RPC請求  
    public String call(String message) throws IOException, InterruptedException {
         //生成一個唯一的字元串作為回調隊列的編号
        String corrId = UUID.randomUUID().toString();
        //發送請求消息,消息使用了兩個屬性:replyto和correlationId
        //服務端根據replyto傳回結果,用戶端根據correlationId判斷響應是不是給自己的
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
                .build();

        //釋出一個消息,requestQueueName路由規則
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        //由于我們的消費者交易處理是在單獨的線程中進行的,是以我們需要在響應到達之前暫停主線程。
        //這裡我們建立的 容量為1的阻塞隊列ArrayBlockingQueue,因為我們隻需要等待一個響應。
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>();

        // String basicConsume(String queue, boolean autoAck, Consumer callback)
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                //檢查它的correlationId是否是我們所要找的那個
                if (properties.getCorrelationId().equals(corrId)) {
                    //如果是,則響應BlockingQueue
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });

        return response.take();
    }

    public void close() throws IOException {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();

            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (IOException _ignore) {
                }
            }
        }
    }
}
           

上面的代碼中用到了阻塞隊列ArrayBlockingQueue,欲知其原理可以移步:http://www.infoq.com/cn/articles/java-blocking-queue/我覺得這個解釋還不錯。

服務端代RPCServer 碼如下:

package rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    //具體處理方法
    private static int fib(int n) {
        if (n == )
            return ;
        if (n == )
            return ;
        return fib(n - ) + fib(n - );
    }

    public static void main(String[] argv) {
         //建立連接配接、通道,并聲明隊列 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = null;
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            channel.basicQos();

            System.out.println(" [x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();

                    String response = "";

                    try {
                        String message = new String(body, "UTF-8");
                        int n = Integer.parseInt(message);

                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    } catch (RuntimeException e) {
                        System.out.println(" [.] " + e.toString());
                    } finally {
                        // 傳回處理結果隊列
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        //  确認消息,已經收到後面參數 multiple:是否批量.true:将一次性确認所有小于envelope.getDeliveryTag()的消息。
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC
                        // server owner thread
                        synchronized (this) {
                            this.notify();
                        }
                    }
                }
            };
            //取消自動确認
            boolean autoAck = false ;
            channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (consumer) {
                    try {
                        consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
                try {
                    connection.close();
                } catch (IOException _ignore) {
                }
        }
    }
}
           

測試時先運作服務端,再運作用戶端

為了友善觀察結果,最好将用戶端和服務端在不同workspace實作

用戶端結果

RabbitMQ實作RPC(java)
服務端結果
RabbitMQ實作RPC(java)
官網執行個體位置:http://www.rabbitmq.com/tutorials/tutorial-six-java.html