天天看點

(轉) RabbitMQ學習之遠端過程調用(RPC)(java)

在一般使用RabbitMQ做RPC很容易。用戶端發送一個請求消息然後伺服器回複一個響應消息。為了收到一個響應,我們需要發送一個'回調'的請求的隊列位址。我們可以使用預設隊列(在Java用戶端除外)。

AMQP協定給消息定義了14個屬性。大部分的屬性很少使用,除了下面幾個:

  deliveryMode: 将消息标記為持久(值為2)或瞬态(任何其他值)。你可能記得在第二個教程中使用了這個屬性。

  contentType:用來設定mime類型。例如經常使用的JSON格式資料,就需要将此屬性設定為:application/json。

  replyTo: 通常用來命名一個回調隊列.

  correlationId: 用來關聯RPC請求的響應.

RPC工作流程:

(轉) RabbitMQ學習之遠端過程調用(RPC)(java)

1)、用戶端啟動時,建立了一個匿名的回調隊列。

2)、在一個RPC請求中,用戶端發送一個消息,它有兩個屬性:1.REPLYTO,用來設定回調隊列名;2.correlationId,對于每個請求都被設定成唯一的值。

3)、請求被發送到rpc_queue隊列.

4)、RPC工作者(又名:伺服器)等待接收該隊列的請求。當收到一個請求,它就會處理并把結果發送給用戶端,使用的隊列是replyTo字段指定的。

5)、用戶端等待接收回調隊列中的資料。當接到一個消息,它會檢查它的correlationId屬性。如果它和設定的相比對,就會把響應傳回給應用程式。

1、RPC伺服器的RPCServer.java,接收消息調用rpc并傳回結果

[java] view plain copy

 print?

package cn.slimsmart.rabbitmq.demo.rpc;  

import java.security.MessageDigest;  

import com.rabbitmq.client.AMQP;  

import com.rabbitmq.client.AMQP.BasicProperties;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

import com.rabbitmq.client.ConnectionFactory;  

import com.rabbitmq.client.QueueingConsumer;  

//RPC調用服務端  

public class RPCServer {  

    private static final String RPC_QUEUE_NAME = "rpc_queue";  

    public static void main(String[] args) throws Exception {  

        //• 先建立連接配接、通道,并聲明隊列  

        ConnectionFactory factory = new ConnectionFactory();  

        factory.setHost("192.168.36.217");  

        factory.setUsername("admin");  

        factory.setPassword("admin");  

        factory.setPort(AMQP.PROTOCOL.PORT);  

        Connection connection = factory.newConnection();  

        Channel channel = connection.createChannel();  

        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  

        //•可以運作多個伺服器程序。通過channel.basicQos設定prefetchCount屬性可将負載平均配置設定到多台伺服器上。  

        channel.basicQos(1);  

        QueueingConsumer consumer = new QueueingConsumer(channel);  

        //打開應答機制autoAck=false  

        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  

        System.out.println(" [x] Awaiting RPC requests");  

        while (true) {  

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

            BasicProperties props = delivery.getProperties();  

            BasicProperties replyProps = new BasicProperties.Builder()  

                    .correlationId(props.getCorrelationId()).build();  

            String message = new String(delivery.getBody());  

            System.out.println(" [.] getMd5String(" + message + ")");  

            String response = getMd5String(message);  

            //傳回處理結果隊列  

            channel.basicPublish("", props.getReplyTo(), replyProps,  

                    response.getBytes());  

            //發送應答   

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

        }  

    }  

    // 模拟RPC方法 擷取MD5字元串  

    public static String getMd5String(String str) {  

        MessageDigest md5 = null;  

        try {  

            md5 = MessageDigest.getInstance("MD5");  

        } catch (Exception e) {  

            System.out.println(e.toString());  

            e.printStackTrace();  

            return "";  

        char[] charArray = str.toCharArray();  

        byte[] byteArray = new byte[charArray.length];  

        for (int i = 0; i < charArray.length; i++)  

            byteArray[i] = (byte) charArray[i];  

        byte[] md5Bytes = md5.digest(byteArray);  

        StringBuffer hexValue = new StringBuffer();  

        for (int i = 0; i < md5Bytes.length; i++) {  

            int val = ((int) md5Bytes[i]) & 0xff;  

            if (val < 16)  

                hexValue.append("0");  

            hexValue.append(Integer.toHexString(val));  

        return hexValue.toString();  

}  

2.用戶端RPCClient.java,發送rpc調用消息,接收結果

//RPC調用用戶端  

public class RPCClient {  

    private Connection connection;  

    private Channel channel;  

    private String requestQueueName = "rpc_queue";  

    private String replyQueueName;  

    private QueueingConsumer consumer;  

    public RPCClient() throws Exception {  

        //• 先建立一個連接配接和一個通道,并為回調聲明一個唯一的'回調'隊列  

        connection = factory.newConnection();  

        channel = connection.createChannel();  

        //• 注冊'回調'隊列,這樣就可以收到RPC響應  

        replyQueueName = channel.queueDeclare().getQueue();  

        consumer = new QueueingConsumer(channel);  

        channel.basicConsume(replyQueueName, true, consumer);  

    //發送RPC請求  

    public String call(String message) throws Exception {  

        String response = null;  

        String corrId = java.util.UUID.randomUUID().toString();  

        //發送請求消息,消息使用了兩個屬性:replyto和correlationId  

        BasicProperties props = new BasicProperties.Builder()  

                .correlationId(corrId).replyTo(replyQueueName).build();  

        channel.basicPublish("", requestQueueName, props, message.getBytes());  

        //等待接收結果  

            //檢查它的correlationId是否是我們所要找的那個  

            if (delivery.getProperties().getCorrelationId().equals(corrId)) {  

                response = new String(delivery.getBody());  

                break;  

            }  

        return response;  

    public void close() throws Exception {  

        connection.close();  

3、運作client主函數RPCMain.java

public class RPCMain {  

        RPCClient rpcClient = new RPCClient();  

        System.out.println(" [x] Requesting getMd5String(abc)");     

        String response = rpcClient.call("abc");  

        System.out.println(" [.] Got '" + response + "'");  

        rpcClient.close();  

先運作服務端,再運作RPCMain,發送消息調用RPC。

這裡介紹的是該設計不是實作RPC服務的唯一可能,但它有一些重要的優點:

1)如果RPC伺服器速度太慢,你可以通過運作多個RPC伺服器。嘗試在一個新的控制台上運作第二RPCServer。

2)RPC用戶端隻發送和接收一個消息。不需要queueDeclare那樣要求同步調用。是以,RPC用戶端隻需要在一個網絡上發送和接收為一個單一的RPC請求。