延遲消息
當消息發送到伺服器時,該消息不能直接被放在隊列裡面,而是在 MQ 伺服器裡面建立一個定時任務,在伺服器到達延時時間後再執行投遞的操作
延遲消息的使用場景
淘寶七天自動确認收貨。在我們簽收商品後,物流系統會在七天後延時發送一個消息給支付系統, 通知支付系統将款打給商家,這個過程持續七天,就是使用了消息中間件的延遲推送功能。12306 購票支付确認頁面。我們在選好票點選确定跳轉的頁面中往往都會有倒計時,代表着 30 分鐘内訂單不确認的話将會自動取消訂單。其實在下訂單那一刻開始購票業務系統就會發送一個延時消息給訂單系統,延時30分鐘,告訴訂單系統訂單未完成,如果我們在30分鐘内完成了訂單,則可以通過邏輯代碼判斷來忽略掉收到的消息。在上面兩種場景中,如果我們使用下面兩種傳統解決 1 對于單機版而已,我們的政策有3種定時删除、定期删除、惰性删除。
對于分布式項目而言呢,我們最好選擇 MQ
插件安裝:javascript:void(0)
RabbitMQ 的延時消息實作
RabbitMQ 本身沒有提供一個隊列的機制,但是它裡面有個延遲隊列的機制
延遲隊列的機制及建立項目
rabbitmq-springboot-dealy
把
application.properties
修改為
application.yml
格式的,然後修改其中的内如如下所示:
server:
port: 8001
spring:
application:
name: rabbitmq-springboot-dealy
rabbitmq:
host: 139.196.183.130
port: 5672
username: user
password: 123456
virtual-host: v-it6666
# 這個是老版本的用法
# publisher-confirms: true
# 開啟消息到達交換機的确認機制
publisher-confirm-type: correlated
# 消息由交換機到達隊列失敗時觸發
publisher-returns: true
listener:
simple:
# 自動簽收,這個是預設行為
# acknowledge-mode: auto
# 手動簽收
acknowledge-mode: manual
direct:
# 設定直連交換機的簽收類型
acknowledge-mode: manual
代碼實作延遲消息
/**
* @author BNTang
*/
@Configuration
public class DealyMessageConfig {
@Bean
public Queue dealyQueue() {
Map<String, Object> args = new HashMap<>();
// 把一個隊列修改為延遲隊列
// 消息的最大存活時間
args.put("x-message-ttl", 10 * 1000);
// 該隊列裡面的消息死了,去那個交換機
args.put("x-dead-letter-exchange", "DeadLetter.exc");
// 該隊列裡面的消息死了,去那個交換機,由那個路由 key 來路由他
args.put("x-dead-letter-routing-key", "DeadLetter.key");
return new Queue("dealy", true, false, false, args);
}
/**
* 死信交換機
*
* @return 交換機
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("DeadLetter.exc");
}
/**
* 新生隊列
*
* @return 隊列
*/
@Bean
public Queue newQueue() {
return new Queue("new.queue");
}
/**
* 綁定
*
* @return 綁定結果
*/
@Bean
public Binding newAndDeadLetterExchange() {
return BindingBuilder.bind(newQueue()).to(deadLetterExchange()).with("DeadLetter.key");
}
}
監聽新的世界
/**
* @author BNTang
*/
@Component
@RabbitListener(queues = {"new.queue"})
public class MessageReceive {
@RabbitHandler
public void onMessage(String content, Message message, Channel channel) {
System.out.println("來到了新的世界,正在消費中...");
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliveryTag, false);
System.out.println("簽收成功:" + content);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("簽收成功時間為:" + sdf.format(new Date()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
測試
我們往延遲隊列發送消息,啟動項目
@SneakyThrows
@Test
public void sendDealy() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息發送時間:" + sdf.format(new Date()));
rabbitTemplate.convertAndSend("dealy", "我是一個延時消息");
System.out.println("消息發送成功");
System.in.read();
}
發消息指定時間
我們的這個隊列裡面讓你活
10s
,你想活11s那是不行的,但是你活5s是可以的,它是以那個最小為主的!
@SneakyThrows
@Test
public void sendDealy() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息發送時間:" + sdf.format(new Date()));
rabbitTemplate.convertAndSend("dealy", (Object) "我是一個延時消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 我們延時隊列裡面設定的為 10 秒,但這個消息它隻想活 5 秒
message.getMessageProperties().setExpiration("5000");
return message;
}
});
System.out.println("消息發送成功");
System.in.read();
}