天天看點

死信隊列----RabbitMQ

       最新項目有一個場景,送出訂單後,有一個15分鐘的支付逾時時間,有同僚提出 直接一個定時任務輕松搞定, 但是不優雅.那麼怎麼優雅的解決這個問題呢? 團隊的大佬,我源哥建議使用 死信隊列,但是項目還沒有實踐,我這個 小菜雞就不能抄作業了,那就隻能放大招啦(百度),成功搞定. 

       先來完整說一下項目的流程,有一個15分鐘的支付逾時,還有一個24小時的訂單服務逾時,需要寫一個監聽 處理逾時的消息,如果消費失敗,需要将消息 加入異常隊列,人工處理   下面就直接上代碼

package com.ikang.common;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableRabbit
public class RabbitmqConfig {


    /**
     *  設定 消息手動ack
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }


    /******************     死信隊列配置 begin    ************************/

    @Value("${mqConfig.ddx.queueNameProcess}")
    String queue_process_name;  // 最終消費隊列的 name

    @Value("${mqConfig.ddx.queueName24h}")
    String queue_ttl_name_24h; // 24小時隊列 name

    @Value("${mqConfig.ddx.queueName15m}")
    String queue_ttl_name_15m; // 15分鐘隊列 name

    @Value("${mqConfig.ddx.delayExchangeName}")
    String delay_exchange_name; // 轉發交換機

    @Value("${mqConfig.ddx.queueRoutKey15m}")
    String queue_routing_key_15m; // key.15m

    @Value("${mqConfig.ddx.queueRoutKey24h}")
    String queue_routing_key_24h; // key.24h

    @Value("${mqConfig.ddx.queueRoutKey}")
    String delay_routing_key;  // key.process  最終消費隊列的key

    @Value("${mqConfig.ddx.ttlExchangeName}")
    String ttl_exchange_name;  // 入口交換機


    /**
     * 死信交換機
     * @return
     */
    @Bean
    public DirectExchange  delayExchange(){
        DirectExchange directExchange = new DirectExchange(delay_exchange_name,true,false);
        return directExchange;
    }

    /**
     * 延遲交換機
     * @return
     */
    @Bean
    public DirectExchange  TTLExchange(){
        DirectExchange directExchange = new DirectExchange(ttl_exchange_name,true,false);
        return directExchange;
    }

    /**
     * 實際消費隊列
     * @return
     */
    @Bean
    public Queue delayProcessQueue() {
        return new Queue(queue_process_name,true,false,false);
    }

    /**
     * 15分鐘 延遲隊列
     * @return
     */
    @Bean
    public Queue delayTTLQueue15m() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",delay_exchange_name);
        paramMap.put("x-dead-letter-routing-key",delay_routing_key);
        paramMap.put("x-message-ttl",900000);//15分鐘
        return new Queue(queue_ttl_name_15m,true,false,false,paramMap);
    }

    /**
     * 24小時 延遲隊列
     * @return
     */
    @Bean
    public Queue delayTTLQueue24h() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",delay_exchange_name);
        paramMap.put("x-dead-letter-routing-key",delay_routing_key);
        paramMap.put("x-message-ttl",1800000);//24小時
        return new Queue(queue_ttl_name_24h,true,false,false,paramMap);
    }

    /**
     * 轉發交換機 綁定 最終消費的隊列
     * @return
     */
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(delay_routing_key);
    }

    /**
     * 綁定延遲隊列 15m
     * @return
     */
    @Bean
    public Binding queueTTLBinding() {
        return BindingBuilder.bind(delayTTLQueue15m()).to(TTLExchange()).with(queue_routing_key_15m);
    }

    /**
     * 綁定延遲隊列 24h
     * @return
     */
    @Bean
    public Binding queueTTLBinding2() {
        return BindingBuilder.bind(delayTTLQueue24h()).to(TTLExchange()).with(queue_routing_key_24h);
    }


    /******************     死信隊列配置 end    ************************/


    /******************     異常消息投放隊列 begin   ************************/

    @Value("${mqConfig.exchangeNameException}")
    String exchange_exception;

    @Value("${mqConfig.queueNameException}")
    String queue_exception;

    @Value("${mqConfig.routeKeyException}")
    String routekey_exception;

    @Bean
    public Queue queueException() {
        boolean durable = true;
        boolean exclusive = false;
        boolean autoDelete = false;
        return new Queue(queue_exception, durable, exclusive, autoDelete);
    }

    @Bean
    public DirectExchange exchangeException() {
        boolean durable = true;
        boolean autoDelete = false;
        return new DirectExchange(exchange_exception, durable, autoDelete);
    }

    @Bean
    public Binding bindingException() {
        return BindingBuilder.bind(queueException())
                .to(exchangeException())
                .with(routekey_exception);
    }

    /******************     異常消息投放隊列 end    ************************/


}
           

         上面的是死性隊列的配置相關資訊,基本就是三步, 1.聲明隊列,配置相關屬性  2.聲明交換機,配置相關屬性  3. 配對,把他倆綁定在一起.  就是這麼簡單

        然後 生産消息,消費消息

