java-rabbitmq-官網執行個體06
描述:
使用兩個互通的隊列,模拟 RPC 調用
運作:
D6_RPCClient.main();
D6_RPCServer.main();
package com.example.tutorials;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
* 使用兩個隊列,互相接收對方的消息。使用 AMQP.BasicProperties 傳輸唯一"互聯ID",實作RPC請求
* rpc 調用,轉換為大寫
* @create 2017-08-30
* amqp-client 4.2.0
**/
public class D6_RPCClient {
public static void main(String[] argv) {
D6_RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new D6_RPCClient();
//生成一個,互聯ID
final String corrId = UUID.randomUUID().toString();
//發送消息
System.out.println("輸入要發送的内容,退出輸入 x ");
String message ;
while(true){
Scanner scanner = new Scanner(System.in);
message = scanner.next();
if("x".equals(message))
break;
System.out.println(" [請求] 内容 = "+message);
response = fibonacciRpc.call(message,corrId);
System.out.println(" [響應] 内容 = " + response );
}
}
catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
finally {
if (fibonacciRpc!= null) {
try {
fibonacciRpc.close();
}
catch (IOException _ignore) {}
}
}
}
private Connection connection;
private Channel channel;
/**
* 由 client 發送消息到 "rpc_queue" 隊列,server 消費消息,之後 server 将響應寫入"應答隊列"
*/
private String requestQueueName = "rpc_queue";
/**
* 應答隊列
*/
private String replyQueueName;
/**
* 阻塞隊列
*/
private final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
/**
* rpc 用戶端執行個體
* @throws IOException
* @throws TimeoutException
*/
public D6_RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//設定登入賬号
factory.setHost(ServerInfo.host);
factory.setPort(ServerInfo.port);
factory.setUsername(ServerInfo.uName);
factory.setPassword(ServerInfo.uPwd);
//連結伺服器
connection = factory.newConnection();
channel = connection.createChannel();
//聲明一個私有排他、自動删除的隊列
replyQueueName = channel.queueDeclare().getQueue();
System.out.println(" [x]應答隊列名:"+replyQueueName);
}
/**
* 調用
* @param message 消息内容
* @param corrId 關聯id
* @return
* @throws IOException
* @throws InterruptedException
*/
public String call(String message,final String corrId) throws IOException, InterruptedException {
//消息的屬性,路由報頭等等
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId) //互聯ID
.replyTo(replyQueueName) //應答隊列
.build();
//釋出送消息
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//消費者,接收"應答隊列"消息
boolean autoAck=true;//自動應答
channel.basicConsume(replyQueueName, autoAck, new DefaultConsumer(channel) {
@Override
public void handleConsumeOk(String consumerTag) {
super.handleConsumeOk(consumerTag);
System.out.println(" [接收應答消息] 已經注冊成功! ");
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(String.format(
" [響應隊列] 請求corrId = %s; 響應corrId = %s; replyTo = %s;"
,corrId
,properties.getCorrelationId()
,properties.getReplyTo()));
if (properties.getCorrelationId().equals(corrId)) {
//将元素e插入到隊列末尾,如果插入成功,則傳回true;如果插入失敗(即隊列已滿),則傳回false;
String str=new String(body, "UTF-8");
boolean wSuccess = response.offer(str);
System.out.println(
" [接收應答消息] 已收到響應,并寫入阻塞隊列("+wSuccess+",size="+response.size()+");響應内容="+str);
}
}
});
//使用阻塞隊列,在 handleDelivery 沒有接收到回調消息時一直處于阻塞狀态
String result=response.take();
System.out.println(" [接收應答消息] 阻塞隊列已經擷取到内容! ");
return result;
}
public void close() throws IOException {
connection.close();
}
}
package com.example.tutorials;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 使用兩個隊列,互相接收對方的消息。使用 AMQP.BasicProperties 傳輸唯一"互聯ID",實作RPC請求
* rpc 調用,轉換為大寫
* @create 2017-08-30
* amqp-client 4.2.0
**/
public class D6_RPCServer {
/**
* 由 client 發送消息到 "rpc_queue" 隊列,server 消費消息,之後 server 将響應寫入"應答隊列"
*/
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] argv) throws TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//設定登入賬号
factory.setHost(ServerInfo.host);
factory.setPort(ServerInfo.port);
factory.setUsername(ServerInfo.uName);
factory.setPassword(ServerInfo.uPwd);
Connection connection = null;
try {
//連結伺服器
connection = factory.newConnection();
final Channel channel = connection.createChannel();
//定義一個隊列
boolean duiable=false;//持久化
boolean exclusive = false;//排他隊列
boolean autoDelete=false;//沒有consumer時,隊列是否自動删除
channel.queueDeclare(RPC_QUEUE_NAME, duiable, exclusive, autoDelete, null);
//輪詢分發消息
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
//建立消費者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleConsumeOk(String consumerTag) {
super.handleConsumeOk(consumerTag);
System.out.println(" [x] 已經注冊成功! ");
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String corrId =properties.getCorrelationId();
//消息的屬性,路由報頭等等
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)//互聯ID
.build();
System.out.println("=====================");
System.out.println(String.format(" [響應隊列] corrId = %s;replyTo = %s;"
,corrId
,replyProps.getReplyTo()));
//響應内容
String response = "";
try {
//接收到的消息
String message = new String(body,"UTF-8");
//int n = Integer.parseInt(message);
System.out.println(" [接收到] 内容 = " + message );
response += getMsg(message);//計算斐波那契數列
System.out.println(" [接收到] 計算結果 = "+response);
}
catch (RuntimeException e){
System.out.println(" [異常] " + e.toString());
}
finally {
//發送消息到應答隊列
channel.basicPublish( "" //交換器
, properties.getReplyTo() //路由鍵
, replyProps //消息屬性
, response.getBytes("UTF-8"));
System.out.println(" [響應到] " +
String.format("ReplyTo = %s;消息:%s"
,properties.getReplyTo()
,response));
//手動應答
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println(" [響應到] " +
String.format("消息ID,basicAck = %s;"
,envelope.getDeliveryTag()));
}
}
};
//啟動一個消費者,監聽 RPC_QUEUE_NAME 隊列
boolean autoAck=false;//自動應答
channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
//loop to prevent reaching finally block
while(true) {
try {
Thread.sleep(100);
} catch (InterruptedException _ignore) {
System.out.println(" [loop] exception," + _ignore.getMessage());
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
finally {
if (connection != null)
try {
connection.close();
} catch (IOException _ignore) {}
}
}
/**
* 轉換為大寫
* @param
* @return
*/
private static String getMsg(String str) {
return String.format("%s => %s",str,str.toUpperCase());
}
}