9 total views, 3 views today
RabbitMQ 實作延遲隊列,方法有兩種。
第一種是安裝延遲隊列插件;
第二種就是利用死信隊列的方式;
這裡采用第二種方式。
rabbitmq 自身的一些概念,可以去網上或者書上獲得。rabbitmq 延遲隊列的實作原理,網上資料很多,簡單盜圖一張。

簡單說明一下原理。
将消息發送到一個隊列中去,消息自身有一個 TTL,即失效時間,如果到期還是為消費該消息,那麼該消息就成為死信,将死信移到專門的死性隊列,然後消費者隻需要消費死信隊列中的消息,變相的實作了延遲消息的功能。
基于 Springboot 的具體代碼實作:
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
複制
配置檔案:
spring:
rabbitmq:
host: 111.22.1.2
port: 5672
username: user
password: passw
template:
mandatory: true
#支援釋出确認與傳回
publisher-confirms: true
publisher-returns: true
listener:
simple:
#是否自動開始監聽消息隊列
auto-startup: false
#手動應答
acknowledge-mode: manual
#監聽容器數及最大數
concurrency: 1
max-concurrency: 1
#是否支援重試
retry:
enabled: true
複制
配置檔案 TopicRabbitConfig.Java
package com.mine.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TopicRabbitConfig {
//延遲隊列的名字
public static final String delayQueueName = "delay_machine_check_queue";
//延遲隊列的exchange名字
public static final String delayExchangeName = "delay_machine_check_exchange";
//死信exchange的名稱
public static final String deadLetterProcessQueueName = "dead_letter_process_queue";
//死信exchange的名稱
public static final String deadLetterProcessExchangeName = "dead_letter_machine_check_exchange";
public static final String routingKey = "machine_check";
//延遲隊列的exchange
@Bean
public TopicExchange delayExchange() {
return new TopicExchange(delayExchangeName, true, false);
}
//死信隊列的exchange
@Bean
public TopicExchange deadLetterProcessExchange() {
return new TopicExchange(deadLetterProcessExchangeName, true, false);
}
//延遲隊列
@Bean
Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
//args.put("x-message-ttl", 20000);
args.put("x-dead-letter-exchange", deadLetterProcessExchangeName); //DLX,dead letter發送到的exchange
args.put("x-dead-letter-routing-key", routingKey);
return new Queue(delayQueueName, true, false, false, args);
}
//死信隊列
@Bean
public Queue deadLetterProcessQueue() {
return new Queue(deadLetterProcessQueueName, true, false, false);
}
//綁定延遲隊列,延遲Exchange,routing關系
@Bean
Binding bindingDelayExchange(Queue delayQueue, TopicExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(routingKey);
}
//綁定死信隊列,死信Exchange,routing關系
//參數根據Spring命名約定的方式,會将上面的Queue執行個體和exchange執行個體注入進來,形成綁定關系
@Bean
Binding bindingDeadLetterProcessExchange(Queue deadLetterProcessQueue, TopicExchange deadLetterProcessExchange) {
return BindingBuilder.bind(deadLetterProcessQueue).to(deadLetterProcessExchange).with(routingKey);
}
}
複制
監聽代碼:
//隻監聽聽死信隊列即可
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = TopicRabbitConfig.deadLetterProcessQueueName, durable = "true", autoDelete = "false"),
exchange = @Exchange(name = TopicRabbitConfig.deadLetterProcessExchangeName, durable = "true", type = "topic", autoDelete = "false"),
key = TopicRabbitConfig.routingKey), autoStartup = "true", id = "myconsumer")
public void receiveRabbitMQ(Message message, Channel channel) throws Exception {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
LogHelper.log4j2.info("收到消息:" + new String(message.getBody()));
} catch (Exception ex) {
LogHelper.log4j2.error("receive", ex);
}
}
複制
上面 2 段代碼,實作的效果就是如下:
發送消息的代碼:RabbitMQTopicSender.java
package com.mine.utils;
import com.mine.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
public class RabbitMQTopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String message, Long delaySecond) {
/***
* 方法參數說明
* https://docs.spring.io/spring-amqp/docs/latest_ga/api/org/springframework/amqp/rabbit/core/RabbitTemplate.html
convertAndSend(String exchange, String routingKey, Object object)
Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
***/
message = message + "【" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "】";
LogHelper.log4j2.info("message:" + message);
MessageProperties props = new MessageProperties();
props.setExpiration(Long.toString(delaySecond * 1000));//消息的延遲時間
Message ttlMessage = new Message(message.getBytes(), props);
rabbitTemplate.convertAndSend(TopicRabbitConfig.delayExchangeName, TopicRabbitConfig.routingKey, ttlMessage);
LogHelper.log4j2.info("消息發送成功");
}
}
複制
至此調用 send 方法,即可發送延遲隊列。
以上代碼親測有效。
原創文章,轉載請注明出處!http://www.javathings.top/springbootrabbitmq實作延遲隊列/