天天看點

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

背景介紹

很多小夥伴們都跟我留言說過一個類似的問題,就是針對于任務排程架構而言的選取,很多公司都會采用任務排程架構的鼻祖Quartz,那麼我們來梳理以下Java領域的任務排程架構吧。

Java領域的定時任務的架構

單機級别任務排程
  • TimeTask:是一個定時器類,通過該類可以為指定的定時任務進行配置。TimerTask類是一個定時任務類,該類實作了Runnable接口,缺點異常未檢查會中止線程。
  • ScheduledExecutorService:相對延遲或者周期作為定時任務排程,缺點沒有絕對的日期或者時間。
  • spring定時架構:配置簡單功能較多,如果系統使用單機的話可以優先考慮spring定時器

給予Java原生的一些任務排程架構後有很多組織機構推陳出新,出現了很多的優秀的任務排程架構,它們除了可以實作任務排程,還可以實作了分布式體系下的運作任務。例如以下:

分布式級别任務排程
  • Quartz:Java實作的定時任務排程架構,但Quartz關注點在于定時任務而非資料,并沒有一套根據資料處理而定制化的流程。雖然Quartz可以基于資料庫實作作業的高可用,但缺少分布式并行排程的功能。
  • TBSchedule:阿裡早期開源的分布式任務排程系統,代碼較為陳舊,使用timer而非線程池執行任務排程。衆所周知,timer在處理異常狀況時是有缺陷的。而且TBSchedule作業類型較為單一,隻能是擷取/處理資料一種模式。還有就是文檔缺失比較嚴重,而且目前已經沒有對應的人進行維護了,(處理方式主要分為“搶占式”和“協同配置設定式”,通過叢集的節點分擔大批量任務的處理,提高批量任務的處理效率)。
  • Elastic-job:當當開發的彈性分布式任務排程系統,功能豐富強大,采用zookeeper實作分布式協調,實作任務高可用以及分片,并且可以支援雲開發,但是開發的模式有點侵入性,但也是一個不錯的選擇方向
  • Saturn:是唯品會自主研發的分布式的定時任務的排程平台,基于當當的elastic-job 版本開發的,并且可以很好的部署到docker容器上,目前本人還沒有做過
  • Xxl-job: 是大衆點評員工徐雪裡于2015年釋出的分布式任務排程平台,是一個輕量級分布式任務排程架構,其核心設計目标是開發迅速、學習簡單、輕量級、易擴充。

回到主旨如何建立Spring的分布式架構

這時候你會說,如果我們已經有了很多以上的分布式能力的任務排程架構了,我們為什麼還要再進行改造Spring模式的任務排程架構呢,好的,就這事一個問題,如果當我們的任務排程不需要那麼大的資源以及不能夠依靠以上的開源架構去處理的時候,我們隻能采用單機版的場景下,那該怎麼辦呢?比如說,公司不需要提供額外的部署分布式排程、資源不足不想暴漏任務排程服務的單點問題等場景、甚至很多人習慣了使用任務排程了@Schedule,你讓他直接改成以上的開發模式,成本是否也會很高?這該咋辦?那麼種種不能使用以上開源的分布式任務排程架構的時候,才有了我們今天的課題系列。

廢話不多說直接上幹貨

先說以下我們所需要的原材料有哪些,首先Spring架構是必須的,之後還需要就是可以連接配接Redis的元件如:jedis、lettuce、Redission等。本次我們采用的是Redission架構,如果有用其他的小夥伴可以直接替換理念就好。

分析以下Spring的@Schedule的工作原理

@EnableScheduling這個注解

以下是源碼:

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

可以看出Schedule排程架構主要啟動需要依靠SchedulingConfiguration這個類進行解析掃描所有的元件。我們深入進去看一下。

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

别的我們都不看,我們就關注一個類:ScheduledAnnotationBeanPostProcessor,相信對Spring了解的小夥伴們應該知道了,這個是一個注解注入的掃描器類,專門處理bean的生命周期的,專門講@Scheduled的方法對應的類進行動态代理植入到對應的容器内,并且配置設定對應的任務排程觸發器機制,以下就是ScheduledAnnotationBeanPostProcessor的處理類的源碼。

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

再次我們重點關注代碼,後置處理的類攔截機制,他會掃描出所有相關的@Schedule對應的類和方法:

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

然後進行循環調用processScheduled的方法:

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

我們再來看一下processScheduled方法裡面最直接封裝到對應的schedule調用機制的方法片段。

cron模式

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

delay模式

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

fix模式

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

以上類型分為了三種模式:Cron模式、fix模式、delay模式等,相信了解任務排程的小夥伴們并不陌生吧!

最重要的改造點來了

我們如何通過以上分析得出的結論講我們的@Schedule改造為分布式模式的呢,關鍵點就在這裡,下圖示黃的位置createRunnable方法,這也是Spring提供給我們唯一可以改造的地方。

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

createRunnable

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule
進入createRunnable方法之後,我們需要看一下最後建立出的對象是什麼?就是這個ScheduledMethodRunnable,進入這裡的ScheduledMethodRunnable的類後可以看到如下。
【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

改造位置

【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

這裡就是真正要執行的代碼,我們隻需要再這裡加入分布式鎖,進行攔截,保證多個任務排程,同時隻會有一個方法進行執行是否就可以保證了呢。但是如果你說需要進行分片處理,也是可以通過資料條件進行分隔,但是那又是另外一回事了。是以是不是很好就可以解決了?那我們來實作一下啊,以下就是小編完成的分布式改造後的成品。

我的分布式任務排程

定義屬于我的分布式排程注解
我才用了加入字首Distribute用來區分和原來的任務排程。

啟動分布式任務排程機制

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DistributeSchedulingConfiguration.class)
@Documented
public @interface EnableDistributeScheduling {}      

辨別任務排程機制(1)

源碼直接拷貝過來改一下改一下名稱就好
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributeScheduleds {
    DistributeScheduled[] value();
}      

辨別任務排程機制(2)

源碼直接拷貝過來改一下改一下名稱就好
【Spring專題】「開發指南」手把手教你将@Schedule任務排程更新為分布式排程@DistributeSchedule

對應的EnableDistributeScheduling的配置類服務

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class DistributeSchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public DistributeScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new DistributeScheduledAnnotationBeanPostProcessor();
    }
}      

定義DistributeSchedulingConfiguration中的DistributeScheduledAnnotationBeanPostProcessor處理器

隻需要找原來的樣子調整成為return new DistributeScheduledMethodRunnable(target, invocableMethod,redisDistributionLock);即可。

/**
     * Create a {@link Runnable} for the given bean instance,
     * calling the specified scheduled method.
     * <p>The default implementation creates a {@link ScheduledMethodRunnable}.
     * @param target the target bean instance
     * @param method the scheduled method to call
     * @since 5.1
     * @see ScheduledMethodRunnable#ScheduledMethodRunnable(Object, Method)
     */
    protected Runnable createRunnable(Object target, Method method) {
        Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @DistributeScheduled");
        Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
        RedisDistributionLock redisDistributionLock = applicationContext.getBean(RedisDistributionLock.class);
        return new DistributeScheduledMethodRunnable(target, invocableMethod,redisDistributionLock);
    }      

定義我的DistributeScheduledMethodRunnable的runnable對象

添加分布式鎖元件

改造run方法

繼續閱讀