天天看點

rabbitMQ輪訓分發消息

 抽取工具類

public class untils {
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.231.132");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}      

啟動兩個工作線程

public class work03 {
    public   final static String QUEUE_NAME="hello3";
    public static void main(String[] args) throws Exception {

        System.out.println("c2應答短....");
        Channel channel = untils.getChannel();
        /**
         * 消費者資訊
         * 1.消費哪個隊列
         * 2.消費成功以後是否要自動應答,true自動應答,false手動擋
         * 3.消費者未成功消費的回調内容1
         * 4.消費者取消的回調
         *
         */
        //聲明 接收消息
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
             System.out.println("開始休眠1s...");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //1.消息标記
            // 2.false 代表隻應答接收到的哪個傳遞的資訊,true為應答所有的消息包括傳遞過來的消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            System.out.println("接收到的消息"+new String(delivery.getBody()));
        };
        //取消   消息的回調
        CancelCallback cancelCallback= consumerTag -> {
            System.out.println(consumerTag+"消息消費者中斷");
        };
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);


    }
}      
public class work03 {
    public   final static String QUEUE_NAME="hello4";
    public static void main(String[] args) throws Exception {

        System.out.println("c2應答長....");
        Channel channel = untils.getChannel();
        /**
         * 消費者資訊
         * 1.消費哪個隊列
         * 2.消費成功以後是否要自動應答,true自動應答,false手動擋
         * 3.消費者未成功消費的回調内容1
         * 4.消費者取消的回調
         *
         */
        //聲明 接收消息
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
             System.out.println("開始休眠10s...");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //1.消息标記
            // 2.false 代表隻應答接收到的哪個傳遞的資訊,true為應答所有的消息包括傳遞過來的消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            System.out.println("接收到的消息"+new String(delivery.getBody()));
        };
        //取消   消息的回調
        CancelCallback cancelCallback= consumerTag -> {
            System.out.println(consumerTag+"消息消費者中斷");
        };
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);


    }
}      

生産者

public class produce03 {
    public static  final  String QUEUE_NAME="hello4";
    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = untils.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //從控制太中接受消息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext())
        {
            String message=scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));;
            System.out.println("發送消息完成"+message);
        }

    }
}      

結果 

rabbitMQ輪訓分發消息
rabbitMQ輪訓分發消息
rabbitMQ輪訓分發消息