天天看點

RabbitMQ(六):回調隊列callback queue、關聯辨別correlation id、實作簡單的RPC系統

部落格翻譯自:RabbitMQ Tutorials Java版

RabbitMQ(一):Hello World程式

RabbitMQ(二):Work Queues、循環分發、消息确認、持久化、公平分發

RabbitMQ(三):Exchange交換器--fanout

RabbitMQ(四):Exchange交換器--direct

RabbitMQ(五):Exchange交換器--topic

RabbitMQ(六):回調隊列callback queue、關聯辨別correlation id、實作簡單的RPC系統

RabbitMQ(七):常用方法說明 與 學習小結

遠端過程調用(RPC):

在第二篇部落格中,我們學會了如何使用工作隊列将耗時的任務分發給多個工作者。但假如我們想調用遠端電腦上的一個函數(或方法)并等待函數執行的結果,這時候該怎麼辦呢?好吧,這是一個不同的故事。這種模式通常稱為遠端過程調用RPC(

Remote Procedure Call

)。

在今天的教程中,我們将會使用RabbitMQ來建立一個RPC系統:一個用戶端和一個可擴充的RPC服務端。因為我們沒有任何現成的耗時任務,我們将會建立一個假的RPC服務,它将傳回斐波那契數(

Fibonacci numbers

)。

用戶端接口(Client interface):

為了示範如何使用RPC服務,我們将建立一個簡單的用戶端類。它負責暴露一個名為

call

的方法,該方法将發送一個RPC請求并阻塞,直到接收到回答。

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
           

回調隊列(Callback queue):

使用RabbitMQ來做RPC很容易。用戶端發送一個請求消息,服務端以一個響應消息回應。為了可以接收到響應,需要與請求(消息)一起,發送一個回調的隊列。我們使用預設的隊列(Java獨有的):

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...
           

消息屬性

AMQP 0-9-1協定預定義了消息的14種屬性。大部分屬性都很少用到,除了下面的幾種:

  • deliveryMode

    :标記一個消息是持久的(值為2)還是短暫的(2以外的任何值),你可能還記得我們的第二個教程中用到過這個屬性。
  • contentType

    :描述編碼的

    mime-type

    mime-type of the encoding

    )。比如最常使用

    JSON

    格式,就可以将該屬性設定為

    application/json

  • ③ 

    replyTo

    :通常用來命名一個回調隊列。
  • ④ 

    correlationId

    :用來關聯RPC的響應和請求。

我們需要引入一個新的類:

import com.rabbitmq.client.AMQP.BasicProperties;
           

關聯辨別(Correlation Id):

在上面的方法中,我們為每一個RPC請求都建立了一個新的回調隊列。這樣做顯然很低效,但幸好我們有更好的方式:讓我們為每一個用戶端建立一個回調隊列。

這樣做又引入了一個新的問題,在回調隊列中收到響應後不知道到底是屬于哪個請求的。這時候,

CorrelationId

就可以派上用場了。對每一個請求,我們都建立一個唯一性的值作為

CorrelationId

。之後,當我們從回調隊列中收到消息的時候,就可以查找這個屬性,基于這一點,我們就可以将一個響應和一個請求進行關聯。如果我們看到一個不知道的 

CorrelationId

值,我們就可以安全地丢棄該消息,因為它不屬于我們的請求。

你可能會問,為什麼要忽視回調隊列中的不知道的消息,而不是直接以一個錯誤失敗(failing with an error)。這是由于服務端可能存在的競争條件。盡管不會,但這種情況仍有可能發生:RPC服務端在發給我們答案之後就挂掉了,還沒來得及為請求發送一個确認資訊。如果發生這種情況,重新開機後的RPC服務端将會重新處理該請求(因為沒有給RabbitMQ發送确認消息,RabbitMQ會重新發送消息給RPC服務)。這就是為什麼我們要在用戶端優雅地處理重複響應,并且理想情況下,RPC服務要是幂等的。

總結:

RabbitMQ(六):回調隊列callback queue、關聯辨別correlation id、實作簡單的RPC系統

我們的RPC系統的工作流程如下:

當用戶端啟動後,它會建立一個異步的獨特的回調隊列。對于一個RPC請求,用戶端将會發送一個配置了兩個屬性的消息:一個是

replyTo

屬性,設定為這個回調隊列;另一個是

correlation id

屬性,每一個請求都會設定為一個具有唯一性的值。這個請求将會發送到

rpc_queue

隊列。

RPC工作者(即圖中的

server

)将會等待

rpc_queue

隊列的請求。當有請求到來時,它就會開始幹活(計算斐波那契數)并将結果通過發送消息來傳回,該傳回消息發送到

replyTo

指定的隊列。

用戶端将等待回調隊列傳回資料。當傳回的消息到達時,它将檢查

correlation id

屬性。如果該屬性值和請求比對,就将響應傳回給程式。

放在一塊:

計算斐波那契數的任務如下:

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}
           

我們定義了斐波那契函數,它假設隻會輸入正整數(不要期望該函數在輸入很大的數的時候可以好好工作,它可能是最慢的遞歸實作)。

