天天看點

四、RabbitMq的工作模式( Work Queues)

目錄

  • Work Queues
    • 抽取工具類
    • 啟動生産者
    • 啟動兩個工作線程(消費者)

工作隊列(又稱任務隊列)的主要思想是避免立即執行資源密集型任務,而不得不等待它完成。相反我們安排任務在之後執行。我們把任務封裝為消息并将其發送到隊列。在背景運作的工作程序将彈出任務并最終執行作業。當有多個工作線程時,這些工作線程将一起處理這些任務。

輪訓分發消息

在這個案例中我們會啟動兩個工作線程,一個消息發送線程,我們來看看他們兩個工作線程是如何工作的。

public class RabbitMqUtils {
    //得到一個連接配接的 channel
    public static Channel getChannel() throws Exception {
        //建立一個連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.100");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("my_vhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
           

發送10條消息

public class Task01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {
            // 讓消息持久化
            boolean durable = true;
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
            //從控制台當中接受資訊
            for (int i = 0; i < 10; i++) {
                String message = "生産者發送消息" + i;
                System.out.println(message);
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            }
        }
    }
}
           

worker01 和 worker02

public class Worker01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("C1接收到消息:" + receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "C1消費者取消消費接口回調邏輯");
        };
        System.out.println("C1 消費者啟動等待消費......");

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
           
public class Worker02 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("C2接收到消息:" + receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "C2消費者取消消費接口回調邏輯");
        };
        System.out.println("C2 消費者啟動等待消費......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
           

結果展示