天天看點

ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程

為何要使用分布式任務排程

示範項目源碼位址** https://gitee.com/yongzhebuju/spring-task **

**本人部落格網站 **IT小神 www.itxiaoshen.com

在企業系統開發過程中難免少不了要使用定時任務來進行定時觸發執行,對于非分布式環境系統的單一應用來說則非常容易解決,我們隻需要在系統中内部內建一些開源的排程庫配置定時觸發即可;但是随着企業的系統越來越多,逐漸從單一應用慢慢演變為微服務,在分布式系統中常見的任務重複執行、任務負載均衡、統一任務排程、任務故障轉移、任務監控和報警等一些列的問題都是需要在分布式系統中進行解決的,分布式任務排程則應運而生

Java定時常用方式

基礎理論原理

很多人寫過基于線程的while+sleep來實作定時任務完成一些定時背景任務,而Jdk原生也有提供定時器實作;一般定時器實作底層有下面幾種原理,主要涉及資料結構和算法的應用

  • 小頂堆
    • 堆看作一個數組,******也可以****被看作一個完全二叉樹,通俗來講堆其實就是**利用完全二叉樹的結構來維護的一維數組
    • 每個結點的值都小于或等于其左右孩子結點的值
    • t通過建堆和堆化操作,從定時任務上使用每次可以從堆頂取出最近一個需要執行的任務
  • 時間輪算法(顧名思義就是以時間)
    • 連結清單或數組實作時間輪
      • 通過while true sleep然後周遊數組,每個數組下标放置一個連結清單,而這個連結清單放置定時任務,隻要周遊到就取出執行
    • round型時間輪
      • 任務記錄一個round值,周遊到就減1,為0時取出執行
      • 需要周遊所有任務,效率較低
    • 分層時間輪
      • 使用多個不同次元的時間輪
        • 天輪,記錄幾點執行
        • 月輪,記錄幾号執行
      • 當在月輪周遊好了取出放到天輪,以這樣方式時間幾月幾号執行

Jdk Timer

Jdk的timer核心實作

  • 最小堆:queue,存放TimerTask
  • 任務線程:TimerThread,任務執行線程,繼承自Thread基類,死循環判斷是否有任務需要執行
  • 單線程:執行任務,任務可能會互相阻塞
    • schedule
    • scheduleAtFixedRate
ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程
package com.itxs.timer;

import java.util.TimerTask;  

public class MyTimerTask extends TimerTask {

    @Override
    public void run() {
        System.out.println("hello my timer task");
    }
}

package com.itxs.timer;

import org.joda.time.DateTime; //joda-time日期類型庫

import java.util.Timer;

public class JdkTimer {
    public static void main(String[] args) {
        Timer timer = new Timer();
        //指定時間點執行
        timer.schedule(new MyTimerTask(),new DateTime(2021,8,26,17,19,30).toDate());
        //延遲兩秒執行,然後再每個3秒執行
        timer.schedule(new MyTimerTask(),2000,3000);
    }
}
           

Jdk定時任務線程池

核心實作也是小頂堆,無界隊列,可以使用多線程執行任務,有Leader-Follower模式,避免沒必要阻塞和喚醒操作,節省系統資源

package com.itxs.scheduler;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        //延遲3秒執行,執行一次
        scheduledExecutorService.schedule(new MySchedulerThreadPoolTask(),3,TimeUnit.SECONDS);
        //延遲3秒執行,之後每隔十秒執行
        scheduledExecutorService.scheduleAtFixedRate(new MySchedulerThreadPoolTask(),3,10,TimeUnit.SECONDS);
    }
}

package com.itxs.scheduler;

public class MySchedulerThreadPoolTask implements Runnable{
    @Override
    public void run() {
        System.out.println("MySchedulerThreadPoolTask");
    }
}
           

Spring Task

下面我們借助Spring Boot來示範下Spring Task,配置為多線程模式

package com.itxs.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;

import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class ScheduleConfig {

    @Bean
    public TaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(10);
        threadPoolTaskScheduler.setThreadNamePrefix("spring-task-thread");
        return threadPoolTaskScheduler;
    }
}
           
