天天看點

SpringBoot2 整合ElasticJob架構,定制化管理流程一、ElasticJob簡介二、定時任務加載三、動态添加四、源代碼位址

一、ElasticJob簡介

1、定時任務

在前面的文章中,說過QuartJob這個定時任務,被廣泛應用的定時任務标準。但Quartz核心點在于執行定時任務并不是在于關注的業務模式和場景,缺少高度自定義的功能。Quartz能夠基于資料庫實作任務的高可用,但是不具備分布式并行排程的功能。

2、ElasticJob說明

  • 基礎簡介

Elastic-Job 是一個開源的分布式排程中間件,由兩個互相獨立的子項目 Elastic-Job-Lite 和 Elastic-Job-Cloud 組成。Elastic-Job-Lite 為輕量級無中心化解決方案,使用 jar 包提供分布式任務的排程和治理。 Elastic-Job-Cloud 是一個 Mesos Framework,依托于Mesos額外提供資源治理、應用分發以及程序隔離等服務。

  • 功能特點
分布式排程協調
彈性擴容縮容
失效轉移
錯過執行作業重觸發
作業分片一緻性,保證同一分片在分布式環境中僅一個執行執行個體           

補刀

:人家官網這樣描述的,這裡贅述一下,充實一下文章。

  • 基礎架構結構

該圖檔來自ElasticJob官網。

SpringBoot2 整合ElasticJob架構,定制化管理流程一、ElasticJob簡介二、定時任務加載三、動态添加四、源代碼位址

由圖可知如下内容:

需要Zookeeper元件支援,作為分布式的排程任務,有良好的監聽機制,和控制台,下面的案例也就沖這個圖解來。

3、分片管理

這個概念在ElasticJob中是最具有特點的,實用性極好。

  • 分片概念

任務的分布式執行,需要将一個任務拆分為多個獨立的任務項,然後由分布式的伺服器分别執行某一個或幾個分片項。

場景描述:假設有服務3台,分3片管理,要處理資料表100條,那就可以100%3,按照餘數0,1,2分散到三台服務上執行,看到這裡分庫分表的基本邏輯湧上心頭,這就是為何很多大牛講說,程式設計思維很重要。

  • 個性化參數

個性化參數即shardingItemParameter,可以和分片項比對對應關系,用于将分片項的數字轉換為更加可讀的業務代碼。

場景描述:這裡猛一讀好像很飄逸,其實就是這個意思,如果分3片,取名[0,1,2]不好看,或者不好辨別,可以分别給個别名辨別一下,[0=A,1=B,2=C]。

二、定時任務加載

1、核心依賴包

這裡使用2.0+的版本。

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>           

2、核心配置檔案

這裡主要配置一下Zookeeper中間件,分片和分片參數。

zookeeper:
  server: 127.0.0.1:2181
  namespace: es-job

job-config:
  cron: 0/10 * * * * ?
  shardCount: 1
  shardItem: 0=A,1=B,2=C,3=D           

3、自定義注解

看了官方的案例,沒看到好用的注解,這裡隻能自己編寫一個,基于案例的加載過程和核心API作為參考。

核心配置類:

com.dangdang.ddframe.job.lite.config.LiteJobConfiguration           

根據自己想如何使用注解的思路,比如我隻想注解定時任務名稱和Cron表達式這兩個功能,其他參數直接統一配置(這裡可能是受QuartJob影響太深,可能根本就是想省事...)

@Inherited
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface TaskJobSign {

    @AliasFor("cron")
    String value() default "";

    @AliasFor("value")
    String cron() default "";

    String jobName() default "";

}           

4、作業案例

這裡列印一些基本參數,對照配置和注解,一目了然。

@Component
@TaskJobSign(cron = "0/5 * * * * ?",jobName = "Hello-Job")
public class HelloJob implements SimpleJob {

    private static final Logger LOG = LoggerFactory.getLogger(HelloJob.class.getName()) ;

    @Override
    public void execute(ShardingContext shardingContext) {
        LOG.info("目前線程: "+Thread.currentThread().getId());
        LOG.info("任務分片:"+shardingContext.getShardingTotalCount());
        LOG.info("目前分片:"+shardingContext.getShardingItem());
        LOG.info("分片參數:"+shardingContext.getShardingParameter());
        LOG.info("任務參數:"+shardingContext.getJobParameter());
    }
}           

5、加載定時任務

既然自定義注解,那加載過程自然也要自定義一下,讀取自定義的注解,配置化,加入容器,然後初始化,等着任務執行就好。

@Configuration
public class ElasticJobConfig {

    @Resource
    private ApplicationContext applicationContext ;
    @Resource
    private ZookeeperRegistryCenter zookeeperRegistryCenter;

    @Value("${job-config.cron}") private String cron ;
    @Value("${job-config.shardCount}") private int shardCount ;
    @Value("${job-config.shardItem}") private String shardItem ;
    
