最近阅读《RabbitMq实战指南》了解了rpc(remote procedure call 远程过程调用)的实现。下面是测试的例子:
服务端
/**
* <p>
*
* rpc服务器,1、开启队列,2、消费消息,3、把response发送到回调队列。
*
* </p>
* @author hz16092620
* @date 2018年9月16日 上午10:08:23
* @version
*/
public class RpcServer {
public static void main(String[] args) {
consumerMessage();
}
/**
* 服务端消费消息
* */
public static void consumerMessage() {
Connection conn = RabbitConnection.createConnection();//自定义的获取链接的方法
try {
String queneName = "rpc_liuhp_quene";
final Channel channel = conn.createChannel();
channel.queueDeclare(queneName, false, true, false, null);
//消费消息,推模式
channel.basicQos(10);//最多消费消息个数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
StringBuilder sb = new StringBuilder();
for (byte b : body) {
sb.append((char) b);
}
System.out.println(sb.toString());
BasicProperties props = new BasicProperties().builder().correlationId(properties.getCorrelationId()).build();
channel.basicPublish("", properties.getReplyTo(), props, "result".getBytes());//消费消息之后返回result
}
};
channel.basicConsume(queneName, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} finally {
//连接不关闭,一直处于打开状态
}
}
}
客户端:
/**
* <p>
*
* 客户端发送消息,然后接收消息,
*
* </p>
* @author hz16092620
* @date 2018年9月16日 上午10:27:08
* @version
*/
public class RpcClient {
public static void main(String[] args) {
createClient();
}
/**
* 交换器发送数据
* */
public static void createClient() {
Connection conn = RabbitConnection.createConnection();
Channel channel = null;
try {
channel = conn.createChannel();
String queneName = channel.queueDeclare().getQueue();
final String uuid = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties().builder().correlationId(uuid).replyTo(queneName).build();
channel.basicPublish("", "rpc_liuhp_quene", props, String.valueOf(new Random().nextInt(100)).getBytes());// 发送消息
// 接受返回的结果
// 方式一 queneingConsumer 很多的问题,已经在3.0之后遗弃了。这个暂时不研究
// QueueingConsumer consume = new QueueingConsumer(channel);
/*
* while (true) { QueueingConsumer.Delivery delivery =
* consume.nextDelivery(); if
* (delivery.getProperties().getCorrelationId().equals(uuid)) {
* System.out.println(new String(delivery.getBody())); } break; }
*/
// 方式二 defaultConsumer
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
while (true) {
if (properties.getCorrelationId().equals(uuid)) {
StringBuilder sb = new StringBuilder();
for (byte b : body) {
sb.append((char) b);
}
System.out.println(sb.toString());
}
break;
}
}
};
channel.basicConsume(queneName, true, consumer);
Thread.sleep(30000L);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}