package com.itxs.scheduler;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MySpringTask {

    @Scheduled(cron = "0/30 * * * * ?")
    private void process(){
        System.out.println("MySpringTask:"+Thread.currentThread());
    }
}
           
package com.itxs;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling //注意需要在Spring Boot啟動類上加開啟Spring任務的注解
public class SpringTaskApplication {
    public static void main(String[] args) {
        new SpringApplication().run(SpringTaskApplication.class,args);
    }
}
           
ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程

Quartz定時任務架構

定義

Quartz是一個功能豐富的開源作業排程庫,可以內建到幾乎任何Java應用程式中——從最小的獨立應用程式到最大的電子商務系統。Quartz可用于建立簡單或複雜的排程,以執行數十個、數百個甚至數萬個作業;這些作業的任務被定義為标準Java元件,這些元件實際上可以執行您程式設計讓它們執行的任何事情。

運作環境

  • Quartz可以嵌入到另一個獨立的應用程式中運作
  • Quartz可以在應用程式伺服器(或servlet容器)中執行個體化,并參與XA事務
  • Quartz可以作為一個獨立的程式(在它自己的Java虛拟機中)運作,通過RMI使用
  • 可以将Quartz執行個體化為一個獨立程式叢集(具有負載平衡和故障轉移功能),用于執行作業

持久化

  • Quartz的設計包括一個JobStore接口,該接口可以實作為作業的存儲提供各種機制。
  • 通過使用包含的Jdbc JobStore,所有配置為“非易失性”的job和觸發器都通過JDBC存儲在關系資料庫中。
  • 使用包含的RAM JobStore,所有的job和觸發器都存儲在RAM中,是以不會在程式執行之間持久化——但這樣做的好處是不需要外部資料庫

叢集

  • 故障轉移
  • 負載平衡
  • Quartz的内置叢集特性依賴于通過JDBCJobStore(前面描述過)實作的資料庫持久性。
  • Terracotta對Quartz的擴充提供了叢集功能,而不需要支援資料庫。

使用說明

可以使用 SchedulerFactory 類來達到程式排程的目的,一旦排程器執行個體化後,它就能夠啟動,等待執行和關閉。需要注意的是一旦排程器調用 了shutdown 方法關閉後,如果不重新執行個體化,它就不會啟動了。觸發器在排程器未啟動時,或是終止狀态時,都不會被觸發

  • Scheduler - 與排程程式互動的主要API。
  • Job - 你想要排程器執行的任務元件需要實作的接口
  • JobDetail - 用于定義作業的執行個體。
  • Trigger - 觸發器,定義執行給定作業的計劃的元件。
  • JobBuilder - 用于定義/建構 JobDetail 執行個體,用于定義作業的執行個體。
  • TriggerBuilder - 用于定義/建構觸發器執行個體。
  • Scheduler 的生命期 - 從 SchedulerFactory 建立它時開始,到 Scheduler 調用shutdown() 方法時結束;Scheduler 被建立後,可以增加、删除和列舉 Job 和 Trigger,以及執行其它與排程相關的操作(如暫停 Trigger)。但是,Scheduler 隻有在調用 start() 方法後,才會真正地觸發 trigger(即執行 job)

Spring Boot 整合Quartz

我們本篇的主角ElasticJob底層是依賴Quartz實作的,是以我們有必要先簡單了解Quartz使用,本篇采用jdbc持久化模式,我們這裡選擇基于mysql的持久化,是以需要将tables_mysql_innodb.sql包含11張表導入到mysql資料庫中

靜态配置Quartz任務

