天天看點

RabbitMQ_2、工作隊列

工作隊列(競争消費者模式)

​​官方案例​​

工作隊列_消息發送

/**
 * @PackageName : com.rzk
 * @FileName : Send
 * @Description : 工作隊列-輪詢-消息生産者
 * @Author : rzk
 * @CreateTime : 23/6/2021 上午12:21
 * @Version : 1.0.0
 */
public class Send {

    //定義隊列名稱
    private final static String QUEUE_NAME = "work_rr";

    public static void main(String[] argv) throws Exception {
        //建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("*");
        factory.setUsername("");
        factory.setVirtualHost("/");
        factory.setPassword("");
        factory.setPort(5672);

        try (
                //連接配接工廠建立連接配接
                Connection connection = factory.newConnection();
                //建立信道
                Channel channel = connection.createChannel()) {
            /**
             * 綁定隊列
             * 聲明隊列
             *  第一個參數 queue :隊列名稱
             *  第二個參數 durable :是否持久化
             *  第三個參數 Exclusive :排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明的連接配接可見,并在連接配接斷開時自動删除。
             * 這裡需要注意三點:
             *      1 .排他隊列是基于連接配接可見的,同一連接配接的不同通道是可以同時通路同一個連接配接建立的排他他隊
             *      2 . ”百次“,如果一個連接配接已經聲明了一個排他隊列,其他連接配接是不允許建立同名的;排他隊列這個與昔通隊歹懷同。
             *      3 .即使該隊列是持久化的,一旦連接配接關閉或者用戶端退出,該排他隊列都會被自動删除的。這種隊列适用于隻限于一個容戶端發送讀取消息的應用場景。
             * 第四個參數 Auto-delete :自動删除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動删除。這種隊列适用于臨時隊列
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 20; i++) {
                String message = " Send  "+i;
                //隊列消息的生産者:發送消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'" + i);
            }
        }
    }
}      

工作隊列_消息接收

建立兩個消息消費去接收

1

/**
 * @PackageName : com.rzk.simple.recv
 * @FileName : Recv
 * @Description : 工作隊列-輪詢-消息接收
 * @Author : rzk
 * @CreateTime : 23/6/2021 上午12:22
 * @Version : 1.0.0
 */
public class Recv01 {
    private final static String QUEUE_NAME = "work_rr";

    public static void main(String[] argv) throws Exception {
        //建立工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("*");
        factory.setUsername("yeb");
        factory.setVirtualHost("/yeb");
        factory.setPassword("yeb");
        factory.setPort(5672);
        //連接配接工廠建立連接配接
        Connection connection = factory.newConnection();
        //建立信道
        Channel channel = connection.createChannel();
        //綁定隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            /**
             * 手動确認
             * multiple: 是否确認多條
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false );
        };
        /**
         * 監聽隊列消費消息
         * autoAck:自動應答
         * 當消費者收到該消息,會傳回通知消息隊列 我消費者已經收到消息了
         */
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}      

2

/**
 * @PackageName : com.rzk.simple.recv
 * @FileName : Recv
 * @Description : 工作隊列-輪詢-消息接收
 * @Author : rzk
 * @CreateTime : 23/6/2021 上午12:22
 * @Version : 1.0.0
 */
public class Recv02 {
    private final static String QUEUE_NAME = "work_rr";

    public static void main(String[] argv) throws Exception {
        //建立工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("*");
        factory.setUsername("yeb");
        factory.setVirtualHost("/yeb");
        factory.setPassword("yeb");
        factory.setPort(5672);
        //連接配接工廠建立連接配接
        Connection connection = factory.newConnection();
        //建立信道
        Channel channel = connection.createChannel();
        //綁定隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            //模拟消費耗時
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            /**
             * 手動确認
             * multiple: 是否确認多條或單條資料
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false );
        };
        //監聽隊列消費消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}      

監聽隊列消費消息

channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

在 Boolean autoAck = false的情況下,如果消費者1當機了,消息隊列沒有收到消費者發送回的應答,就會将這個消息發送給下一個消費者處理。直到消費者處理完這個消息,并向消息隊列發送了一個消息應答,告訴消息隊列此時這個消息已經處理完成,消息隊列才會将這個消息從記憶體中删除。

工作隊列的優點:解決簡單隊列 當生産者生産消息大與消費者消費能力時,加多幾個消費者,讓消費者的消費能力大于等于生産者生産能力,這樣就能減少多餘的消息堆積在隊列裡面

RabbitMQ_2、工作隊列

公平模式

//限制消費者每次隻能接受一條,處理完才能接受下一條消息
        channel.basicQos(1);