天天看點

延時任務(三)-基于redis zset的完整實作

所謂的延時任務給大家舉個例子:你買了一張火車票,必須在30分鐘之内付款,否則該訂單被自動取消。「訂單30分鐘不付款自動取消,這個任務就是一個延時任務。」   我之前已經寫過2篇關于延時任務的文章:

  • ​​《完整實作-通過DelayQueue實作延時任務》​​
  • ​​《延時任務(二)-基于netty時間輪算法實戰》​​

這兩種方法都有一個缺點:都是基于單體應用的記憶體的方式運作延時任務的,一旦出現單點故障,可能出現延時任務資料的丢失。是以此篇文章給大家介紹實作延時任務的第三種方式,結合redis zset實作延時任務,可以解決單點故障的問題。給出實作原理、完整實作代碼,以及這種實作方式的優缺點。

一、實作原理

首先來介紹一下實作原理,我們需要使用redis zset來實作延時任務的需求,是以我們需要知道zset的應用特性。zset作為redis的有序集合資料結構存在,排序的依據就是score。

延時任務(三)-基于redis zset的完整實作

是以我們可以利用zset score這個排序的這個特性,來實作延時任務

  • 在使用者下單的時候,同時生成延時任務放入redis,key是可以自定義的,比如:​

    ​delaytask:order​

  • value的值分成兩個部分,一個部分是score用于排序,一個部分是member,member的值我們設定為訂單對象(如:訂單編号),因為後續延時任務時效達成的時候,我們需要有一些必要的訂單資訊(如:訂單編号),才能完成訂單自動取消關閉的動作。
  • 「延時任務實作的重點來了,score我們設定為:訂單生成時間 + 延時時長」。這樣redis會對zset按照score延時時間進行排序。
  • 開啟redis掃描任務,擷取"目前時間 > score"的延時任務并執行。即:目前時間 >  訂單生成時間 + 延時時長的時候 ,執行延時任務。

二、準備工作

使用 redis zset 這個方案來完成延時任務的需求,首先肯定是需要redis,這一點毫無疑問。redis的搭建網上有很多的文章,我這裡就不贅述了。

其次,筆者長期的java類應用系統開發都是使用SpringBoot來完成,是以也是習慣使用SpringBoot的redis內建方案。首先通過maven坐标引入spring-boot-starter-data-redis

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>      

其次需要在Spring Boot的​

​application.yml​

​配置檔案中,配置redis資料庫的連結資訊。我這裡配置的是redis的單例,如果大家的生産環境是哨兵模式、或者是叢集模式的redis,這裡的配置方式需要進行微調。其實這部分内容在我的個人部落格裡面都曾經系統的介紹過,感興趣的朋友可以關注我的個人部落格。

spring:
  redis:
    database: 0 # Redis 資料庫索引(預設為 0)
    host: 192.168.161.3 # Redis 伺服器位址
    port: 6379 # Redis 伺服器連接配接端口
    password: 123456 # Redis 伺服器連接配接密碼(預設為空)
    timeout:  5000  # 連接配接逾時,機關ms
    lettuce:
      pool:
        max-active: 8 # 連接配接池最大連接配接數(使用負值表示沒有限制) 預設 8
        max-wait: -1 # 連接配接池最大阻塞等待時間(使用負值表示沒有限制) 預設 -1
        max-idle: 8 # 連接配接池中的最大空閑連接配接 預設 8
        min-idle: 0 # 連接配接池中的最小空閑連接配接 預設 0      

三、代碼實作

下面的這個類就是延時任務的核心實作了,一共包含三個核心方法,我們來一一說明一下:

  • produce方法,用于生成訂單-order為訂單資訊,可以是訂單流水号,用于延時任務達到時效後關閉訂單
  • afterPropertiesSet方法是InitializingBean接口的方法,之是以實作這個接口,是因為我們需要在應用啟動的時候開啟redis掃描任務。即:當OrderDelayService bean初始化的時候,開啟redis掃描任務循環擷取延時任務資料。
  • consuming函數,用于從redis擷取延時任務資料,消費延時任務,執行逾時訂單關閉等操作。為了避免阻塞for循環,影響後面延時任務的執行,是以這個consuming函數一定要做成異步的,參考Spring Boot異步任務及​

    ​Async​

    ​注解的使用方法。我之前寫過一個SpringBoot的「可觀測、易配置」的異步任務線程池開源項目,源代碼位址:https://gitee.com/hanxt/zimug-monitor-threadpool  。我的這個zimug-monitor-threadpool開源項目,可以做到對線程池使用情況的監控,我自己平時用的效果還不錯,向大家推薦一下!
