最近閱讀《RabbitMQ實戰指南》,了解了 RPC(Remote Procedure Call,遠端過程調用)的實作。下面是測試的例子:
服務端:
/**
* <p>
*
* rpc伺服器,1、開啟隊列,2、消費消息,3、把response發送到回調隊列。
*
* </p>
* @author hz16092620
* @date 2018年9月16日 上午10:08:23
* @version
*/
public class RpcServer {
public static void main(String[] args) {
consumerMessage();
}
/**
* 服務端消費消息
* */
public static void consumerMessage() {
Connection conn = RabbitConnection.createConnection();//自定義的擷取連結的方法
try {
String queneName = "rpc_liuhp_quene";
final Channel channel = conn.createChannel();
channel.queueDeclare(queneName, false, true, false, null);
//消費消息,推模式
channel.basicQos(10);//最多消費消息個數
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
StringBuilder sb = new StringBuilder();
for (byte b : body) {
sb.append((char) b);
}
System.out.println(sb.toString());
BasicProperties props = new BasicProperties().builder().correlationId(properties.getCorrelationId()).build();
channel.basicPublish("", properties.getReplyTo(), props, "result".getBytes());//消費消息之後傳回result
}
};
channel.basicConsume(queneName, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} finally {
//連接配接不關閉,一直處于打開狀态
}
}
}
用戶端:
/**
* <p>
*
* 用戶端發送消息,然後接收消息,
*
* </p>
* @author hz16092620
* @date 2018年9月16日 上午10:27:08
* @version
*/
public class RpcClient {
public static void main(String[] args) {
createClient();
}
/**
* 交換器發送資料
* */
public static void createClient() {
Connection conn = RabbitConnection.createConnection();
Channel channel = null;
try {
channel = conn.createChannel();
String queneName = channel.queueDeclare().getQueue();
final String uuid = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties().builder().correlationId(uuid).replyTo(queneName).build();
channel.basicPublish("", "rpc_liuhp_quene", props, String.valueOf(new Random().nextInt(100)).getBytes());// 發送消息
// 接受傳回的結果
// 方式一 queneingConsumer 很多的問題,已經在3.0之後遺棄了。這個暫時不研究
// QueueingConsumer consume = new QueueingConsumer(channel);
/*
* while (true) { QueueingConsumer.Delivery delivery =
* consume.nextDelivery(); if
* (delivery.getProperties().getCorrelationId().equals(uuid)) {
* System.out.println(new String(delivery.getBody())); } break; }
*/
// 方式二 defaultConsumer
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
while (true) {
if (properties.getCorrelationId().equals(uuid)) {
StringBuilder sb = new StringBuilder();
for (byte b : body) {
sb.append((char) b);
}
System.out.println(sb.toString());
}
break;
}
}
};
channel.basicConsume(queneName, true, consumer);
Thread.sleep(30000L);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}