@Slf4j
@Service
public class RabbitmqService {



    @Autowired
    RabbitTemplate rabbitTemplate;

    @Value("${mqConfig.ddx.ttlExchangeName}")
    String ttl_exchange_name;

    @Value("${mqConfig.exchangeNameException}")
    String exchange_name_Exception;

    @Value("${mqConfig.routeKeyException}")
    String routkey_Exception;

    @Value("${mqConfig.ddx.queueRoutKey24h}")
    String queue_routing_key_24h; // key.24h

    @Value("${mqConfig.ddx.queueRoutKey15m}")
    String queue_routing_key_15m; // key.15m



  /**
     *
     * @param transfer
     */
    public void produceMsg(RabbitTransfer transfer) {
        String transStr = JSON.toJSONString(transfer);
        rabbitTemplate.convertAndSend(ttl_exchange_name,transfer.getRoutingKey(),transStr,new CorrelationData(UUID.randomUUID().toString()));
    }

    /**
     *  消息處理失敗,加入異常隊列,人工處理
     * @param transfer
     */
    public void produceExceptionMsg(RabbitTransfer transfer){
        String transStr = JSON.toJSONString(transfer);
        rabbitTemplate.convertAndSend(exchange_name_Exception,routkey_Exception,transStr,new CorrelationData(UUID.randomUUID().toString()));
    }



  /**
     *  處理步驟:
     *  1. 問診訂單完成,im推送訂單id
     *  2. 根據orderId 整理資料,給監管平台發送訂單資訊
     * @param orderId
     * @param channel
     * @param tag
     */
    @RabbitListener(queues = "${mqConfig.inqOrderQueue}")
    public void receiveIm2Hosp(String orderId, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        // 增加try catch,防止消費失敗後,一直循環消費
        try{
            inqOrderMsgService.receive(orderId);
        }catch (Exception ex){
            ex.printStackTrace(); 
            log.info("發送 消息異常 郵件成功!");*/
        }
        channel.basicAck(tag, false);
    }

     /**
     *  處理步驟:
     * @param msg
     * @param channel
     * @param tag
     */
    @RabbitListener(queues = "${mqConfig.ddx.queueNameProcess}")
    @SneakyThrows
    public void process(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {

          //15分鐘 24小時邏輯
          channel.basicAck(tag, false);

    }


}
           

       消費失敗,加入異常隊列,其實還可以更優雅一些, 可以在消費逾時消息的隊列上,再做一個綁定,綁定一個異常隊列,這樣 不要消費失敗的時候,手動将消費發送到異常隊列了. 這裡就是實作了

        這樣一套 死信隊列就ok了,是不是 很簡單, 這個隻是使用,rabbitMQ還是很強大的,推薦一個資料,rabbitMQ實戰指南,很實用