在一般使用RabbitMQ做RPC很容易。用戶端發送一個請求消息然後伺服器回複一個響應消息。為了收到一個響應,我們需要發送一個'回調'的請求的隊列位址。我們可以使用預設隊列(在Java用戶端除外)。
AMQP協定給消息定義了14個屬性。大部分的屬性很少使用,除了下面幾個:
deliveryMode: 将消息标記為持久(值為2)或瞬态(任何其他值)。你可能記得在第二個教程中使用了這個屬性。
contentType:用來設定mime類型。例如經常使用的JSON格式資料,就需要将此屬性設定為:application/json。
replyTo: 通常用來命名一個回調隊列.
correlationId: 用來關聯RPC請求的響應.
RPC工作流程:

1)、用戶端啟動時,建立了一個匿名的回調隊列。
2)、在一個RPC請求中,用戶端發送一個消息,它有兩個屬性:1.REPLYTO,用來設定回調隊列名;2.correlationId,對于每個請求都被設定成唯一的值。
3)、請求被發送到rpc_queue隊列.
4)、RPC工作者(又名:伺服器)等待接收該隊列的請求。當收到一個請求,它就會處理并把結果發送給用戶端,使用的隊列是replyTo字段指定的。
5)、用戶端等待接收回調隊列中的資料。當接到一個消息,它會檢查它的correlationId屬性。如果它和設定的相比對,就會把響應傳回給應用程式。
1、RPC伺服器的RPCServer.java,接收消息調用rpc并傳回結果
[java] view plain copy
print?
package cn.slimsmart.rabbitmq.demo.rpc;
import java.security.MessageDigest;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
//RPC調用服務端
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws Exception {
//• 先建立連接配接、通道,并聲明隊列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.36.217");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(AMQP.PROTOCOL.PORT);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
//•可以運作多個伺服器程序。通過channel.basicQos設定prefetchCount屬性可将負載平均配置設定到多台伺服器上。
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
//打開應答機制autoAck=false
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(props.getCorrelationId()).build();
String message = new String(delivery.getBody());
System.out.println(" [.] getMd5String(" + message + ")");
String response = getMd5String(message);
//傳回處理結果隊列
channel.basicPublish("", props.getReplyTo(), replyProps,
response.getBytes());
//發送應答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
// 模拟RPC方法 擷取MD5字元串
public static String getMd5String(String str) {
MessageDigest md5 = null;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (Exception e) {
System.out.println(e.toString());
e.printStackTrace();
return "";
char[] charArray = str.toCharArray();
byte[] byteArray = new byte[charArray.length];
for (int i = 0; i < charArray.length; i++)
byteArray[i] = (byte) charArray[i];
byte[] md5Bytes = md5.digest(byteArray);
StringBuffer hexValue = new StringBuffer();
for (int i = 0; i < md5Bytes.length; i++) {
int val = ((int) md5Bytes[i]) & 0xff;
if (val < 16)
hexValue.append("0");
hexValue.append(Integer.toHexString(val));
return hexValue.toString();
}
2.用戶端RPCClient.java,發送rpc調用消息,接收結果
//RPC調用用戶端
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
//• 先建立一個連接配接和一個通道,并為回調聲明一個唯一的'回調'隊列
connection = factory.newConnection();
channel = connection.createChannel();
//• 注冊'回調'隊列,這樣就可以收到RPC響應
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
//發送RPC請求
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
//發送請求消息,消息使用了兩個屬性:replyto和correlationId
BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
//等待接收結果
//檢查它的correlationId是否是我們所要找的那個
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
return response;
public void close() throws Exception {
connection.close();
3、運作client主函數RPCMain.java
public class RPCMain {
RPCClient rpcClient = new RPCClient();
System.out.println(" [x] Requesting getMd5String(abc)");
String response = rpcClient.call("abc");
System.out.println(" [.] Got '" + response + "'");
rpcClient.close();
先運作服務端,再運作RPCMain,發送消息調用RPC。
這裡介紹的是該設計不是實作RPC服務的唯一可能,但它有一些重要的優點:
1)如果RPC伺服器速度太慢,你可以通過運作多個RPC伺服器。嘗試在一個新的控制台上運作第二RPCServer。
2)RPC用戶端隻發送和接收一個消息。不需要queueDeclare那樣要求同步調用。是以,RPC用戶端隻需要在一個網絡上發送和接收為一個單一的RPC請求。