pom檔案内容如下,大部分都是常見啟動器,我們重點是spring-boot-starter-quartz

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itxs</groupId>
    <artifactId>spring-task</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.5.2</version>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <quartz-springboot.version>2.5.2</quartz-springboot.version>
        <lombok.version>1.18.20</lombok.version>
        <druid.version>1.2.6</druid.version>
        <mysql.version>8.0.25</mysql.version>
        <mybatis-plus.version>3.4.0</mybatis-plus.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        //Spring Boot整合Quartz的啟動器
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
            <version>${quartz-springboot.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
            <optional>true</optional>
        </dependency>
      
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

    </dependencies>
</project>
           

application配置檔案,profiles激活dev環境,在dev環境中進行資料庫配置,include: quartz包含一個單獨配置檔案,在裡面可以配置多個Quantz的任務參數

application.yml

spring:
  application:
    name: itxs-spring-task
  profiles:
    active: dev
    include: quartz
  quartz:
    job-store-type: jdbc # 使用資料庫存儲
    scheduler-name: cluster_scheduler # 相同 Scheduler 名字的節點,形成一個 Quartz 叢集
    wait-for-jobs-to-complete-on-shutdown: true # 應用關閉時,是否等待定時任務執行完成。預設為 false ,建議設定為 true
    jdbc:
      initialize-schema: never # 是否自動使用 SQL 初始化 Quartz 表結構。這裡設定成 never ,我們手動建立表結構。
mybatis-plus:
  #  mapper-locations: classpath:mapper/*.xml
  global-config:
    db-config:
      id-type: auto
      logic-delete-field: deleted
      logic-delete-value: 1
      logic-not-delete-value: 0
  configuration:
    map-underscore-to-camel-case: on
    call-setters-on-nulls: on
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
           

application-dev.yml

server:
  port: 8080
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.3.117:3306/testdb?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8
    username: itxs
    password: [email protected]
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      max-active: 1000
      min-idle: 5
      initial-size: 10
           

application-quartz.yml

quartz:
  # jobGroup名稱一緻的情況下,不可出現相同jobName
  jobs[0]:
    jobName: myJob1
    # 以服務名為組名
    jobGroup: myGroup1
    # 業務邏輯處理類的包名
    jobClassName: com.itxs.scheduler.MySpringQuartzOneJob
    # 觸發器名稱
    triggerName: myTrigger1
    # cron表達式 每30秒執行一次
    cronExpression: 0/30 * * * * ?
    # 任務狀态 1 正常 0 暫停
    triggerState: 1
    # 排序
    sort: 1
  jobs[1]:
    jobName: myJob2
    # 以服務名為組名
    jobGroup: myGroup2
    # 業務邏輯處理類的包名
    jobClassName: com.itxs.scheduler.MySpringQuartzSecondJob
    # 觸發器名稱
    triggerName: myTrigger2
    # cron表達式 每分鐘執行一次
    cronExpression: 0 * * * * ?
    # 任務狀态 1 正常 0 暫停
    triggerState: 1
    # 排序
    sort: 2
           

配置類,主要配置schedulerFactoryBean和線程池,初始化quartz的scheduler

package com.itxs.config;

import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.util.concurrent.Executor;

@Configuration
public class ScheduleConfig {

    @Autowired
    private DataSource dataSource;

    @Bean
    public TaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(10);
        threadPoolTaskScheduler.setThreadNamePrefix("spring-task-thread");
        return threadPoolTaskScheduler;
    }

    @Value("${spring.quartz.job-store-type}")
    private String storeType;

    @Bean
    public Scheduler scheduler(){
        return schedulerFactoryBean().getScheduler();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(){
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        schedulerFactoryBean.setSchedulerName("cluster_scheduler");
        if (storeType.equals("jdbc")){
            schedulerFactoryBean.setDataSource(dataSource);
        }
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("application");
        schedulerFactoryBean.setTaskExecutor(schedulerThreadPool());
        schedulerFactoryBean.setStartupDelay(0);
        return schedulerFactoryBean;
    }

    @Bean
    public Executor schedulerThreadPool(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        threadPoolTaskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        threadPoolTaskExecutor.setQueueCapacity(Runtime.getRuntime().availableProcessors());
        return threadPoolTaskExecutor;
    }
}
           

Quartz實作類,內建QuartzJobBean實作executeInternal的接口

package com.itxs.scheduler;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;
import java.util.Date;

/**
 * 運作在spring
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Slf4j
@Component
public class MySpringQuartzJob extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext){
        try {
            log.info("MySpringQuartzJob------排程執行個體:{},任務名稱:{},執行時間:{}" + jobExecutionContext.getScheduler().getSchedulerInstanceId(),
                    jobExecutionContext.getJobDetail().getKey().getName(),new Date());
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}
           

接下來是建立監聽器并将job啟動執行,在Spring 容器重新整理後執行監聽器,SpringQuartzApplicationListener是在将Quantz Job配置直接寫在代碼裡,而SpringQuartzYamlApplicationListener則是讀取application-quartz.yml裡面的每個job的配置然後循環建立

package com.itxs.listener;

import com.itxs.scheduler.MySpringQuartzJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SpringQuartzApplicationListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private Scheduler scheduler;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        log.info("SpringQuartzApplicationListener quartz排程任務建立開始-------");
        TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", "group1");
        try {
            Trigger trigger = scheduler.getTrigger(triggerKey);
            System.out.println(scheduler.getSchedulerName());
            if (trigger == null){
                trigger = TriggerBuilder.newTrigger()
                        .withIdentity(triggerKey)
                        .withSchedule(CronScheduleBuilder.cronSchedule("0 * * * * ?"))
                        .startNow()
                        .build();
                JobDetail jobDetail = JobBuilder.newJob(MySpringQuartzJob.class)
                        .withIdentity("job1","group1")
                        .build();
                scheduler.scheduleJob(jobDetail,trigger);
                scheduler.start();
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        log.info("SpringQuartzApplicationListener quartz排程任務建立結束-------");
    }
}
           
package com.itxs.listener;


import com.itxs.pojo.JobEntity;
import com.itxs.utils.QuartzEnum;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
@ConfigurationProperties(prefix = "quartz")
public class SpringQuartzYamlApplicationListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private Scheduler scheduler;

    List<JobEntity> jobs = new ArrayList<>();

    public List<JobEntity> getJobs() {
        return jobs;
    }

    public void setJobs(List<JobEntity> jobs) {
        this.jobs = jobs;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        log.info("SpringQuartzYamlApplicationListener quartz排程任務建立開始-------");
        for(JobEntity entity : jobs) {
            log.info("排程任務-------"+jobs.toString());
            // 當定時任務狀态為0時,不啟動
            if (entity.getTriggerState() == QuartzEnum.PAUSED.getCode()) {
                continue;
            }

            try {
                Class<? extends Job> jobClass = (Class<? extends Job>) (Class.forName(entity.getJobClassName()).newInstance().getClass());
                if (jobClass != null){
                    TriggerKey triggerKey = TriggerKey.triggerKey(entity.getTriggerName(), entity.getJobGroup());
                    Trigger trigger = scheduler.getTrigger(triggerKey);
                    if (trigger == null){
                        trigger = TriggerBuilder.newTrigger()
                                .withIdentity(triggerKey)
                                .withSchedule(CronScheduleBuilder.cronSchedule(entity.getCronExpression()))
                                .startNow()
                                .build();

                        Map<String,Object> map = new HashMap<>();
                        map.put("objectName","object");
                        JobDataMap jobDataMap = new JobDataMap(map);

                        JobDetail jobDetail = JobBuilder.newJob(jobClass)
                                .usingJobData(jobDataMap)
                                .withIdentity(entity.getJobName(), entity.getJobGroup())
                                .build();
                        scheduler.scheduleJob(jobDetail,trigger);
                    }
                }
            } catch (SchedulerException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                e.printStackTrace();
            }
        }

        try {
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        log.info("SpringQuartzYamlApplicationListener quartz排程任務建立結束-------");
    }
}
           

調用的Json參數為

{
    "cron": "0/3 * * * * ?",
    "beginTime": "2021-08-27",
    "clazzName": "com.itxs.scheduler.RemindJob",
    "jobGroup": "mygroup",
    "jobName": "myjob",
    "parmas": "elastic job dynamic hello world"
}
           

啟動Spring Boot啟動類,日志顯示quartz使用db持久化方式,所有的job實作類也按照配置參數定時執行,并寫持久化到mysql資料庫裡quartz表裡

ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程

動态配置Quantz

如果我們需要針對定時任務進行建立、停止等操作,那麼我們需要動态操作Quantz,本篇也基于Spring Boot + Quartz封裝任務排程實作了動态管理

詳細代碼可以到gitee項目源碼裡擷取

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-72e5t825-1630077809479)(file://F:\creation\markdown\article\ElasticJob%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%BA%94%E7%94%A8\image-20210827150653182.png?lastModify=1630077763)]

ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程

ElasticJob-分布式任務排程

其他架構名詞了解

開源的分布式任務或作業排程架構除了我們本篇當當的ElasticJob,還有大衆點評開發人員許雪裡的XXL-JOB、唯品會Saturn、淘寶的TBSchedule和SchedulerX,此外另外一個在大資料批處理作業排程器Azkaban也非常出名,

XXL-JOB:是一個分布式任務排程平台,其核心設計目标是開發迅速、學習簡單、輕量級、易擴充。

TBSchedule:一個簡潔的分布式任務排程引擎,基于ZooKeeper純Java實作,由Alibaba開源。

SchedulerX:Alibaba Cloud SchedulerX精确、高可靠、高可用的排程任務服務,響應時間在秒内,SchedulerX(分布式任務排程) 是隸屬于阿裡雲EDAS産品的元件, Spring Cloud AliCloud SchedulerX 提供了在Spring Cloud的配置規範下,分布式任務排程的功能支援。SchedulerX可提供秒級、精準、高可靠、高可用的定時任務排程服務,并支援多種類型的任務排程,如簡單單機任務、簡單多機任務、腳本任務以及網格任務。

Saturn:來自唯品會開發的一個分布式、容錯和高可用的作業排程服務。

此外,這裡也提一下Azkaban,Linkedin開源的一個批量工作流排程器,實作可以一個工作流内,多個作業可以按照特定的順序執行,作業之間的順序關系依靠key-value的形式來建立依賴關系,并提供可視化界面編制作業的工作流程。

由于我們基于Java技術大都以SpringBoot開發為主,ElasticJob與Spring整合也相當不錯,且ElasticJob子項目ElasticJob-Lite定位為輕量級無中心化解決方案,是以本篇我們主要推薦使用ElasticJob分布式任務排程架構

概述

shardingsphere官方網站** ,ShardingSphere 已于2020年4月16日成為 Apache 軟體基金會的頂級項目**

ElasticJob官方網站** ElasticJob作為******Apache ShardingSphere****的子項目

Apache ShardingSphere 是一套開源的分布式資料庫解決方案組成的生态圈,它由 JDBC、Proxy 和 Sidecar(規劃中)這 3 款既能夠獨立部署,又支援混合部署配合使用的産品組成。 它們均提供标準化的資料水準擴充、分布式事務和分布式治理等功能,可适用于如 Java 同構、異構語言、雲原生等各種多樣化的應用場景。

Apache ShardingSphere 旨在充分合理地在分布式的場景下利用關系型資料庫的計算和存儲能力,而并非實作一個全新的關系型資料庫。 關系型資料庫當今依然占有巨大市場佔有率,是企業核心系統的基石,未來也難于撼動,我們更加注重在原有基礎上提供增量,而非颠覆。

Apache ShardingSphere 5.x 版本開始緻力于可插拔架構,項目的功能元件能夠靈活的以可插拔的方式進行擴充。 目前,資料分片、讀寫分離、資料加密、影子庫壓測等功能,以及 MySQL、PostgreSQL、SQLServer、Oracle 等 SQL 與協定的支援,均通過插件的方式織入項目。 開發者能夠像使用積木一樣定制屬于自己的獨特系統。Apache ShardingSphere 目前已提供數十個 SPI 作為系統的擴充點,仍在不斷增加中。

  • ElasticJob 是一個面向網際網路生态和海量任務的分布式排程解決方案,由 2 個互相獨立的子項目 ElasticJob-Lite 和 ElasticJob-Cloud 組成。
  • ElasticJob-Lite 定位為輕量級無中心化解決方案,使用jar的形式提供分布式任務的協調服務;
  • ElasticJob-Cloud 使用 Mesos 的解決方案,額外提供資源治理、應用分發以及程序隔離等服務。
  • ElasticJob 通過彈性排程、資源管控、以及作業治理的功能,打造一個适用于網際網路場景的分布式排程解決方案,并通過開放的架構設計,提供多元化的作業生态。
  • ElasticJob 的各個産品使用統一的作業 API,開發者僅需要一次開發,即可随意部署
  • 使用 ElasticJob 能夠讓開發工程師不再擔心任務的線性吞吐量提升等非功能需求,使他們能夠更加專注于面向業務編碼設計; 同時,它也能夠解放運維工程師,使他們不必再擔心任務的可用性和相關管理需求,隻通過輕松的增加服務節點即可達到自動化運維的目的。
  • ElasticJob實作分布式特性主要依賴于Zookeeper,比如leader選舉、彈性擴縮容、故障轉移、負載均衡等機制

可以通過快速入門快速體驗ElasticJob

ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程

架構

ElasticJob-Lite

定位為輕量級無中心化解決方案,使用 jar 的形式提供分布式任務的協調服務。

ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程

ElasticJob-Cloud

采用自研 Mesos Framework 的解決方案,額外提供資源治理、應用分發以及程序隔離等功能。

ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程
特性 ElasticJob-Lite ElasticJob-Cloud
無中心化
資源配置設定 不支援 支援
作業模式 常駐 常駐 + 瞬時
部署依賴 ZooKeeper ZooKeeper + Mesos

功能清單

  • 彈性排程
    • 支援任務在分布式場景下的分片和高可用
    • 能夠水準擴充任務的吞吐量和執行效率
    • 任務處理能力随資源配備彈性伸縮
  • 資源配置設定
    • 在适合的時間将适合的資源配置設定給任務并使其生效
    • 相同任務聚合至相同的執行器統一處理
    • 動态調配追加資源至新配置設定的任務
  • 作業治理
    • 失效轉移
    • 錯過作業重新執行
    • 自診斷修複
  • 作業依賴(TODO)
    • 基于有向無環圖(DAG)的作業間依賴
    • 基于有向無環圖(DAG)的作業分片間依賴
  • 作業開放生态
    • 可擴充的作業類型統一接口
    • 豐富的作業類型庫,如資料流、腳本、HTTP、檔案、大資料等
    • 易于對接業務作業,能夠與 Spring 依賴注入無縫整合
  • 可視化管控端
    • 作業管控端
    • 作業執行曆史資料追蹤
    • 注冊中心管理

基本使用

作業類型

  • 簡單作業,我們本篇文章主要使用這個,其他後續再補充
  • 資料流作業
  • 腳本作業
  • HTTP作業(3.0.0-beta 提供)

靜态任務配置

前面我們學習Spring Boot 整合Quartz的使用,ElasticJob使用可所謂簡單至極,還是原來我們說的Spring Boot的三闆斧,加依賴和配置,另外增加實作類Ok搞掂。由于我們還用使用之前工程項目,是以依賴和配置内容較多,核心是添加elasticjob-lite-spring-boot-starter和elasticjob項配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itxs</groupId>
    <artifactId>spring-task</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.5.2</version>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <quartz-springboot.version>2.5.2</quartz-springboot.version>
        <lombok.version>1.18.20</lombok.version>
        <druid.version>1.2.6</druid.version>
        <mysql.version>8.0.25</mysql.version>
        <mybatis-plus.version>3.4.0</mybatis-plus.version>
        <elasticjob-lite-core.version>3.0.0-RC1</elasticjob-lite-core.version>
        <curator.version>5.2.0</curator.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
            <version>${quartz-springboot.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-core</artifactId>
            <version>${elasticjob-lite-core.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
            <version>${elasticjob-lite-core.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>${curator.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>${curator.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>${curator.version}</version>
        </dependency>

    </dependencies>
</project>
           
spring:
  application:
    name: itxs-spring-task
  profiles:
    active: dev
    include: quartz
  quartz:
    job-store-type: jdbc # 使用資料庫存儲
    scheduler-name: cluster_scheduler # 相同 Scheduler 名字的節點,形成一個 Quartz 叢集
    wait-for-jobs-to-complete-on-shutdown: true # 應用關閉時,是否等待定時任務執行完成。預設為 false ,建議設定為 true
    jdbc:
      initialize-schema: never # 是否自動使用 SQL 初始化 Quartz 表結構。這裡設定成 never ,我們手動建立表結構。
elasticjob:
  reg-center:
    server-lists: 192.168.50.201:2181,192.168.50.204:2182,192.168.50.153:2183
    namespace: itxs-elastic-job
  jobs:
    elasticDemoOneJob:
      elasticJobClass: com.itxs.scheduler.ElasticDemoOneJob
      cron: 0/30 * * * * ?
      shardingTotalCount: 1
      shardingItemParameters: 0=Beijing
mybatis-plus:
  #  mapper-locations: classpath:mapper/*.xml
  global-config:
    db-config:
      id-type: auto
      logic-delete-field: deleted
      logic-delete-value: 1
      logic-not-delete-value: 0
  configuration:
    map-underscore-to-camel-case: on
    call-setters-on-nulls: on
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
           

ElasticJob簡單實作類示例

package com.itxs.scheduler;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * 運作在spring
 */

@Slf4j
@Component
@Scope("prototype")
public class ElasticDemoOneJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("ElasticDemoOneJob Start ------jobname={},taskid={},parameter={},shardingitem={},shardingparameter={}",shardingContext.getJobName(),
                shardingContext.getTaskId(),shardingContext.getJobParameter(),
                shardingContext.getShardingItem(),shardingContext.getShardingParameter());
    }
}
           

