天天看點

rabbitmq實作RPC執行個體

最近閱讀《RabbitMq實戰指南》了解了rpc(remote procedure call 遠端過程調用)的實作。下面是測試的例子:

服務端

/**
 * <p>
 * 
 * rpc伺服器,1、開啟隊列,2、消費消息,3、把response發送到回調隊列。
 *
 * </p>
 * @author	hz16092620 
 * @date	2018年9月16日 上午10:08:23
 * @version      
 */
public class RpcServer {
    
    public static void main(String[] args) {
	consumerMessage();
    }
    
    /**
     * 服務端消費消息
     * */
    public static void consumerMessage() {
	Connection conn = RabbitConnection.createConnection();//自定義的擷取連結的方法
	try {
	    String queneName = "rpc_liuhp_quene";
	    final Channel channel = conn.createChannel();
	    channel.queueDeclare(queneName, false, true, false, null);
	   
	    
	    //消費消息,推模式
	    channel.basicQos(10);//最多消費消息個數
	    Consumer consumer = new DefaultConsumer(channel) {
		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
		    StringBuilder sb = new StringBuilder();
		    for (byte b : body) {
			    sb.append((char) b);
		    }
		    System.out.println(sb.toString());
		    BasicProperties props = new BasicProperties().builder().correlationId(properties.getCorrelationId()).build();
		    channel.basicPublish("", properties.getReplyTo(), props, "result".getBytes());//消費消息之後傳回result
		}
	    };
	    channel.basicConsume(queneName, true, consumer);
	    
	} catch (IOException e) {
	    e.printStackTrace();
	} finally {
	    //連接配接不關閉,一直處于打開狀态
	}
    }

}
           

用戶端:

/**
 * <p>
 * 
 * 用戶端發送消息,然後接收消息,
 *
 * </p>
 * @author	hz16092620 
 * @date	2018年9月16日 上午10:27:08
 * @version      
 */
public class RpcClient {

    public static void main(String[] args) {
	createClient();
    }

    /**
     * 交換器發送資料
     * */
    public static void createClient() {
	Connection conn = RabbitConnection.createConnection();
	Channel channel = null;
	try {
	    channel = conn.createChannel();
	    String queneName = channel.queueDeclare().getQueue();

	    final String uuid = UUID.randomUUID().toString();
	    BasicProperties props = new BasicProperties().builder().correlationId(uuid).replyTo(queneName).build();
	    channel.basicPublish("", "rpc_liuhp_quene", props, String.valueOf(new Random().nextInt(100)).getBytes());// 發送消息

	    // 接受傳回的結果
	    // 方式一 queneingConsumer 很多的問題,已經在3.0之後遺棄了。這個暫時不研究
	    // QueueingConsumer consume = new QueueingConsumer(channel);
	    /*
	     * while (true) { QueueingConsumer.Delivery delivery =
	     * consume.nextDelivery(); if
	     * (delivery.getProperties().getCorrelationId().equals(uuid)) {
	     * System.out.println(new String(delivery.getBody())); } break; }
	     */
	    // 方式二 defaultConsumer
	    Consumer consumer = new DefaultConsumer(channel) {

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
		    while (true) {
			if (properties.getCorrelationId().equals(uuid)) {
			    StringBuilder sb = new StringBuilder();
			    for (byte b : body) {
				sb.append((char) b);
			    }
			    System.out.println(sb.toString());
			}
			break;
		    }
		}
	    };
	    channel.basicConsume(queneName, true, consumer);

	    Thread.sleep(30000L);
	} catch (IOException | InterruptedException e) {
	    e.printStackTrace();
	} finally {
	    try {
		channel.close();
	    } catch (IOException e) {
		e.printStackTrace();
	    } catch (TimeoutException e) {
		e.printStackTrace();
	    }
	    try {
		conn.close();
	    } catch (IOException e) {
		e.printStackTrace();
	    }
	}
    }
}