RPC服務

RPCServer.java

的代碼如下:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    //模拟的耗時任務,即計算斐波那契數
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
        //建立連接配接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = null;
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();

            //聲明隊列
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            //一次隻從隊列中取出一個消息
            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            //監聽消息(即RPC請求)
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();

                    //收到RPC請求後開始處理
                    String response = "";
                    try {
                        String message = new String(body, "UTF-8");
                        int n = Integer.parseInt(message);
                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    } catch (RuntimeException e) {
                        System.out.println(" [.] " + e.toString());
                    } finally {
                        //處理完之後,傳回響應(即釋出消息)
                        System.out.println("[server current time] : " + System.currentTimeMillis());
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

            //loop to prevent reaching finally block
            while (true) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException _ignore) {
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
                try {
                    connection.close();
                } catch (IOException _ignore) {
                }
        }
    }
}
           

RPC服務的代碼很直白:

  • (1)開始先建立連接配接、通道并聲明隊列。
  • (2)我們可能會運作多個服務程序,為了負載均衡我們通過設定 

    prefetchCount =1

    将任務分發給多個服務程序
  • (3)我們使用了

    basicConsume

    來連接配接隊列,并通過一個

    DefaultConsumer

    對象提供回調。這個

    DefaultConsumer

    對象将進行工作并傳回響應。

我們的RPC用戶端

RPCClient

代碼如下:

package com.maxwell.rabbitdemo;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;

    //定義一個RPC用戶端
    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
    }

    //真正地請求
    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(corrId)) {
                    System.out.println("[client current time] : " + System.currentTimeMillis());
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });

        return response.take();
    }

    //關閉連接配接
    public void close() throws IOException {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            //建立一個RPC用戶端
            fibonacciRpc = new RPCClient();
            System.out.println(" [x] Requesting fib(30)");
            //RPC用戶端發送調用請求,并等待影響,直到接收到
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    //關閉RPC客戶的連接配接
                    fibonacciRpc.close();
                } catch (IOException _ignore) {
                }
            }
        }
    }
}
           

用戶端代碼看起來有一些複雜:

  • (1)建立連接配接和通道,并聲明了一個獨特的回調隊列。
  • (2)訂閱這個回調隊列,是以我們可以接收RPC響應。
  • (3)call方法執行RPC請求。在call方法中,我們首先生成一個具有唯一性的

    correlationId

    值并存在變量

    corrId

    中。我們的

    DefaultConsumer

    中的實作方法

    handleDelivery

    會使用這個值來擷取争取的響應。然後,我們釋出了這個請求消息,并設定了

    replyTo

    correlationId

    這兩個屬性。好了,現在我們可以坐下來耐心等待響應到來了。
  • (4)由于我們的消費者處理(指

    handleDelivery

    方法)是在子線程進行的,是以我們需要在響應到來之前暫停主線程(否則主線程結束了,子線程接收到了影響傳給誰啊)。使用

    BlockingQueue

    是一種解決方案。在這裡我們建立了一個阻塞隊列

    ArrayBlockingQueue

    并将它的容量設為1,因為我們隻需要接受一個響應就可以啦。

    handleDelivery

    方法所做的很簡單,當有響應來的時候,就檢查是不是和

    correlationId

    比對,比對的話就放到阻塞隊列

    ArrayBlockingQueue

    中。
  • 同時,主線程正等待影響。
  • (5)最終将影響傳回給使用者了。

現在,可以動手實驗了。首先,執行RPC服務端,讓它等待請求的到來。

[x] Awaiting RPC requests
           

然後,執行RPC用戶端,即

RPCClient

中的

main

方法,發起請求:

[x] Requesting fib(30)
[client current time] : 1500474305838
 [.] Got '832040'
           

可以看到,用戶端很快就接受到了請求,回頭看RPC服務端的時間:

[.] fib(30)
[server current time] : 1500474305835
           

上面這種設計并不是RPC服務端的唯一實作,但是它有以下幾個重要的優勢:

  • ① 如果RPC服務端很慢,你可以通過運作多個執行個體就可以實作擴充。
  • ② 在RPC用戶端,RPC要求發送和接受一個消息。非同步的方法

    queueDeclare

    是必須的。這樣,RPC用戶端隻需要為一個RPC請求隻進行一次網絡往返。

但我們的代碼仍然太簡單,并沒有處理更複雜但也非常重要的問題,像:

  • ① 如果沒有服務端在運作,用戶端該怎麼辦
  • ② 用戶端應該為一次RPC設定逾時嗎
  • ③ 如果服務端發生故障并抛出異常,它還應該傳回給用戶端嗎?
  • ④ 在處理消息前,先通過邊界檢查、類型判斷等手段過濾掉無效的消息等

說明:

①與原文略有出入,如有疑問,請參閱原文

②原文均是編譯後通過javacp指令直接運作程式,我是在IDE中進行的,相應的操作做了修改。

③添加了用戶端和服務端執行時間。