天天看點

rabbitmq 多個消費者消費一個隊列_RabbitMQ如何高效的消費消息-工作隊列

在上篇介紹了如何簡單的發送一個消息隊列之後,我們本篇來看下RabbitMQ的另外一種模式,

工作隊列

什麼是工作隊列

我們上篇文章說的是,一個生産者生産了消息被一個消費者消費了,如下圖

rabbitmq 多個消費者消費一個隊列_RabbitMQ如何高效的消費消息-工作隊列

上面這種簡單的消息隊列确實可以處理我們的任務,但是當我們隊列中的任務過多,處理每條任務有需要很長的耗時,那麼使用一個消費者處理消息顯然不不夠的,是以我們可以增加消費者,來共享消息隊列中的消息,進行任務處理。

也就是如下圖

rabbitmq 多個消費者消費一個隊列_RabbitMQ如何高效的消費消息-工作隊列

雖然上圖我隻花了一個生産者A,那麼同理,能有多個消費者,那也能多個生産者。

代碼

發送消息

public class Send {

    public static final String QUEUE_NAME = "test_word_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        // 擷取連接配接
        Connection connection = MQConnectUtil.getConnection();

        // 建立通道
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 模拟發送20條消息
        for (int i = 0; i < 20; i++) {

            String msg = "消息:" + i;

            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            Thread.sleep(i * 20);

            System.out.println(msg);
        }

        channel.close();
        connection.close();
    }
}
           

消費者A

public class Consumer1 {

    public static final String QUEUE_NAME = "test_word_queue";

    public static void main(String[] args) throws Exception {

        // 擷取連接配接
        Connection connection = MQConnectUtil.getConnection();

        // 建立頻道
        Channel channel = connection.createChannel();

        // 隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String msg = new String(body, StandardCharsets.UTF_8);

                System.out.println("消費者[A]-内容:" + msg);

                Thread.sleep(2 * 1000);
            }
        };

        // 監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
           

消費者B

public class Consumer2 {

    public static final String QUEUE_NAME = "test_word_queue";

    public static void main(String[] args) throws Exception {

        // 擷取連接配接
        Connection connection = MQConnectUtil.getConnection();

        // 建立頻道
        Channel channel = connection.createChannel();

        // 隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String msg = new String(body, StandardCharsets.UTF_8);

                System.out.println("消費者[B]-内容:" + msg);

                Thread.sleep(1000);
            }
        };

        // 監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
           

我們來看下消費者A和消費者B的消費情況

  • 消費者B
rabbitmq 多個消費者消費一個隊列_RabbitMQ如何高效的消費消息-工作隊列
  • 消費者A
rabbitmq 多個消費者消費一個隊列_RabbitMQ如何高效的消費消息-工作隊列

有沒有發現什麼問題,我總過模拟發送了20條消息,細心的同學可以發現,消費者A和消費者B消費了同樣多的消息,都消費了10天,但是我在消費者A和消費者B中,什麼sleep不通的時長,按道理說消費者B要比消費者A處理消息的速度塊,處理的消息更多,那麼為什麼會産生這樣的原因?

RabbitMQ工作隊列的預設配置

預設情況下,RabbitMQ會将每個消息依次發送給下一個消費者,每個消費者收到的消息數量其實是一樣的,我們把這種分發消息的方式稱為

輪訓分發模式

本篇我們就簡單介紹這麼多内容,有心學習的童鞋一定要敲敲代碼,看不一定能看會,隻有自己敲一遍,才能有所了解。

日拱一卒,功不唐捐