天天看點

java-rabbitmq-官網執行個體06

java-rabbitmq-官網執行個體06

描述:

    使用兩個互通的隊列,模拟  RPC 調用

運作:

    D6_RPCClient.main();

    D6_RPCServer.main();

package com.example.tutorials;


import com.rabbitmq.client.*;


import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;


/**
 * 使用兩個隊列,互相接收對方的消息。使用 AMQP.BasicProperties 傳輸唯一"互聯ID",實作RPC請求
 * rpc 調用,轉換為大寫
 * @create 2017-08-30
 * amqp-client 4.2.0
 **/
public class D6_RPCClient {


    public static void main(String[] argv) {
        D6_RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new D6_RPCClient();
            //生成一個,互聯ID
            final String corrId = UUID.randomUUID().toString();


            //發送消息
            System.out.println("輸入要發送的内容,退出輸入 x ");
            String message ;
            while(true){
                Scanner scanner = new Scanner(System.in);
                message = scanner.next();
                if("x".equals(message))
                    break;


                System.out.println(" [請求] 内容 = "+message);
                response = fibonacciRpc.call(message,corrId);
                System.out.println(" [響應] 内容 = " + response );
            }
        }
        catch  (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            if (fibonacciRpc!= null) {
                try {
                    fibonacciRpc.close();
                }
                catch (IOException _ignore) {}
            }
        }
    }


    private Connection connection;
    private Channel channel;
    /**
     * 由 client 發送消息到 "rpc_queue" 隊列,server 消費消息,之後 server 将響應寫入"應答隊列"
     */
    private String requestQueueName = "rpc_queue";
    /**
     * 應答隊列
     */
    private String replyQueueName;


    /**
     * 阻塞隊列
     */
    private final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);


    /**
     * rpc 用戶端執行個體
     * @throws IOException
     * @throws TimeoutException
     */
    public D6_RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //設定登入賬号
        factory.setHost(ServerInfo.host);
        factory.setPort(ServerInfo.port);
        factory.setUsername(ServerInfo.uName);
        factory.setPassword(ServerInfo.uPwd);
        //連結伺服器
        connection = factory.newConnection();
        channel = connection.createChannel();
        //聲明一個私有排他、自動删除的隊列
        replyQueueName = channel.queueDeclare().getQueue();
        System.out.println(" [x]應答隊列名:"+replyQueueName);
    }


    /**
     * 調用
     * @param message 消息内容
     * @param corrId  關聯id
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public String call(String message,final String corrId) throws IOException, InterruptedException {
        //消息的屬性,路由報頭等等
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId) //互聯ID
                .replyTo(replyQueueName) //應答隊列
                .build();


        //釋出送消息
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));


        //消費者,接收"應答隊列"消息
        boolean autoAck=true;//自動應答
        channel.basicConsume(replyQueueName, autoAck, new DefaultConsumer(channel) {
            @Override
            public void handleConsumeOk(String consumerTag) {
                super.handleConsumeOk(consumerTag);
                System.out.println(" [接收應答消息] 已經注冊成功! ");
            }
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(String.format(
                        " [響應隊列] 請求corrId = %s; 響應corrId = %s; replyTo = %s;"
                        ,corrId
                        ,properties.getCorrelationId()
                        ,properties.getReplyTo()));
                if (properties.getCorrelationId().equals(corrId)) {
                    //将元素e插入到隊列末尾,如果插入成功,則傳回true;如果插入失敗(即隊列已滿),則傳回false;
                    String str=new String(body, "UTF-8");
                    boolean wSuccess = response.offer(str);
                    System.out.println(
                            " [接收應答消息] 已收到響應,并寫入阻塞隊列("+wSuccess+",size="+response.size()+");響應内容="+str);
                }
            }
        });


        //使用阻塞隊列,在 handleDelivery 沒有接收到回調消息時一直處于阻塞狀态
        String result=response.take();
        System.out.println(" [接收應答消息] 阻塞隊列已經擷取到内容! ");
        return result;
    }


    public void close() throws IOException {
        connection.close();
    }
}      
package com.example.tutorials;


import com.rabbitmq.client.*;


import java.io.IOException;
import java.util.concurrent.TimeoutException;


/**
 * 使用兩個隊列,互相接收對方的消息。使用 AMQP.BasicProperties 傳輸唯一"互聯ID",實作RPC請求
 * rpc 調用,轉換為大寫
 * @create 2017-08-30
 * amqp-client 4.2.0
 **/
public class D6_RPCServer {


    /**
     * 由 client 發送消息到 "rpc_queue" 隊列,server 消費消息,之後 server 将響應寫入"應答隊列"
     */
    private static final String RPC_QUEUE_NAME = "rpc_queue";


    public static void main(String[] argv) throws TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //設定登入賬号
        factory.setHost(ServerInfo.host);
        factory.setPort(ServerInfo.port);
        factory.setUsername(ServerInfo.uName);
        factory.setPassword(ServerInfo.uPwd);


        Connection connection = null;
        try {
            //連結伺服器
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();


            //定義一個隊列
            boolean duiable=false;//持久化
            boolean exclusive = false;//排他隊列
            boolean autoDelete=false;//沒有consumer時,隊列是否自動删除
            channel.queueDeclare(RPC_QUEUE_NAME, duiable, exclusive, autoDelete, null);
            //輪詢分發消息
            channel.basicQos(1);


            System.out.println(" [x] Awaiting RPC requests");


            //建立消費者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleConsumeOk(String consumerTag) {
                    super.handleConsumeOk(consumerTag);
                    System.out.println(" [x] 已經注冊成功! ");
                }
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    final String corrId =properties.getCorrelationId();
                    //消息的屬性,路由報頭等等
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(corrId)//互聯ID
                            .build();
                    System.out.println("=====================");
                    System.out.println(String.format(" [響應隊列] corrId = %s;replyTo = %s;"
                        ,corrId
                        ,replyProps.getReplyTo()));


                    //響應内容
                    String response = "";


                    try {
                        //接收到的消息
                        String message = new String(body,"UTF-8");
                        //int n = Integer.parseInt(message);


                        System.out.println(" [接收到] 内容 = " + message );
                        response += getMsg(message);//計算斐波那契數列
                        System.out.println(" [接收到] 計算結果 = "+response);
                    }
                    catch (RuntimeException e){
                        System.out.println(" [異常] " + e.toString());
                    }
                    finally {


                        //發送消息到應答隊列
                        channel.basicPublish( "" //交換器
                                , properties.getReplyTo() //路由鍵
                                , replyProps //消息屬性
                                , response.getBytes("UTF-8"));


                        System.out.println(" [響應到] " +
                                String.format("ReplyTo = %s;消息:%s"
                                        ,properties.getReplyTo()
                                        ,response));
                        //手動應答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        System.out.println(" [響應到] " +
                                String.format("消息ID,basicAck = %s;"
                                        ,envelope.getDeliveryTag()));
                    }
                }
            };


            //啟動一個消費者,監聽 RPC_QUEUE_NAME 隊列
            boolean autoAck=false;//自動應答
            channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);


            //loop to prevent reaching finally block
            while(true) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException _ignore) {
                    System.out.println(" [loop] exception," + _ignore.getMessage());
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
        finally {
            if (connection != null)
                try {
                    connection.close();
                } catch (IOException _ignore) {}
        }
    }
    /**
     * 轉換為大寫
     * @param
     * @return
     */
    private static String getMsg(String str) {
        return String.format("%s => %s",str,str.toUpperCase());
    }


}