目錄
- Work Queues
- 抽取工具類
- 啟動生産者
- 啟動兩個工作線程(消費者)
工作隊列(又稱任務隊列)的主要思想是避免立即執行資源密集型任務,而不得不等待它完成。相反我們安排任務在之後執行。我們把任務封裝為消息并将其發送到隊列。在背景運作的工作程序将彈出任務并最終執行作業。當有多個工作線程時,這些工作線程将一起處理這些任務。
輪訓分發消息
在這個案例中我們會啟動兩個工作線程,一個消息發送線程,我們來看看他們兩個工作線程是如何工作的。
public class RabbitMqUtils {
//得到一個連接配接的 channel
public static Channel getChannel() throws Exception {
//建立一個連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.100");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("my_vhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
發送10條消息
public class Task01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
// 讓消息持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
//從控制台當中接受資訊
for (int i = 0; i < 10; i++) {
String message = "生産者發送消息" + i;
System.out.println(message);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
}
}
worker01 和 worker02
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("C1接收到消息:" + receivedMessage);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "C1消費者取消消費接口回調邏輯");
};
System.out.println("C1 消費者啟動等待消費......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
public class Worker02 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("C2接收到消息:" + receivedMessage);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "C2消費者取消消費接口回調邏輯");
};
System.out.println("C2 消費者啟動等待消費......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
結果展示