@Component
public class OrderDelayService  implements InitializingBean {
  //redis zset key
  public static final String ORDER_DELAY_TASK_KEY = "delaytask:order";

  @Resource
  private StringRedisTemplate stringRedisTemplate;

  //生成訂單-order為訂單資訊,可以是訂單流水号,用于延時任務達到時效後關閉訂單
  public void produce(String orderSerialNo){
    stringRedisTemplate.opsForZSet().add(
            ORDER_DELAY_TASK_KEY,     // redis key
            orderSerialNo,    // zset  member
            //30分鐘延時
            System.currentTimeMillis() + (30 * 60 * 1000)    //zset score
    );
  }

  //延時任務,也是異步任務,延時任務達到時效之後關閉訂單,并将延時任務從redis zset删除
  @Async("test")
  public void consuming(){
       
      Set<ZSetOperations.TypedTuple<String>> orderSerialNos = stringRedisTemplate.opsForZSet().rangeByScoreWithScores(
              ORDER_DELAY_TASK_KEY,
              0,  //延時任務score最小值
              System.currentTimeMillis() //延時任務score最大值(目前時間)
      );
      if (!CollectionUtils.isEmpty(orderSerialNos)) {
        for (ZSetOperations.TypedTuple<String> orderSerialNo : orderSerialNos) {
          //這裡根據orderSerialNo去檢查使用者是否完成了訂單支付
          //如果使用者沒有支付訂單,去執行訂單關閉的操作
          System.out.println("訂單" + orderSerialNo.getValue() + "逾時被自動關閉");
          //訂單關閉之後,将訂單延時任務從隊列中删除
          stringRedisTemplate.opsForZSet().remove(ORDER_DELAY_TASK_KEY, orderSerialNo.getValue());
        }
      }
  }

  //該類對象Bean執行個體化之後,就開啟while掃描任務
  @Override
  public void afterPropertiesSet() throws Exception {
    new Thread(() -> {  //開啟新的線程,否則SpringBoot應用初始化無法啟動
      while(true){
        try {
          Thread.sleep(5 * 1000);   //每5秒掃描一次redis庫擷取延時資料,不用太頻繁沒必要
        } catch (InterruptedException e) {
          e.printStackTrace();  //本文隻是示例,生産環境請做好相關的異常處理
        }
        consuming();
      }
    }).start();
  }
}      

更多的内容參考代碼中的注釋,需要關注的點是:

  • 上文中的rangeByScoreWithScores方法用于從redis中擷取延時任務,score大于0小于目前時間的所有延時任務,都将被從redis裡面取出來。每5秒執行一次,是以延時任務的誤差不會超過5秒。
  • 上文中的訂單資訊,我隻保留了訂單唯一流水号,用于關閉訂單。如果你的業務需要傳遞更多的訂單資訊,請使用RedisTemplate操作訂單類對象,而不是StringRedisTemplate操作訂單流水号字元串。

訂單下單的時候,使用如下的方法,将訂單序列号放入redis zset中即可實作延時任務

orderDelayService.produce("這裡填寫訂單編号");      

四、優缺點

使用redis zset來實作延時任務的優點是:相對于本文開頭介紹的兩種方法,我們的延時任務是儲存在redis裡面的,redis具有資料持久化的機制,可以有效的避免延時任務資料的丢失。另外,redis還可以通過哨兵模式、叢集模式有效的避免單點故障造成的服務中斷。至于缺點嘛,我覺得沒什麼缺點。如果非要勉強的說一個缺點的話,那就是我們需要額外維護redis服務,增加了硬體資源的需求和運維成本。但是現在随着微服務的興起,redis幾乎已經成了應用系統的标配,redis複用即可,是以我感覺這也算不上什麼缺點吧!

繼續閱讀