天天看點

幾種延時隊列的實作方案

什麼是延時隊列?

幾種延時隊列的實作方案

顧名思義:首先它要具有隊列的特性,再給它附加一個延遲消費隊列消息的功能,也就是說可以指定隊列中的消息在哪個時間點被消費。

延時隊列的應用

延時隊列在項目中的應用還是比較多的,尤其像電商類平台:

1、訂單成功後,在30分鐘内沒有支付,自動取消訂單

2、外賣平台發送訂餐通知,下單成功後60s給使用者推送短信。

3、如果訂單一直處于某一個未完結狀态時,及時處理關單,并退還庫存

4、淘寶建立商戶一個月内還沒上傳商品資訊,将當機商鋪等

設計目标

1、消息傳輸可靠性:消息進入到延遲隊列後,保證至少被消費一次。

2、Client支援豐富:由于業務上的需求,至少支援PHP和Python。

3、高可用性:至少得支援多執行個體部署。挂掉一個執行個體後,還有後備執行個體繼續提供服務。

4、實時性:允許存在一定的時間誤差。

5、支援消息删除:業務使用方,可以随時删除指定消息。

延時隊列的實作

⚠️注:以下API使用基于JAVA8

我個人一直秉承的開發理念:能用

JDK

自帶

API

實作的功能,就不要輕易重複造輪子,或者引入三方中間件(當然如果是經受長久考驗的第三方中間件,且教程文檔資源充分的也是可以引入項目中使用)。一方面自己封裝很容易出問題(大佬無視,并收下我的膝蓋),同時調試過程也會耽誤很多開發時間;另一方面一旦接入三方的中間件就會讓系統複雜度成倍的增加,後續他人接手項目會比較懵逼,學習、維護成本大大增加。

1、DelayQueue 延時隊列

JDK

 中提供了一組實作延遲隊列的

API

,位于

Java.util.concurrent

包下

DelayQueue

幾種延時隊列的實作方案
幾種延時隊列的實作方案

DelayQueue

是一個

BlockingQueue

(無界阻塞)隊列,它本質就是封裝了一個

PriorityQueue

(優先隊列),

PriorityQueue

内部使用

完全二叉堆

(不知道的可以自行了解)來實作隊列元素排序,我們在向

DelayQueue

隊列中添加元素時,會給元素一個

Delay

(延遲時間)作為排序條件,隊列中最小的元素會優先放在隊首。隊列中的元素隻有到了

Delay

時間才允許從隊列中取出。隊列中可以放基本資料類型或自定義實體類,在存放基本資料類型時,優先隊列中元素預設升序排列,自定義實體類就需要我們根據類屬性值比較計算了。

要實作

DelayQueue

延時隊列,隊中元素要

implements

Delayed

 接口。Delayed接口裡隻有一個

getDelay

方法,用于設定延期時間。DelayedNotify類中

compareTo

方法負責對隊列中的元素進行排序。ps:因業務需要我這裡使用多線程去執行notify,是以也 implements Runnable接口,并重寫run方法。

幾種延時隊列的實作方案
幾種延時隊列的實作方案

DelayQueue

put

方法是線程安全的(内部調用的offer方法),offer方法内部使用了

ReentrantLock

鎖進行線程同步。

DelayQueue

還提供了兩種出隊的方法 

poll()

 和 

take()

 , 

poll()

 為非阻塞擷取,沒有到期的元素直接傳回null;

take()

 阻塞方式擷取,沒有到期的元素線程将會等待。另外,延遲隊列也支援元素的删除-remove方法。

幾種延時隊列的實作方案
幾種延時隊列的實作方案
幾種延時隊列的實作方案
幾種延時隊列的實作方案

2、Quartz 定時任務

Quartz

一款非常經典任務排程架構,在

Redis

RabbitMQ

還未廣泛應用時,逾時未支付取消訂單功能都是由定時任務實作的。定時任務它有一定的周期性,可能很多單子已經逾時,但還沒到達觸發執行的時間點,那麼就會造成訂單處理的不夠及時。

使用也很友善,隻需在依賴中引入

quartz

架構依賴包,并在啟動類上使用

@EnableScheduling

注解開啟定時任務功能。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>      

關于Cron表達式,可閱讀之前的文章--定時任務

3、Redis

⚠️注:工具類使用的是redisTemplate

 3.1 Zset

