天天看點

RabbitMQ:消息确認機制

消息确認機制

生産端消息确認機制

消息确認,是指生産端投遞消息後,如果​

​Broker​

​​收到消息,則會給我們生産端一個應答。生産端接收應答,用來确定這條消息是否正常發送到​

​Broker​

​,這種方式也是消息可靠性投遞的核心保障!

消息确認機制流程圖

RabbitMQ:消息确認機制

如何實作消息确認機制

我們需要實作消息确認機制,首先我們必須要開啟這種機制。

// 開啟消息确認機制
        channel.confirmSelect();      

​confirmSelect()​

​源碼如下:

/**
     * Enables publisher acknowledgements on this channel.
     * @see com.rabbitmq.client.AMQP.Confirm.Select
     * @throws java.io.IOException if an error is encountered
     */
    Confirm.SelectOk confirmSelect() throws IOException;      

​Enables publisher acknowledgements on this channel.​

​​從注釋中也可以知道,通過​

​Channel​

​調用這個方法會開啟生産端消息确認機制。

現在我們開啟了生産端消息确認機制,但當​

​Broker​

​收到消息,給我們生産端一個應答,我們需要有處理這個應答的方法(異步回調方法處理消息發送成功或者失敗,這很像部落客之前用AIO模型實作的簡易多人聊天室,也是異步回調),是以需要一個處理應答的回調函數,根據具體的結果對消息進行重新發送、或記錄日志等後續處理,如下:

// 添加消息确認監聽(異步回調)
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("------------ack---------------");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("------------no ack---------------");
            }
        });      

現在我們實作了消息确認機制,生産端投遞消息,​

​Broker​

​​收到消息,給生産端一個應答,如果消息成功收到,回調函數會輸出​

​ack​

​​,否則輸出​

​no ack​

​。

生産端

我們這裡使用的是預設交換機,它的路由規則可以看看下面這篇部落格。

​RabbitMQ:交換機(default exchange)​​

package com.kaven.rabbitmq.api.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 開啟消息确認機制
        channel.confirmSelect();

        // 5 發送消息
        for (int i = 0; i < 10; i++) {
            String msg = "消息确認模式:RabbitMQ send confirm message "+i;
            channel.basicPublish(exchange , routingKey , null , msg.getBytes());
        }

        // 6 添加消息确認監聽(異步回調)
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("------------ack---------------");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("------------no ack---------------");
            }
        });
    }
}      

消費端

package com.kaven.rabbitmq.api.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // 隊列名
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 建立Queue
        channel.queueDeclare(queueName , true , false , false , null);

        // 5 建立消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName , true , consumer);

        // 6 接收消息
        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
}      

測試

運作生産端會發現每次運作結果都可能不一樣,會有多種情況出現,因為​

​Broker​

​會進行優化,有時會批量一次性Confirm ,有時會分開幾條Confirm。

五條。

------------ack---------------
------------ack---------------
------------ack---------------
------------ack---------------
------------ack---------------      

三條。

------------ack---------------
------------ack---------------
------------ack---------------      

還有其他情況就不列舉了。

我們來看看​

​RabbitMQ Management​

​​,有​

​40​

​條消息準備好了,因為我啟動了四次生産端來進行測試。

RabbitMQ:消息确認機制

現在,我們啟動消費端,消費端也成功收到了這​

​40​

​條消息。

消息确認模式:RabbitMQ send confirm message 0
消息确認模式:RabbitMQ send confirm message 1
消息确認模式:RabbitMQ send confirm message 2
消息确認模式:RabbitMQ send confirm message 3
消息确認模式:RabbitMQ send confirm message 4
消息确認模式:RabbitMQ send confirm message 5
消息确認模式:RabbitMQ send confirm message 6
消息确認模式:RabbitMQ send confirm message 7
消息确認模式:RabbitMQ send confirm message 8
消息确認模式:RabbitMQ send confirm message 9
消息确認模式:RabbitMQ send confirm message 0
消息确認模式:RabbitMQ send confirm message 1
消息确認模式:RabbitMQ send confirm message 2
消息确認模式:RabbitMQ send confirm message 3
消息确認模式:RabbitMQ send confirm message 4
消息确認模式:RabbitMQ send confirm message 5
消息确認模式:RabbitMQ send confirm message 6
消息确認模式:RabbitMQ send confirm message 7
消息确認模式:RabbitMQ send confirm message 8
消息确認模式:RabbitMQ send confirm message 9
消息确認模式:RabbitMQ send confirm message 0
消息确認模式:RabbitMQ send confirm message 1
消息确認模式:RabbitMQ send confirm message 2
消息确認模式:RabbitMQ send confirm message 3
消息确認模式:RabbitMQ send confirm message 4
消息确認模式:RabbitMQ send confirm message 5
消息确認模式:RabbitMQ send confirm message 6
消息确認模式:RabbitMQ send confirm message 7
消息确認模式:RabbitMQ send confirm message 8
消息确認模式:RabbitMQ send confirm message 9
消息确認模式:RabbitMQ send confirm message 0
消息确認模式:RabbitMQ send confirm message 1
消息确認模式:RabbitMQ send confirm message 2
消息确認模式:RabbitMQ send confirm message 3
消息确認模式:RabbitMQ send confirm message 4
消息确認模式:RabbitMQ send confirm message 5
消息确認模式:RabbitMQ send confirm message 6
消息确認模式:RabbitMQ send confirm message 7
消息确認模式:RabbitMQ send confirm message 8
消息确認模式:RabbitMQ send confirm message 9