天天看點

RabbitMQ入門(6)--遠端調用

摘要 實作遠端調用

RabbitMQ RabbitMQ入門

目錄[-]

  • 遠端過程調用(RPC)
  • (使用Java用戶端)
  • 使用者接口
  • 回收隊列
  • 相關性ID (原:Correlation Id)
  • 摘要
  • 把所有的放在一起

在指南的第二部分,我們學習了如何使用工作隊列将耗時的任務分布到多個工作者中。

但是假如我們需要調用遠端計算機的函數,等待結果呢?好吧,這又是另一個故事了。這模式通常被稱為遠端過程調用或RPC。

在這部分,我們将會使用RabbitMQ建構一個RPC系統:一個用戶端和一個可擴充的RPC伺服器。由于我們還沒有值得分散的耗時任務,我們将會建立一個虛拟的RPC服務,用來傳回Fibonacci(斐波納契數列)。

為了說明RPC服務如何使用,我們将會建立一個簡單德用戶端類。它會暴露一個叫

call

的方法,用來發送一個RPC請求,在響應回複之前都會一直阻塞:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);      

RPC方面的注意 

雖然RPC在電腦運算方面是一個十分普通的模式,但是它依舊常常受批判的。 

如果一個程式員沒有意識到函數

call

是本地的還是一個遲鈍的RPC。這結果是不可預知的很讓你困惑的,并且會增加不必要的複雜調試。與簡化軟體相反,誤用RPC會導緻不可維護的意大利面條代碼(譯者注:原文是spaghetti code可能形容代碼很長很亂)。

思想中煎熬,考慮下接下來的建議: 

確定明顯區分哪個是函數

call

是本地調用的,哪個是遠端調用的。

給你的系統加上文檔,讓元件之間的依賴項清晰可見的。

處理錯誤事件。當RPC伺服器很久沒有響應了,用戶端應該如何響應? 

當關于RPC的所有疑問消除,在你可以的情況下,你應該使用一個異步的管道,代替RPC中阻塞,結果會異步的放入接下來的計算平台。

一般來說在RabbitMQ上做RPC是容易的。一個用戶端發送一個請求消息,一個伺服器傳回響應消息。為了接受到響應,我們需要再請求中帶上一個

callback

隊列的位址。我們可以使用預設隊列(那個在Java用戶端上市獨占的)。讓我們試一下:

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...      

消息屬性 

這AMQP協定預先确定了消息中的14個屬性。他們大多數屬性很少使用,除了下面這些例外: 

deliveryMode

:将一個消息标記為持久化(值為2)或者瞬态的(其他值)。你可能從第二部分中記起這個屬性。 

contentType

:用來描述媒體類型的編碼。例如常常使用的JSON編碼,這是一個好的慣例,設定這個屬性為:

application/json

。 

replyTo

:通常來命名回收隊列的名字。 

correlationId

:對RPC加速響應請求是很有用的。

我們需要這個新的引用:

import com.rabbitmq.client.AMQP.BasicProperties;      

在目前方法中我們建議為每一個RPC請求建立一個回收隊列。這個效率十分低下的,但幸運的是有一個更好的方式- 讓我們為每一個用戶端建立一個單一的回收隊列。 

這樣又出現了新的問題,沒有清晰的判斷隊列中的響應是屬于哪個請求的。這個時候

coorrelationId

屬性發揮了作用。我們将每個請求的這個屬性設定為唯一值。以後當我們在回收隊列中接收消息時,我們将會檢視這個屬性,依據這個屬性值,我們是能将每個響應比對的對應的請求上。如果我們遇見個未知的

correlationId

值,我們可以安全的丢棄這個消息-因為它不屬于任何一個我們的請求。

你可能會問,為什麼我們要忽略哪些在回收隊列中未知的消息,而不是以一個錯誤結束?因為在伺服器竟态條件下,這種情況是可能的。RPC伺服器發送給我們答應之後,在發送一個确認消息之前,就死掉了,雖然這種可能性不大,但是它依舊存在可能。如果這事情發生了,RPC伺服器重新開機之後,将會再一次處理請求。這就是為什麼我們要溫和地處理重複的響應,這RPC理想情況下是幂等的。

RabbitMQ入門(6)--遠端調用

我們的RPC将會像這樣工作:

當用戶端啟動,它會建立一個匿名的獨占的回收隊列。 

對于一個RPC請求,用戶端會發送一個消息中有兩個屬性:

replyTo

,要發送的的回收隊列和

correlationId

,對于每一個請求都是唯一值。 

這請求發送到

rpc_queue

隊列中。 

這RPC工作者(亦稱:伺服器)等候隊列中的請求。當請求出現,它處理這工作并發送攜帶結果的資訊到用戶端,使用的隊列是消息屬性

replTo

中的那個。 

用戶端等待回收隊列中的資料。當一個消息出現,它會檢查

correlationId

屬性。如果它符合請求中的值,它會傳回這響應給應用程式。

斐波那契任務:

private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}      

我們聲明我們的斐波那契函數。它假定一個合法的正整數做為輸入參數。(不要期望這個可以處理大量數字,它可能是最慢的遞歸實作了)。 

我們的RPC伺服器

RPCServer.java

的代碼:

private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();

    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);

    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);

    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}      

這伺服器代碼是相當簡單明了的: 

如往常一樣,我們開始建立連接配接,通道和聲明隊列。 

我們可能想運作不止一個伺服器程序。為了均衡的負載到多個伺服器上,我們需要設定

channel.basicQos

中的

prefetchCount

屬性。 

我們使用

basicConsume

通路隊列。然後進入

while

循環,我們等待請求消息,處理工作,發送響應。

我們RPC用戶端

RPCClient.java

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {     
    String response = null;
    String corrId = java.util.UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes());

    while (true) {
        QueueingConsumer.Delivery delivery =consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody());
            break;
        }
    }

    return response; 
}

public void close() throws Exception {
    connection.close();
}      

The client code is slightly more involved: 

這用戶端代碼是更加清晰: 

我們建立一個連接配接和通道并且聲明一個獨占的

callback

隊列用來等待答複。 

我們訂閱這個

callback

隊列,以便于我們可以接收到RPC響應。 

我們的

call

方法做這真正的RPC請求。 

接着,我們首次生成一個唯一的

correlationId

數字并且儲存它,在循環中使用這個值找到合适的響應。 

接下來,我們釋出請求消息,帶着兩個屬性:

replyTo

correlationId

這時候,我們可以坐下來,等着合适的響應抵達。 

這循環中做了個簡單德工作,檢查每一個響應消息中

correlationId

值,是否是它要尋找的。如果是,它會儲存這響應。 

最終,我們把響應傳回給使用者。

制造用戶端請求:

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");   
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();      

現在是時候讓我們回顧下我們RPCClient.java和RPCServer.java中的全部例子的源碼(包含基本的異常處理)。 

編譯和如往常一樣建立類路徑(看指南的第一部分)

$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java      

我們的RPC服務現在準備好了,我們啟動着伺服器:

$ java -cp $CP RPCServer
 [x] Awaiting RPC requests      

為了請求一個斐波那契數字,運作用戶端:

$ java -cp $CP RPCClient
 [x] Requesting fib(30)      

現在的設計不僅僅可以實作一個RPC服務,并且它還有幾項重要的優勢: 

如果RPC伺服器反應太遲緩,你可以通過運作另一個程式來擴充。試着通過一個新的控制平台來運作第二個RPC伺服器。在用戶端這邊,RPC要求僅發送和接收一個消息。像

queueDeclare

非同步調用是被要求的。是以,RPC用戶端僅僅需要一個網絡循環的單一RPC請求。