    /**
     * 配置任務監聽器
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        return new TaskJobListener();
    }
    /**
     * 初始化配置任務
     */
    @PostConstruct
    public void initTaskJob() {
        Map<String, SimpleJob> jobMap = this.applicationContext.getBeansOfType(SimpleJob.class);
        Iterator iterator = jobMap.entrySet().iterator();
        while (iterator.hasNext()) {
            // 自定義注解管理
            Map.Entry<String, SimpleJob> entry = (Map.Entry)iterator.next();
            SimpleJob simpleJob = entry.getValue();
            TaskJobSign taskJobSign = simpleJob.getClass().getAnnotation(TaskJobSign.class);
            if (taskJobSign != null){
                String cron = taskJobSign.cron() ;
                String jobName = taskJobSign.jobName() ;
                // 生成配置
                SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
                                                JobCoreConfiguration.newBuilder(jobName, cron, shardCount)
                                                .shardingItemParameters(shardItem).jobParameter(jobName).build(),
                                                simpleJob.getClass().getCanonicalName());
                LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(
                                                simpleJobConfiguration).overwrite(true).build();
                TaskJobListener taskJobListener = new TaskJobListener();
                // 初始化任務
                SpringJobScheduler jobScheduler = new SpringJobScheduler(
                                                simpleJob, zookeeperRegistryCenter,
                                                liteJobConfiguration, taskJobListener);
                jobScheduler.init();
            }
        }
    }
}           

絮叨一句

:不要疑問這些API是怎麼知道,看下官方文檔的案例,他們怎麼使用這些核心API,這裡就是照着寫過來,就是多一步自定義注解類的加載過程。當然官方文檔大緻讀一遍還是很有必要的。

補刀一句

:如何快速學習一些元件的用法,首先找到官方文檔,或者開源庫Wiki,再不濟ReadMe文檔(如果都沒有,酌情放棄,另尋其他),熟悉基本功能是否符合自己的需求,如果符合,就看下基本用法案例,熟悉API,最後就是研究自己需要的功能子產品,個人經驗來看,該過程是彎路最少,坑最少的。

6、任務監聽

用法非常簡單,實作ElasticJobListener接口。

@Component
public class TaskJobListener implements ElasticJobListener {
    private static final Logger LOG = LoggerFactory.getLogger(TaskJobListener.class);

    private long beginTime = 0;

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        beginTime = System.currentTimeMillis();
        LOG.info(shardingContexts.getJobName()+"===>開始...");
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        long endTime = System.currentTimeMillis();
        LOG.info(shardingContexts.getJobName()+
        "===>結束...[耗時:"+(endTime - beginTime)+"]");
    }
}           

絮叨一句

:before和after執行前後,中間執行目标方法,标準的AOP切面思想,是以底層水準決定了對上層架構的了解速度,那本《Java程式設計思想》上的灰塵是不是該擦擦?

三、動态添加

1、作業任務

有部分場景需要動态添加和管理定時任務,基于上面的加載流程,在自定義一些步驟就可以。

@Component
public class GetTimeJob implements SimpleJob {

    private static final Logger LOG = LoggerFactory.getLogger(GetTimeJob.class.getName()) ;

    private static final SimpleDateFormat format =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") ;

    @Override
    public void execute(ShardingContext shardingContext) {
        LOG.info("Job Name:"+shardingContext.getJobName());
        LOG.info("Local Time:"+format.format(new Date()));
    }
}           

2、添加任務服務

這裡就動态添加上面的任務。

@Service
public class TaskJobService {

    @Resource
    private ZookeeperRegistryCenter zookeeperRegistryCenter;

    public void addTaskJob(final String jobName,final SimpleJob simpleJob,
                           final String cron,final int shardCount,final String shardItem) {
        // 配置過程
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
                                                    jobName, cron, shardCount)
                                                    .shardingItemParameters(shardItem).build();
        JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,
                                                    simpleJob.getClass().getCanonicalName());
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(
                                                    jobTypeConfiguration).overwrite(true).build();
        TaskJobListener taskJobListener = new TaskJobListener();
        // 加載執行
        SpringJobScheduler jobScheduler = new SpringJobScheduler(
                simpleJob, zookeeperRegistryCenter,
                liteJobConfiguration, taskJobListener);
        jobScheduler.init();
    }

}           

補刀一句

:這裡添加之後,任務就會定時執行,如何停止任務又是一個問題,可以在任務名上做一些配置,比如在資料庫生成一條記錄[1,job1,state],如果排程到state為停止狀态的任務,直接截胡即可。

3、測試接口

@RestController
public class TaskJobController {

    @Resource
    private TaskJobService taskJobService ;

    @RequestMapping("/addJob")
    public String addJob(@RequestParam("cron") String cron,@RequestParam("jobName") String jobName,
                         @RequestParam("shardCount") Integer shardCount,
                         @RequestParam("shardItem") String shardItem) {
        taskJobService.addTaskJob(jobName, new GetTimeJob(), cron, shardCount, shardItem);
        return "success";
    }
}           

四、源代碼位址

GitHub·位址
https://github.com/cicadasmile/middle-ware-parent
GitEE·位址
https://gitee.com/cicadasmile/middle-ware-parent