搞掂運作,出現我們ElasticDemoOneJob排程日志,我們再啟動一個8081端口,也即是同時有多個程序實作排程任務,發現目前的Job一直在8080這個程序上運作,當我們關閉8080端口這個SpringBoot程式後,過一會8081端口這個微服務就會ElasticDemoOneJob排程日志執行任務接替原來8080定時任務。注意由于我們job中繼資料資訊是存在zookeeper裡面,如果我們沒有使用覆寫等級制,重新修改job配置參數後沒有生效,建議先删除掉zookeeper的節點資料然後啟動再服務

ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程
ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程
ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程
ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程

當我們配置分片後,比如配置為5個配置設定,啟動多個程序會将分片負載均衡配置設定到各個程序任務去支援,比如目前8082則執行0和3兩個分片,8080是2分片,8081是1和4兩個分片,也即是根據目前可用節點資料針對分片數量進行動态調整,這種場景比較适用于處理任務執行時間較長需要處理的資料較大

ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程

動态任務

如果我們需要動态建立啟動和停止ElasticJob,我們可以自己實作封裝,具體如下

動态建立啟用和停止的接口聲明類

package com.itxs.service;

import com.itxs.pojo.SysTask;

/**
 * @Description 任務管理
 * @Version 1.0
 */
public interface IElasticJobService {

