天天看點

RabbitMQ 實戰(四)消費者 ack 以及 生産者 confirms

這篇文章主要講 RabbitMQ 中 消費者 ack 以及 生産者 confirms。

RabbitMQ 實戰(四)消費者 ack 以及 生産者 confirms

如上圖,生産者把消息發送到 RabbitMQ,然後 RabbitMQ 再把消息投遞到消費者。

生産者和 RabbitMQ,以及 RabbitMQ 和消費者都是通過 TCP 連接配接,但是他們之間是通過信道(Channel)傳遞資料的。多個線程共享一個連接配接,但是每個線程擁有獨自的信道。

消費者 ack

  • 問題:怎麼保證 RabbitMQ 投遞的消息被成功投遞到了消費者?

    RabbitMQ 投遞的消息,剛投遞一半,産生了網絡抖動,就有可能到不了消費者。

  • 解決辦法:

    RabbitMQ 對消費者說:“如果你成功接收到了消息,給我說确認收到了,不然我就當你沒有收到,我還會重新投遞”

在 RabbitMQ 中,有兩種 acknowledgement 模式。

自動 acknowledgement 模式

這也稱作發後即忘模式。

在這種模式下,RabbitMQ 投遞了消息,在投遞成功之前,如果消費者的 TCP 連接配接 或者 channel 關閉了,這條消息就會丢失。

會有丢失消息問題。

手動 acknowledgement 模式

在這種模式下,RabbitMQ 投遞了消息,在投遞成功之前,如果消費者的 TCP 連接配接 或者 channel 關閉了,導緻這條消息沒有被 acked,RabbitMQ 會自動把目前消息重新入隊,再次投遞。

會有重複投遞消息的問題,是以消費者得準備好處理重複消息的問題,就是所謂的:幂等性。

為了啟用 手動 ack 模式,消費者需要實作

ChannelAwareMessageListener

接口。

@Component
public class Consumer implements ChannelAwareMessageListener {

    @Autowired
    private MessageConverter messageConverter;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();

        // 代表投遞的辨別符,唯一辨別了目前信道上的投遞,通過 deliveryTag ,消費者就可以告訴 RabbitMQ 确認收到了目前消息,見下面的方法
        long deliveryTag = messageProperties.getDeliveryTag();

        // 如果是重複投遞的消息,redelivered 為 true
        Boolean redelivered = messageProperties.getRedelivered();

        // 擷取生産者發送的原始消息
        Object originalMessage = messageConverter.fromMessage(message);

        Console.log("consume message = {} , deliveryTag = {} , redelivered = {}"
                , originalMessage, deliveryTag, redelivered);

        // 代表消費者确認收到目前消息,第二個參數表示一次是否 ack 多條消息
        channel.basicAck(deliveryTag, false);

        // 代表消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把目前消息重新入隊
//        channel.basicNack(deliveryTag, false, false);

        // 代表消費者拒絕目前消息,第二個參數表示是否把目前消息重新入隊
//        channel.basicReject(deliveryTag,false);

    }
}
           
  • channel.basicAck

    代表消費者确認收到目前消息,語義上表示消費者成功處理了目前消息。

  • channel.basicNack

    代表消費者拒絕一條或者多條消息。basicNack 算是 basicReject 的一個擴充,因為 basicReject 不能一次拒絕多條消息。

  • channel.basicReject

    代表消費者拒絕這條消息,語義上表示消費者沒有處理目前消息。

    對于 basicNack 和 basicReject ,如果參數

    boolean requeue

    傳入

    false

    ,消息還是會從隊列裡面删除。這三個方法隻是語義上的不同。
  • deliveryTag

    deliveryTag 是 64 bit long 值,從 1 開始,不停的遞增 1。不同的 channel 有獨立的 deliveryTag。比如有兩個消費者,你會發現,都是從 1 開始遞增,互不影響。

由于上面建立的消費者,沒有指明監聽那個隊列,是以還需要建立一個

MessageListenerContainer

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, ChannelAwareMessageListener listener) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);

    // 指定消費者
    container.setMessageListener(listener);
    // 指定監聽的隊列
    container.setQueueNames(QUEUE_NAME);

    // 設定消費者的 ack 模式為手動确認模式
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    container.setPrefetchCount();

    return container;
}
           

這樣就開啟了消費者手動 ack 模式。

注意

如果開啟了消費者手動 ack 模式,但是又沒有調用手動确認方法(比如:channel.basicAck),那問題就大了,RabbitMQ 會在目前 channel 上一直阻塞,等待消費者 ack。

生産者 confirms

  • 問題:怎麼保證生産者發送的消息被 RabbitMQ 成功接收?

    生産者發送的消息,剛發送一半,産生了網絡抖動,就有可能到不了 RabbitMQ。

  • 解決辦法:

    生産者對 RabbitMQ 說:“如果你成功接收到了消息,給我說确認收到了,不然我就當你沒有收到”

自定義消息中繼資料

/**
 * 自定義消息中繼資料
 */
@NoArgsConstructor
@Data
public class RabbitMetaMessage implements Serializable{

    /**
     * 是否是 returnCallback
     */
    private boolean returnCallback;

    /**
     * 承載原始消息資料資料
     */
    private Object payload;

    public RabbitMetaMessage(Object payload) {
        this.payload = payload;
    }
}
           
  • returnCallback 标記目前消息是否觸發了 returnCallback(後面會解釋)
  • payload 儲存原始消息資料

生産者

先把消息存儲到 redis,再發送到 rabbitmq

