天天看點

SpringBoot+RabbitMQ 實作延遲隊列

9 total views, 3 views today

RabbitMQ 實作延遲隊列,方法有兩種。

第一種是安裝延遲隊列插件;

第二種就是利用死信隊列的方式;

這裡采用第二種方式。

rabbitmq 自身的一些概念,可以去網上或者書上獲得。rabbitmq 延遲隊列的實作原理,網上資料很多,簡單盜圖一張。

SpringBoot+RabbitMQ 實作延遲隊列
簡單說明一下原理。

将消息發送到一個隊列中去,消息自身有一個 TTL,即失效時間,如果到期還是為消費該消息,那麼該消息就成為死信,将死信移到專門的死性隊列,然後消費者隻需要消費死信隊列中的消息,變相的實作了延遲消息的功能。

基于 Springboot 的具體代碼實作:
<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>           

複制

配置檔案:

spring:
  rabbitmq:
    host: 111.22.1.2 
    port: 5672
    username: user
    password: passw
    template:
      mandatory: true
    #支援釋出确認與傳回
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        #是否自動開始監聽消息隊列
        auto-startup: false
        #手動應答
        acknowledge-mode: manual
        #監聽容器數及最大數
        concurrency: 1
        max-concurrency: 1
        #是否支援重試
        retry:
          enabled: true
           

複制

配置檔案 TopicRabbitConfig.Java

package com.mine.config;


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TopicRabbitConfig {


    //延遲隊列的名字
    public static final String delayQueueName = "delay_machine_check_queue";
    //延遲隊列的exchange名字
    public static final String delayExchangeName = "delay_machine_check_exchange";

    //死信exchange的名稱
    public static final String deadLetterProcessQueueName = "dead_letter_process_queue";
    //死信exchange的名稱
    public static final String deadLetterProcessExchangeName = "dead_letter_machine_check_exchange";

    public static final String routingKey = "machine_check";


    //延遲隊列的exchange
    @Bean
    public TopicExchange delayExchange() {
        return new TopicExchange(delayExchangeName, true, false);
    }

    //死信隊列的exchange
    @Bean
    public TopicExchange deadLetterProcessExchange() {
        return new TopicExchange(deadLetterProcessExchangeName, true, false);
    }

    //延遲隊列
    @Bean
    Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        //args.put("x-message-ttl", 20000);
        args.put("x-dead-letter-exchange", deadLetterProcessExchangeName); //DLX,dead letter發送到的exchange
        args.put("x-dead-letter-routing-key", routingKey);
        return new Queue(delayQueueName, true, false, false, args);
    }

    //死信隊列
    @Bean
    public Queue deadLetterProcessQueue() {
        return new Queue(deadLetterProcessQueueName, true, false, false);
    }

    //綁定延遲隊列,延遲Exchange,routing關系
    @Bean
    Binding bindingDelayExchange(Queue delayQueue, TopicExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(routingKey);
    }

    //綁定死信隊列,死信Exchange,routing關系
    //參數根據Spring命名約定的方式,會将上面的Queue執行個體和exchange執行個體注入進來,形成綁定關系
    @Bean
    Binding bindingDeadLetterProcessExchange(Queue deadLetterProcessQueue, TopicExchange deadLetterProcessExchange) {
        return BindingBuilder.bind(deadLetterProcessQueue).to(deadLetterProcessExchange).with(routingKey);
    }
}           

複制

監聽代碼:

//隻監聽聽死信隊列即可
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = TopicRabbitConfig.deadLetterProcessQueueName, durable = "true", autoDelete = "false"),
        exchange = @Exchange(name = TopicRabbitConfig.deadLetterProcessExchangeName, durable = "true", type = "topic", autoDelete = "false"),
        key = TopicRabbitConfig.routingKey), autoStartup = "true", id = "myconsumer")
public void receiveRabbitMQ(Message message, Channel channel) throws Exception {
    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        LogHelper.log4j2.info("收到消息:" + new String(message.getBody()));

    } catch (Exception ex) {
        LogHelper.log4j2.error("receive", ex);
    }
}           

複制

上面 2 段代碼,實作的效果就是如下:

SpringBoot+RabbitMQ 實作延遲隊列

發送消息的代碼:RabbitMQTopicSender.java

package com.mine.utils;

import com.mine.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@Component
public class RabbitMQTopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String message, Long delaySecond) {

        /***
         * 方法參數說明
         * https://docs.spring.io/spring-amqp/docs/latest_ga/api/org/springframework/amqp/rabbit/core/RabbitTemplate.html
         convertAndSend(String exchange, String routingKey, Object object)
         Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
         ***/

        message = message + "【" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "】";
        LogHelper.log4j2.info("message:" + message);
        MessageProperties props = new MessageProperties();
        props.setExpiration(Long.toString(delaySecond * 1000));//消息的延遲時間
        Message ttlMessage = new Message(message.getBytes(), props);

        rabbitTemplate.convertAndSend(TopicRabbitConfig.delayExchangeName, TopicRabbitConfig.routingKey, ttlMessage);
        LogHelper.log4j2.info("消息發送成功");
    }
}           

複制

至此調用 send 方法,即可發送延遲隊列。

以上代碼親測有效。

原創文章,轉載請注明出處!http://www.javathings.top/springbootrabbitmq實作延遲隊列/