    /**
     * @Description //添加一個任務
     * @Param [sysTask]
     * @return boolean
     */
    boolean addJob(SysTask sysTask) throws Exception;

    /**
     * @Description //删除某個任務
     * @Param [sysTask]
     * @return boolean
     */
    boolean deleteJob(SysTask sysTask)  throws Exception;
}
           

接口實作類:

package com.itxs.service.impl;

import com.itxs.pojo.SysTask;
import com.itxs.service.IElasticJobService;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class ElasticJobServiceImpl implements IElasticJobService {

    @Value("${elasticjob.reg-center.server-lists}")
    private String serverList;
    @Value("${elasticjob.reg-center.namespace}")
    private String namespace;

    @Override
    public boolean addJob(SysTask sysTask){
        try {
            Class jobClass = Class.forName(sysTask.getClazzName());
            JobConfiguration jobConfig = JobConfiguration.newBuilder(sysTask.getJobName(),1)
                    .cron(sysTask.getCron()).overwrite(true).jobParameter(sysTask.getParmas()).disabled(false).build();
            ScheduleJobBootstrap scheduleJobBootstrap = new ScheduleJobBootstrap(createRegistryCenter(), (SimpleJob)jobClass.newInstance(), jobConfig);
            scheduleJobBootstrap.schedule();
            return true;
        }catch (Exception e){
            return false;
        }
    }

    @Override
    public boolean deleteJob(SysTask sysTask){
        try {
            Class jobClass = Class.forName(sysTask.getClazzName());
            JobConfiguration jobConfig = JobConfiguration.newBuilder(sysTask.getJobName(),1)
                    .cron(sysTask.getCron()).overwrite(true).jobParameter(sysTask.getParmas()).disabled(true).build();
            ScheduleJobBootstrap scheduleJobBootstrap = new ScheduleJobBootstrap(createRegistryCenter(), (SimpleJob)jobClass.newInstance(), jobConfig);
            scheduleJobBootstrap.schedule();
            return true;
        }catch (Exception e){
            return false;
        }
    }

    public CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        regCenter.init();
        return regCenter;
    }
}
           