@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private DefaultKeyGenerator keyGenerator;

    @GetMapping("/sendMessage")
    public Object sendMessage() {
        new Thread(() -> {
            HashOperations hashOperations = redisTemplate.opsForHash();
            for (int i = ; i < ; i++) {
                String id = keyGenerator.generateKey() + "";
                String value = "message " + i;
                RabbitMetaMessage rabbitMetaMessage = new RabbitMetaMessage(value);

                // 先把消息存儲到 redis
                hashOperations.put(RedisConfig.RETRY_KEY, id, rabbitMetaMessage);

                Console.log("send message = {}", value);

                // 再發送到 rabbitmq
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, value, (message) -> {
                    message.getMessageProperties().setMessageId(id);
                    return message;
                }, new CorrelationData(id));
            }
        }).start();
        return "ok";
    }

}
           

配置 ConnectionFactory

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.238.132", );
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");

    // 設定 生産者 confirms
    connectionFactory.setPublisherConfirms(true);

    // 設定 生産者 Returns
    connectionFactory.setPublisherReturns(true);

    return connectionFactory;
}
           

配置 RabbitTemplate

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

    // 必須設定為 true,不然當 發送到交換器成功,但是沒有比對的隊列,不會觸發 ReturnCallback 回調
    // 而且 ReturnCallback 比 ConfirmCallback 先回調,意思就是 ReturnCallback 執行完了才會執行 ConfirmCallback
    rabbitTemplate.setMandatory(true);

    // 設定 ConfirmCallback 回調
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        Console.log("ConfirmCallback , correlationData = {} , ack = {} , cause = {} ", correlationData, ack, cause);
        // 如果發送到交換器都沒有成功(比如說删除了交換器),ack 傳回值為 false
        // 如果發送到交換器成功,但是沒有比對的隊列(比如說取消了綁定),ack 傳回值為還是 true (這是一個坑,需要注意)
        if (ack) {
            String messageId = correlationData.getId();
            RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);
            Console.log("rabbitMetaMessage = {}", rabbitMetaMessage);
            if (!rabbitMetaMessage.isReturnCallback()) {

                // 到這一步才能完全保證消息成功發送到了 rabbitmq
                // 删除 redis 裡面的消息
                redisTemplate.opsForHash().delete(RedisConfig.RETRY_KEY, messageId);
            }
        }

    });

    // 設定 ReturnCallback 回調
    // 如果發送到交換器成功,但是沒有比對的隊列,就會觸發這個回調
    rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                                      exchange, routingKey) -> {
        Console.log("ReturnCallback unroutable messages, message = {} , replyCode = {} , replyText = {} , exchange = {} , routingKey = {} ", message, replyCode, replyText, exchange, routingKey);

        // 從 redis 取出消息,設定 returnCallback 設定為 true
        String messageId = message.getMessageProperties().getMessageId();
        RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);
        rabbitMetaMessage.setReturnCallback(true);
        redisTemplate.opsForHash().put(RedisConfig.RETRY_KEY, messageId, rabbitMetaMessage);
    });
    return rabbitTemplate;
}
           

ReturnCallback 回調

必須

rabbitTemplate.setMandatory(true)

,不然當 發送到交換器成功,但是沒有比對的隊列,不會觸發 ReturnCallback 回調。而且 ReturnCallback 比 ConfirmCallback 先回調。

如何模拟 發送到交換器成功,但是沒有比對的隊列,先把項目啟動,然後再把隊列解綁,再發送消息,就會觸發 ReturnCallback 回調,而且發現消息也丢失了,沒有到任何隊列。

RabbitMQ 實戰(四)消費者 ack 以及 生産者 confirms
RabbitMQ 實戰(四)消費者 ack 以及 生産者 confirms
RabbitMQ 實戰(四)消費者 ack 以及 生産者 confirms

這樣就解綁了。

運作項目,然後打開浏覽器,輸入

http://localhost:9999/sendMessage

控制台打出如下日志

RabbitMQ 實戰(四)消費者 ack 以及 生産者 confirms

這樣就觸發了 ReturnCallback 回調 ,從 redis 取出消息,設定 returnCallback 設定為 true。你會發現 ConfirmCallback 的 ack 傳回值還是 true。

ConfirmCallback 回調

這裡有個需要注意的地方,如果發送到交換器成功,但是沒有比對的隊列(比如說取消了綁定),ack 傳回值為還是 true (這是一個坑,需要注意,就像上面那種情況!!!)。是以不能單靠這個來判斷消息真的發送成功了。這個時候會先觸發 ReturnCallback 回調,我們把 returnCallback 設定為 true,是以還得判斷 returnCallback 是否為 true,如果為 ture,表示消息發送不成功,false 才能完全保證消息成功發送到了 rabbitmq。

如何模拟 ack 傳回值為 false,先把項目啟動,然後再把交換器删除,就會發現 ConfirmCallback 的 ack 為 false。

RabbitMQ 實戰(四)消費者 ack 以及 生産者 confirms
RabbitMQ 實戰(四)消費者 ack 以及 生産者 confirms

運作項目,然後打開浏覽器,輸入

http://localhost:9999/sendMessage

控制台打出如下日志

RabbitMQ 實戰(四)消費者 ack 以及 生産者 confirms

你會發現 ConfirmCallback 的 ack 傳回值才是 false。

注意

不能單單依靠 ConfirmCallback 的 ack 傳回值為 true,就斷定目前消息發送成功了。

源碼位址

  • GitHub

參考資料

Consumer Acknowledgements and Publisher Confirms

結語

由于本人知識和能力有限,文中如有沒說清楚的地方,希望大家能在評論區指出,以幫助我将博文寫得更好。