在上篇介紹了如何簡單的發送一個消息隊列之後,我們本篇來看下RabbitMQ的另外一種模式,
工作隊列。
什麼是工作隊列
我們上篇文章說的是,一個生産者生産了消息被一個消費者消費了,如下圖

上面這種簡單的消息隊列确實可以處理我們的任務,但是當我們隊列中的任務過多,處理每條任務有需要很長的耗時,那麼使用一個消費者處理消息顯然不不夠的,是以我們可以增加消費者,來共享消息隊列中的消息,進行任務處理。
也就是如下圖
雖然上圖我隻花了一個生産者A,那麼同理,能有多個消費者,那也能多個生産者。
代碼
發送消息
public class Send {
public static final String QUEUE_NAME = "test_word_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 擷取連接配接
Connection connection = MQConnectUtil.getConnection();
// 建立通道
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 模拟發送20條消息
for (int i = 0; i < 20; i++) {
String msg = "消息:" + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
Thread.sleep(i * 20);
System.out.println(msg);
}
channel.close();
connection.close();
}
}
消費者A
public class Consumer1 {
public static final String QUEUE_NAME = "test_word_queue";
public static void main(String[] args) throws Exception {
// 擷取連接配接
Connection connection = MQConnectUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
// 隊列聲明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("消費者[A]-内容:" + msg);
Thread.sleep(2 * 1000);
}
};
// 監聽隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費者B
public class Consumer2 {
public static final String QUEUE_NAME = "test_word_queue";
public static void main(String[] args) throws Exception {
// 擷取連接配接
Connection connection = MQConnectUtil.getConnection();
// 建立頻道
Channel channel = connection.createChannel();
// 隊列聲明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("消費者[B]-内容:" + msg);
Thread.sleep(1000);
}
};
// 監聽隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
我們來看下消費者A和消費者B的消費情況
- 消費者B
- 消費者A
有沒有發現什麼問題,我總過模拟發送了20條消息,細心的同學可以發現,消費者A和消費者B消費了同樣多的消息,都消費了10天,但是我在消費者A和消費者B中,什麼sleep不通的時長,按道理說消費者B要比消費者A處理消息的速度塊,處理的消息更多,那麼為什麼會産生這樣的原因?
RabbitMQ工作隊列的預設配置
預設情況下,RabbitMQ會将每個消息依次發送給下一個消費者,每個消費者收到的消息數量其實是一樣的,我們把這種分發消息的方式稱為
輪訓分發模式。
本篇我們就簡單介紹這麼多内容,有心學習的童鞋一定要敲敲代碼,看不一定能看會,隻有自己敲一遍,才能有所了解。
日拱一卒,功不唐捐