天天看点

rabbitmq实现RPC实例

最近阅读《RabbitMq实战指南》了解了rpc(remote procedure call 远程过程调用)的实现。下面是测试的例子:

服务端

/**
 * <p>
 * 
 * rpc服务器,1、开启队列,2、消费消息,3、把response发送到回调队列。
 *
 * </p>
 * @author	hz16092620 
 * @date	2018年9月16日 上午10:08:23
 * @version      
 */
public class RpcServer {
    
    public static void main(String[] args) {
	consumerMessage();
    }
    
    /**
     * 服务端消费消息
     * */
    public static void consumerMessage() {
	Connection conn = RabbitConnection.createConnection();//自定义的获取链接的方法
	try {
	    String queneName = "rpc_liuhp_quene";
	    final Channel channel = conn.createChannel();
	    channel.queueDeclare(queneName, false, true, false, null);
	   
	    
	    //消费消息,推模式
	    channel.basicQos(10);//最多消费消息个数
	    Consumer consumer = new DefaultConsumer(channel) {
		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
		    StringBuilder sb = new StringBuilder();
		    for (byte b : body) {
			    sb.append((char) b);
		    }
		    System.out.println(sb.toString());
		    BasicProperties props = new BasicProperties().builder().correlationId(properties.getCorrelationId()).build();
		    channel.basicPublish("", properties.getReplyTo(), props, "result".getBytes());//消费消息之后返回result
		}
	    };
	    channel.basicConsume(queneName, true, consumer);
	    
	} catch (IOException e) {
	    e.printStackTrace();
	} finally {
	    //连接不关闭,一直处于打开状态
	}
    }

}
           

客户端:

/**
 * <p>
 * 
 * 客户端发送消息,然后接收消息,
 *
 * </p>
 * @author	hz16092620 
 * @date	2018年9月16日 上午10:27:08
 * @version      
 */
public class RpcClient {

    public static void main(String[] args) {
	createClient();
    }

    /**
     * 交换器发送数据
     * */
    public static void createClient() {
	Connection conn = RabbitConnection.createConnection();
	Channel channel = null;
	try {
	    channel = conn.createChannel();
	    String queneName = channel.queueDeclare().getQueue();

	    final String uuid = UUID.randomUUID().toString();
	    BasicProperties props = new BasicProperties().builder().correlationId(uuid).replyTo(queneName).build();
	    channel.basicPublish("", "rpc_liuhp_quene", props, String.valueOf(new Random().nextInt(100)).getBytes());// 发送消息

	    // 接受返回的结果
	    // 方式一 queneingConsumer 很多的问题,已经在3.0之后遗弃了。这个暂时不研究
	    // QueueingConsumer consume = new QueueingConsumer(channel);
	    /*
	     * while (true) { QueueingConsumer.Delivery delivery =
	     * consume.nextDelivery(); if
	     * (delivery.getProperties().getCorrelationId().equals(uuid)) {
	     * System.out.println(new String(delivery.getBody())); } break; }
	     */
	    // 方式二 defaultConsumer
	    Consumer consumer = new DefaultConsumer(channel) {

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
		    while (true) {
			if (properties.getCorrelationId().equals(uuid)) {
			    StringBuilder sb = new StringBuilder();
			    for (byte b : body) {
				sb.append((char) b);
			    }
			    System.out.println(sb.toString());
			}
			break;
		    }
		}
	    };
	    channel.basicConsume(queneName, true, consumer);

	    Thread.sleep(30000L);
	} catch (IOException | InterruptedException e) {
	    e.printStackTrace();
	} finally {
	    try {
		channel.close();
	    } catch (IOException e) {
		e.printStackTrace();
	    } catch (TimeoutException e) {
		e.printStackTrace();
	    }
	    try {
		conn.close();
	    } catch (IOException e) {
		e.printStackTrace();
	    }
	}
    }
}