在原來controller上增加addElasticTask和deleteElasticTask兩個方法

package com.itxs.controller;


import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.itxs.pojo.SysTask;
import com.itxs.service.IElasticJobService;
import com.itxs.service.IJobManageService;
import com.itxs.utils.JsonResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


/**
 * @Description 任務管理控制,此處隻實作了增加和删除,
 * @Version 1.0
 */
@Slf4j
@RestController
@RequestMapping("/quartz")
public class SysTaskController {

    @Autowired
    private IJobManageService jobManageService;

    @Autowired
    private IElasticJobService iElasticJobService;

    private boolean validateParmas(SysTask task) {
        return task != null && StringUtils.isNotBlank(task.getCron())
                && StringUtils.isNotBlank(task.getJobGroup())
                && StringUtils.isNotBlank(task.getJobName());
    }

    @GetMapping("/add")
    public JsonResult addTask() {
        return JsonResult.success();
    }

    /**
     * @Description //添加一個任務
     * @Param [task]
     * @return com.quartz.result.JsonResult
     */
    @PostMapping("/add-task")
    public JsonResult addTask(@RequestBody SysTask task) {
        if (validateParmas(task)) {
            try {
                this.jobManageService.addJob(task);
                return JsonResult.success();
            } catch (Exception e) {
                log.error("添加任務異常,異常任務名稱:" + task.getJobGroup() + "; 任務名稱" + task.getJobName());
            }
        }
        return JsonResult.error();
    }

