什麼是延時隊列?

顧名思義:首先它要具有隊列的特性,再給它附加一個延遲消費隊列消息的功能,也就是說可以指定隊列中的消息在哪個時間點被消費。
延時隊列的應用
延時隊列在項目中的應用還是比較多的,尤其像電商類平台:
1、訂單成功後,在30分鐘内沒有支付,自動取消訂單
2、外賣平台發送訂餐通知,下單成功後60s給使用者推送短信。
3、如果訂單一直處于某一個未完結狀态時,及時處理關單,并退還庫存
4、淘寶建立商戶一個月内還沒上傳商品資訊,将當機商鋪等
設計目标
1、消息傳輸可靠性:消息進入到延遲隊列後,保證至少被消費一次。
2、Client支援豐富:由于業務上的需求,至少支援PHP和Python。
3、高可用性:至少得支援多執行個體部署。挂掉一個執行個體後,還有後備執行個體繼續提供服務。
4、實時性:允許存在一定的時間誤差。
5、支援消息删除:業務使用方,可以随時删除指定消息。
延時隊列的實作
⚠️注:以下API使用基于JAVA8
我個人一直秉承的開發理念:能用
JDK
自帶
API
實作的功能,就不要輕易重複造輪子,或者引入三方中間件(當然如果是經受長久考驗的第三方中間件,且教程文檔資源充分的也是可以引入項目中使用)。一方面自己封裝很容易出問題(大佬無視,并收下我的膝蓋),同時調試過程也會耽誤很多開發時間;另一方面一旦接入三方的中間件就會讓系統複雜度成倍的增加,後續他人接手項目會比較懵逼,學習、維護成本大大增加。
1、DelayQueue 延時隊列
JDK
中提供了一組實作延遲隊列的
API
,位于
Java.util.concurrent
包下
DelayQueue
。
DelayQueue
是一個
BlockingQueue
(無界阻塞)隊列,它本質就是封裝了一個
PriorityQueue
(優先隊列),
PriorityQueue
内部使用
完全二叉堆
(不知道的可以自行了解)來實作隊列元素排序,我們在向
DelayQueue
隊列中添加元素時,會給元素一個
Delay
(延遲時間)作為排序條件,隊列中最小的元素會優先放在隊首。隊列中的元素隻有到了
Delay
時間才允許從隊列中取出。隊列中可以放基本資料類型或自定義實體類,在存放基本資料類型時,優先隊列中元素預設升序排列,自定義實體類就需要我們根據類屬性值比較計算了。
要實作
DelayQueue
延時隊列,隊中元素要
implements
Delayed
接口。Delayed接口裡隻有一個
getDelay
方法,用于設定延期時間。DelayedNotify類中
compareTo
方法負責對隊列中的元素進行排序。ps:因業務需要我這裡使用多線程去執行notify,是以也 implements Runnable接口,并重寫run方法。
DelayQueue
的
put
方法是線程安全的(内部調用的offer方法),offer方法内部使用了
ReentrantLock
鎖進行線程同步。
DelayQueue
還提供了兩種出隊的方法
poll()
和
take()
,
poll()
為非阻塞擷取,沒有到期的元素直接傳回null;
take()
阻塞方式擷取,沒有到期的元素線程将會等待。另外,延遲隊列也支援元素的删除-remove方法。
2、Quartz 定時任務
Quartz
一款非常經典任務排程架構,在
Redis
、
RabbitMQ
還未廣泛應用時,逾時未支付取消訂單功能都是由定時任務實作的。定時任務它有一定的周期性,可能很多單子已經逾時,但還沒到達觸發執行的時間點,那麼就會造成訂單處理的不夠及時。
使用也很友善,隻需在依賴中引入
quartz
架構依賴包,并在啟動類上使用
@EnableScheduling
注解開啟定時任務功能。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
關于Cron表達式,可閱讀之前的文章--定時任務
3、Redis
⚠️注:工具類使用的是redisTemplate
3.1 Zset
有序集合Zset可用來實作簡單的延遲隊列,将消息資料序列化,作為Zset的基本元素,将消息生成時間戳 + 消息處理延遲時間戳作為score,每次通過zRangeByScore擷取一條消息進行處理,後通過zRem删除集合元素:相當于移除需要消費的 Job。
實作簡單,适合做中小型對延遲時間要求不高的業務場景。
3.2 expire回調
Redis
的
key
過期回調事件,也能達到延遲隊列的效果,簡單來說我們開啟監聽key是否過期的事件,一旦key過期會觸發一個callback事件。
修改
redis.conf
檔案開啟
notify-keyspace-events Ex
notify-keyspace-events Ex
Redis
監聽配置,注入Bean
RedisMessageListenerContainer
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
編寫Redis過期回調監聽方法,必須繼承
KeyExpirationEventMessageListener
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
System.out.println("監聽到key:" + expiredKey + "已過期");
}
}
4、RabbitMQ 延時隊列
使用
RabbitMQ
做延時隊列是比較常見的一種方式,而實際上
RabbitMQ
自身并沒有直接支援提供延遲隊列功能,而是通過
RabbitMQ
消息隊列的
TTL
和
DXL
這兩個屬性間接實作的。
先來認識一下
TTL
DXL
兩個概念:
Time To Live
(
TTL
) :
TTL
顧名思義:指的是消息的存活時間,
RabbitMQ
可以通過
x-message-tt
參數來設定指定
Queue
(隊列)和
Message
(消息)上消息的存活時間,它的值是一個非負整數,機關為微秒。
RabbitMQ
可以從兩種次元設定消息過期時間,分别是
隊列
和
消息本身
- 設定隊列過期時間,那麼隊列中所有消息都具有相同的過期時間。
- 設定消息過期時間,對隊列中的某一條消息設定過期時間,每條消息
都可以不同。TTL
如果同時設定隊列和隊列中消息的
TTL
,則
TTL
值以兩者中較小的值為準。而隊列中的消息存在隊列中的時間,一旦超過
TTL
過期時間則成為
Dead Letter
(死信)。
Dead Letter Exchanges
(
DLX
)
DLX
即死信交換機,綁定在死信交換機上的即死信隊列。
RabbitMQ
的
Queue
(隊列)可以配置兩個參數
x-dead-letter-exchange
x-dead-letter-routing-key
(可選),一旦隊列内出現了
Dead Letter
(死信),則按照這兩個參數可以将消息重新路由到另一個
Exchange
(交換機),讓消息重新被消費。
x-dead-letter-exchange
:隊列中出現
Dead Letter
後将
Dead Letter
重新路由轉發到指定
exchange
(交換機)。
x-dead-letter-routing-key
:指定
routing-key
發送,一般為要指定轉發的隊列。
隊列出現
Dead Letter
的情況有:
- 消息或者隊列的
過期TTL
- 隊列達到最大長度
- 消息被消費端拒絕(basic.reject or basic.nack)
下邊結合一張圖看看如何實作超30分鐘未支付關單功能,我們将訂單消息A0001發送到延遲隊列
order.delay.queue
,并設定
x-message-tt
消息存活時間為30分鐘,當到達30分鐘後訂單消息A0001成為了
Dead Letter
(死信),延遲隊列檢測到有死信,通過配置
x-dead-letter-exchange
,将死信重新轉發到能正常消費的關單隊列,直接監聽關單隊列處理關單邏輯即可。
發送消息時指定消息延遲的時間
public void send(String delayTimes) {
amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延遲資料", message -> {
// 設定延遲毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
}
}
設定延遲隊列出現死信後的轉發規則
/**
* 延時隊列
*/
@Bean(name = "order.delay.queue")
public Queue getMessageQueue() {
return QueueBuilder
.durable(RabbitConstant.DEAD_LETTER_QUEUE)
// 配置到期後轉發的交換
.withArgument("x-dead-letter-exchange", "order.close.exchange")
// 配置到期後轉發的路由鍵
.withArgument("x-dead-letter-routing-key", "order.close.queue")
.build();
}