有序集合Zset可用來實作簡單的延遲隊列,将消息資料序列化,作為Zset的基本元素,将消息生成時間戳 + 消息處理延遲時間戳作為score,每次通過zRangeByScore擷取一條消息進行處理,後通過zRem删除集合元素:相當于移除需要消費的 Job。

實作簡單,适合做中小型對延遲時間要求不高的業務場景。

幾種延時隊列的實作方案
幾種延時隊列的實作方案

 3.2 expire回調

Redis

 的

key

過期回調事件,也能達到延遲隊列的效果,簡單來說我們開啟監聽key是否過期的事件,一旦key過期會觸發一個callback事件。

修改

redis.conf

檔案開啟

notify-keyspace-events Ex

notify-keyspace-events Ex      

Redis

監聽配置,注入Bean 

RedisMessageListenerContainer

@Configuration
public class RedisListenerConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}      

編寫Redis過期回調監聽方法,必須繼承

KeyExpirationEventMessageListener

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
 
    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        System.out.println("監聽到key:" + expiredKey + "已過期");
    }
}      

4、RabbitMQ 延時隊列

使用 

RabbitMQ

 做延時隊列是比較常見的一種方式,而實際上

RabbitMQ

 自身并沒有直接支援提供延遲隊列功能,而是通過 

RabbitMQ

 消息隊列的 

TTL

和 

DXL

這兩個屬性間接實作的。

先來認識一下 

TTL

DXL

兩個概念:

Time To Live

(

TTL

) :

TTL

 顧名思義:指的是消息的存活時間,

RabbitMQ

可以通過

x-message-tt

參數來設定指定

Queue

(隊列)和 

Message

(消息)上消息的存活時間,它的值是一個非負整數,機關為微秒。

RabbitMQ

 可以從兩種次元設定消息過期時間,分别是

隊列

消息本身

  • 設定隊列過期時間,那麼隊列中所有消息都具有相同的過期時間。
  • 設定消息過期時間,對隊列中的某一條消息設定過期時間,每條消息

    TTL

    都可以不同。

如果同時設定隊列和隊列中消息的

TTL

,則

TTL

值以兩者中較小的值為準。而隊列中的消息存在隊列中的時間,一旦超過

TTL

過期時間則成為

Dead Letter

(死信)。

Dead Letter Exchanges

DLX

DLX

即死信交換機,綁定在死信交換機上的即死信隊列。

RabbitMQ

的 

Queue

(隊列)可以配置兩個參數

x-dead-letter-exchange

x-dead-letter-routing-key

(可選),一旦隊列内出現了

Dead Letter

(死信),則按照這兩個參數可以将消息重新路由到另一個

Exchange

(交換機),讓消息重新被消費。

x-dead-letter-exchange

:隊列中出現

Dead Letter

後将

Dead Letter

重新路由轉發到指定 

exchange

(交換機)。

x-dead-letter-routing-key

:指定

routing-key

發送,一般為要指定轉發的隊列。

隊列出現

Dead Letter

的情況有:

  • 消息或者隊列的

    TTL

    過期
  • 隊列達到最大長度
  • 消息被消費端拒絕(basic.reject or basic.nack)

下邊結合一張圖看看如何實作超30分鐘未支付關單功能,我們将訂單消息A0001發送到延遲隊列

order.delay.queue

,并設定

x-message-tt

消息存活時間為30分鐘,當到達30分鐘後訂單消息A0001成為了

Dead Letter

(死信),延遲隊列檢測到有死信,通過配置

x-dead-letter-exchange

,将死信重新轉發到能正常消費的關單隊列,直接監聽關單隊列處理關單邏輯即可。

發送消息時指定消息延遲的時間

public void send(String delayTimes) {
        amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延遲資料", message -> {
            // 設定延遲毫秒值
            message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
            return message;
        });
    }
}      

設定延遲隊列出現死信後的轉發規則

/**
     * 延時隊列
     */
    @Bean(name = "order.delay.queue")
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(RabbitConstant.DEAD_LETTER_QUEUE)
                // 配置到期後轉發的交換
                .withArgument("x-dead-letter-exchange", "order.close.exchange")
                // 配置到期後轉發的路由鍵
                .withArgument("x-dead-letter-routing-key", "order.close.queue")
                .build();
    }      

繼續閱讀