    @PostMapping("/add-elastic-task")
    public JsonResult addElasticTask(@RequestBody SysTask task) {
        if (validateParmas(task)) {
            try {
                this.iElasticJobService.addJob(task);
                return JsonResult.success();
            } catch (Exception e) {
                log.error("添加任務異常,異常任務名稱:" + task.getJobGroup() + "; 任務名稱" + task.getJobName());
            }
        }
        return JsonResult.error();
    }

    /**
     * @Description //删除一個任務
     * @Param [task]
     * @return com.quartz.result.JsonResult
     */
    @RequestMapping("/delete-elastic-task")
    public JsonResult deleteElasticTask(@RequestBody SysTask task) {
        if (validateParmas(task)) {
            try {
                this.iElasticJobService.deleteJob(task);

                return JsonResult.success();
            } catch (Exception e) {
                log.error("删除任務異常,異常任務名稱:" + task.getJobGroup() + "; 任務名稱" + task.getJobName());
            }
        }
        return JsonResult.error();
    }

    /**
     * @Description //删除一個任務
     * @Param [task]
     * @return com.quartz.result.JsonResult
     */
    @RequestMapping("/delete-task")
    public JsonResult deleteTask(@RequestBody SysTask task) {
        if (validateParmas(task)) {
            try {
                this.jobManageService.deleteJob(task);

                return JsonResult.success();
            } catch (Exception e) {
                log.error("删除任務異常,異常任務名稱:" + task.getJobGroup() + "; 任務名稱" + task.getJobName());
            }
        }
        return JsonResult.error();
    }
}
           

重新啟動服務,端口為8080,post調用位址:http://192.168.3.224:8080/quartz/add-elastic-task,調用添加接口成功任務開始定時執行,檢視zookeeper上也已經存儲了新建立Job中繼資料資訊,當我們調用删除任務接口後定時任務不再執行

{
    "cron": "0/5 * * * * ?",
    "beginTime": "2021-08-27",
    "clazzName": "com.itxs.scheduler.ElasticDemoJob",
    "jobGroup": "myelasticgroup",
    "jobName": "myelasticjob",
    "parmas": "real elastic job dynamic hello world"
}
           
ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程
ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程
ElasticJob分布式任務排程應用為何要使用分布式任務排程Java定時常用方式ElasticJob-分布式任務排程

有耐心看在這裡的小夥伴們,恭喜你,已經入門了分布式任務排程

繼續閱讀