天天看點

RabbitMQ的死信隊列

RabbitMQ的死信隊列

1.業務背景

如果有有錯誤消息,如果手動nack同時将消息放回到隊列中,那麼這條消息會反複消費,留在隊列中 。

如果nack後将消息丢棄,那麼如果碰到網絡抖動,消息也會丢失 。是以 通過建立死信隊列避免消息丢失。

2.實作

檔案目錄如下:

RabbitMQ的死信隊列

1.原理

我們額外建立一條隊列。當消息進入進入業務隊列後,如果收到nack那麼就将這條消息放入這條條隊列中 。

2.修改pom檔案

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>      

3.修改配置檔案

server:
  port: 8088
spring:
  rabbitmq:
    host: 192.168.*.*
    port: 5672
    username: root
    password: root
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  #手動應答
        prefetch: 1 # 每次隻處理一個資訊
    publisher-confirms: true #開啟消息确認機制
    publisher-returns: true #支援消息發送失敗傳回隊列      

4.rabbitmq的配置

@Configuration
public class RabbitMqConfig {

    /**
     * 連接配接工廠
     */
    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * 定制化amqp模版
     *
     * ConfirmCallback接口用于實作消息發送到RabbitMQ交換器後接收ack回調 即消息發送到exchange ack
     * ReturnCallback接口用于實作消息發送到RabbitMQ 交換器,但無相應隊列與交換器綁定時的回調 即消息發送不到任何一個隊列中 ack
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 消息發送失敗傳回到隊列中, yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        // 發送消息确認, yml需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
        // 消息傳回, yml需要配置 publisher-returns: true
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId().toString();
            logger.debug("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {}  路由鍵: {}", correlationId, replyCode, replyText, exchange,
                routingKey);
        });
        return rabbitTemplate;
    }

    /**
     * 确認發送消息是否成功(調用util方法)
     *
     * @return
     */
    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack() {
        return new MsgSendConfirmCallBack();
    }
}      

5.util類

發送是否成功的回調方法。

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    /**
     * 回調方法
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("MsgSendConfirmCallBack  , 回調id:" + correlationData);
        if (ack) {
            System.out.println("消息發送成功");
        } else {
            //可以将消息寫入本地,使用定時任務重新發送
            System.out.println("消息發送失敗:" + cause + "\n重新發送");
        }
    }

}      
這裡有一個點,如果想做實作消息失敗重新發送,在注釋處可以實作。需要将消息寫入本地,如果失敗從本地讀取,然後發送,如果成功删除本地資訊。

6.業務隊列(如:訂單業務)

這裡聲明了一個業務隊列 ,關鍵點在于x-dead-letter-exchange,x-dead-letter-routing-key 兩個參數。

@Configuration
public class BusinessConfig {

    /**
     * 業務1子產品direct交換機的名字
     */
    public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";

    /**
     * 業務1 demo業務的隊列名稱
     */
    public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";

    /**
     * 業務1 demo業務的routekey
     */
    public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";

    
    @Bean
    public Queue yewu1DemoDeadQueue() {
        // 将普通隊列綁定到死信隊列交換機上
        Map<String, Object> args = new HashMap<>(2);
        args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
        args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
        return new Queue("yewu1_demo_dead_queue", true, false, false, args);
    }

    /**
     * 将消息隊列和交換機進行綁定
     */
    @Bean
    public Binding binding_one() {
        return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
            .with("yewu1_demo_dead_key");
    }
}      
這裡有一個點如果想持久化消息到磁盤,需要建立隊列時,new Queue将第二個參數輸入為true,但是面對大并發時效率會變低 。

7.死信隊列

這裡聲明死信隊列與綁定關系。

@Configuration
public class DeadConfig {

    /**
     * 死信隊列
     */
    public final static String FAIL_QUEUE_NAME = "fail_queue";

    /**
     * 死信交換機
     */
    public final static String FAIL_EXCHANGE_NAME = "fail_exchange";

    /**
     * 死信routing
     */
    public final static String FAIL_ROUTING_KEY = "fail_routing";

    /**
     * 建立配置死信隊列
     *
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(FAIL_QUEUE_NAME, true, false, false);
    }

    /**
     * 死信交換機
     *
     * @return
     */
    @Bean
    public DirectExchange deadExchange() {
        DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
        return directExchange;
    }

    /**
     * 綁定關系
     * 
     * @return
     */
    @Bean
    public Binding failBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
    }

}      

8.生産者消費者

生産者與消費者的代碼實作。

public enum RabbitEnum {

    /**
     * 處理成功
     */
    ACCEPT,

    /**
     * 可以重試的錯誤
     */
    RETRY,

    /**
     * 無需重試的錯誤
     */
    REJECT
@RequestMapping("/sendDirectDead")
        String sendDirectDead(@RequestBody String message) throws Exception {
        System.out.println("開始生産");
        CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
                message, data);
        System.out.println("結束生産");
        System.out.println("發送id:" + data);
        return "OK,sendDirect:" + message;
    }
    @RabbitListener(queues = "yewu1_demo_dead_queue")
    protected void consumerDead(Message message, Channel channel) throws Exception {
        RabbitEnum ackSign = RabbitEnum.RETRY;
        try {
            int i = 10 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            ackSign = RabbitEnum.RETRY;
            throw e;
        } finally {
            // 通過finally塊來保證Ack/Nack會且隻會執行一次
            if (ackSign == RabbitEnum.ACCEPT) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (ackSign == RabbitEnum.RETRY) {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }      

9.實驗

當發送yewu1_demo_dead_queue隊列時,如果抛出異常,會放入死信隊列中。