天天看點

RabbitMq 消費者端 消息确認機制(ACK)

最近在看springboot 整合消息中間件的部分,

之前對于消息中間價的消息回調機制不是太清楚,這次做了一次測試

下面是測試代碼

@RabbitListener(queues = {"queue_1"})
    public void msgListener(Message message, Channel channel) throws IOException {
        //System.out.println("message:" + message);

        try {
            //int i = 1/0;  //測試接收消息抛出異常情況

            //消息手動确認
            //如果配置檔案中配置了acknowledge-mode: none、auto,在這裡消費時會報錯,因為隊列中的消息已經删除了(Channel shutdown: channel error, unknown delivery tag )
            //如果配置檔案中配置了acknowledge-mode: manual,正常消費
            //basicAck:消費者回調broker已經收到消息;參數:long deliveryTag, boolean multiple
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

            System.out.println("消費消息确認:" + message.getMessageProperties().getConsumerQueue() + ",DeliveryTag:" + message.getMessageProperties().getDeliveryTag() + ",接收到了回調方法");
        } catch (Exception e) {

            System.out.println("嘗試重發:" + message.getMessageProperties().getConsumerQueue() + ":" + message.getMessageProperties().getDeliveryTag());

            //若acknowledge-mode= manual, requeue=true,會一直重試,DeliveryTag會持續+1,配置檔案的重試配置不起作用,
            //若acknowledge-mode= auto, requeue=true,會一直重試,DeliveryTag不變,配置檔案的重試配置不起作用,
            //若acknowledge-mode= none, requeue=true,消息被自動确認
            //若requeue=false,隊列中消息會被丢棄
            //basicNack:消費者回調broker已經接收消息,但沒确認;參數:long deliveryTag, boolean multiple, boolean requeue
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            
            //與basicNack 相同
            //basicReject:消費者拒絕消息,不确認;參數:long deliveryTag, boolean requeue
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

        }

    }
           
  • acknowledge-mode:none, auto, manual

    none: 自動确認;auto:根據情況;manual:手動确認

  • deliveryTag:消息的唯一表示(在同一個隊列内)
  • multiple:批量 (手動模式下)
  • requeue:重回隊列(requeue =true 重回隊列,false 丢棄),建議requeue 設定為false,

    消息處理發生異常,除非網絡方面的原因,否則處理多次也是失敗;根據自己的業務處理失敗消息,比如存儲到資料庫或緩存中,或者報警;網絡方面的異常建議開啟重試機制

  • springboot中預設重試機制是關閉的,以下添加重試機制的元件
@Configuration
public class MyRabbitmqConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory =
                new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setRetryTemplate(new RetryTemplate());
        return factory;
    }
}
           
  • 配置檔案(.yml)
spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  // 開啟手動确認
        retry:
          enabled: true  // 開啟重試
          max-attempts: 3  //最大重試次數
          initial-interval: 2000ms  //重試間隔時間