天天看點

elastic-job 定時任務內建

文章目錄

  • ​​一、基本使用​​
  • ​​第一步添加依賴:​​
  • ​​第二步:增加Zookeeper注冊中心的配置​​
  • ​​第三步:開啟Elastic-Job自動配置​​
  • ​​第四步 配置任務​​
  • ​​二、擴充功能​​
  • ​​1.想在配置檔案設定任務參數怎麼寫?​​
  • ​​2.每次任務執行日志如何記錄?​​
  • ​​3.想執行業務腳本任務如何操作?​​
  • ​​4.任務執行前後如何添加監聽事件?​​
  • ​​5.任務業務執行失敗如何自定義異常處理?​​
  • ​​6.動态作業怎麼實作?​​

一、基本使用

elastic-job的任務類型分為三種:

  • a. Simple類型作業,意為簡單實作,未經任何封裝的類型。需實作SimpleJob接口。該接口僅提供單一方法用于覆寫,此方法将定時執行。與Quartz原生接口相似,但提供了彈性擴縮容和分片等功能。
  • b. Dataflow類型用于處理資料流,需實作DataflowJob接口。該接口提供2個方法可供覆寫,分别用于抓取(fetchData)和處理(processData)資料。
  • c.Script類型作業意為腳本類型作業,支援shell,python,perl等所有類型腳本。隻需通過控制台或代碼配置scriptCommandLine即可,無需編碼。執行腳本路徑可包含參數,參數傳遞完畢後,作業架構會自動追加最後一個參數為作業運作時資訊。

本文檔提供的jar包是對elastic-job進行了二次封裝,封裝成了springboot的starter啟動器,使用更加簡單。後續開發引入我們自定義的starter完成定時任務功能即可。

先以最簡單的SimpleJob類型說明如何使用內建:

第一步添加依賴:

<!--elastic-job的starter內建-->
<dependency>
  <groupId>com.wuzheng</groupId>
  <artifactId>spring-boot-elastic-job-starter</artifactId>
  <version>1.0.4</version>
</dependency>      
​​點選此處檢視starter源碼​​,可以自己下載下傳到本地或私服,引入依賴

第二步:增加Zookeeper注冊中心的配置

elastic.job.zk.serverLists=192.168.0.121:2181,192.168.0.122:2181,192.168.0.129:2181
elastic.job.zk.namespace=yourself namespace      

Zookeeper配置的字首是elastic.job.zk,詳細的屬性配置請檢視​​ZookeeperProperties​​

第三步:開啟Elastic-Job自動配置

開啟自動配置隻需要在Spring Boot的啟動類上增加@EnableElasticJob注解

import java.util.concurrent.CountDownLatch;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import com.cxytiandi.elasticjob.annotation.EnableElasticJob;

/**
 * ElasticJob Spring Boot內建案例
 * 
 * @author hfl
 *
 */
@SpringBootApplication
@EnableElasticJob
public class JobApplication {
  
  public static void main(String[] args) {
    SpringApplication.run(JobApplication.class, args);
  }
  
}      

第四步 配置任務

@ElasticJobConf(name = "MySimpleJob", cron = "0/10 * * * * ?", 
  shardingItemParameters = "0=0,1=1", description = "簡單任務")
public class MySimpleJob implements SimpleJob {

  public void execute(ShardingContext context) {
    String shardParamter = context.getShardingParameter();
    System.out.println("分片參數:"+shardParamter);
    int value = Integer.parseInt(shardParamter);
    for (int i = 0; i < 3; i++) {
      if (i % 2 == value) {
        String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
        System.out.println(time + ":開始執行簡單任務" + i);
      }
    }
  }

}      

任務的配置隻需要在任務類上增加一個ElasticJobConf注解,注解中有很多屬性,這些屬性都是任務的配置,詳細的屬性配置如下連結:

​​​ElasticJobConf.java​​

測試下,啟動自己的任務所在微服務,看下效果

elastic-job 定時任務內建

自此,最簡單的小demo其實已經完成了,但是實際中還有一些進階功能需要了解,在此也一并說明下:

二、擴充功能

擴充包括:

  • 1.想在配置檔案設定任務參數怎麼寫?
  • 2.每次任務執行日志如何記錄?
  • 3.想執行業務腳本任務如何操作?
  • 4.任務執行前後如何添加監聽事件?
  • 5.任務業務執行失敗如何自定義異常處理?
  • 6.動态作業怎麼實作?

下面具體說明:

1.想在配置檔案設定任務參數怎麼寫?

這個問題即application.properties中如何配置任務資訊?

使用注解是比較友善,但是在屬性檔案中配置任務的參數資訊更靈活,也便于以後使用appolo或者necos等工具進行集中配置檔案管理。

我們可以同時指定注解方法,也可以同時在屬性檔案中設定,但是當屬性檔案中配置了任務的資訊,優先級就比注解中的高。具體做法如下:

首先還是在任務類上加@ElasticJobConf(name = “MySimpleJob”)注解,隻需要增加一個name即可,任務名是唯一的。

剩下的配置都可以在屬性檔案中進行配置,格式為elastic.job.任務名.配置屬性=屬性值

elastic.job.MySimpleJob.cron=0/10 * * * * ?
elastic.job.MySimpleJob.overwrite=true
elastic.job.MySimpleJob.shardingTotalCount=1
elastic.job.MySimpleJob.shardingItemParameters=0=0,1=1
elastic.job.MySimpleJob.jobParameter=test
elastic.job.MySimpleJob.failover=true
elastic.job.MySimpleJob.misfire=true
elastic.job.MySimpleJob.description=simple job
elastic.job.MySimpleJob.monitorExecution=false
elastic.job.MySimpleJob.listener=com.cxytiandi.job.core.MessageElasticJobListener
elastic.job.MySimpleJob.jobExceptionHandler=com.cxytiandi.job.core.CustomJobExceptionHandler
elastic.job.MySimpleJob.disabled=true      
elastic-job 定時任務內建
elastic-job 定時任務內建
elastic-job 定時任務內建

2.每次任務執行日志如何記錄?

elastic-job已經為我們提供了事件追蹤資料源功能,可以将每一次的任務執行情況,任務執行軌迹,任務執行曆史持久化到資料庫或者其他日志收集架構中,下面示範如何儲存到資料庫中:

事件追蹤功能在注解中也隻需要配置eventTraceRdbDataSource=你的資料源 就可以使用了,資料源用什麼連接配接池無限制,唯一需要注意的一點是你的資料源必須在spring-boot-elastic-job-starter之前建立,因為spring-boot-elastic-job-starter中依賴了你的資料源,下面我以druid作為連接配接池來進行講解。

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>druid-spring-boot-starter</artifactId>
  <version>1.1.2</version>
</dependency>      

配置連接配接池屬性:

spring.datasource.druid.log.url=jdbc:mysql://localhost:3306/event_log
spring.datasource.druid.log.username=root
spring.datasource.druid.log.password=123456
spring.datasource.druid.log.driver-class-name=com.mysql.jdbc.Driver      

然後在項目中定義一個配置類,配置連接配接池,手動配置的原因是連接配接池可以在elastic-job-starter之前被初始化。

@Configuration
public class BeanConfig {
  
  /**
   * 任務執行事件資料源
   * @return
   */
  @Bean("datasource")
  @ConfigurationProperties("spring.datasource.druid.log")
  public DataSource dataSourceTwo(){
      return DruidDataSourceBuilder.create().build();
  }
  
}
      

然後在注解中增加資料源的配置eventTraceRdbDataSource 即可:

注解配置方式如下:

@ElasticJobConf(name = "MySimpleJob", cron = "0/10 * * * * ?", 
  shardingItemParameters = "0=0,1=1", description = "簡單任務", eventTraceRdbDataSource = "datasource")      
elastic-job 定時任務內建
elastic-job 定時任務內建

也可以通過官方提供的可視化工具看到任務執行情況

elastic-job 定時任務內建

​注意: 經測試:​

​1:經測試: 該功能Mysql資料庫的話隻支援5.7,Mysql5.8并不會生效​

​2: 資料庫名必須是elastic_job_log 才會自動建立這2張表,其他資料庫名請手動添加job_execution_log,job_status_trace_log這2張表(後面附2張表建表sql)​

​3:zookeepr注冊中心必須能夠連到mysql​

CREATE TABLE `job_execution_log` (
  `id` varchar(40) NOT NULL,
  `job_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '作業名稱',
  `task_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任務名稱,每次作業運作生成新任務',
  `hostname` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '主機名稱',
  `ip` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '主機IP',
  `sharding_item` int NOT NULL COMMENT '分片項',
  `execution_source` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '作業執行來源。可選值為NORMAL_TRIGGER, MISFIRE, FAILOVER',
  `failure_cause` varchar(4000) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '執行失敗原因',
  `is_success` int NOT NULL COMMENT '是否執行成功',
  `start_time` timestamp NULL DEFAULT NULL COMMENT '作業開始執行時間',
  `complete_time` timestamp NULL DEFAULT NULL COMMENT '作業結束執行時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='曆史軌迹表';

CREATE TABLE `job_status_trace_log` (
  `id` varchar(40) NOT NULL,
  `job_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '作業名稱',
  `original_task_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '原任務id',
  `task_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任務id',
  `slave_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '執行作業伺服器的名稱,Lite版本為伺服器的IP位址,Cloud版本為Mesos執行機主鍵',
  `source` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任務執行源,可選值為CLOUD_SCHEDULER, CLOUD_EXECUTOR, LITE_EXECUTOR',
  `execution_type` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任務執行類型,可選值為NORMAL_TRIGGER, MISFIRE, FAILOVER',
  `sharding_item` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '分片項集合,多個分片項以逗号分隔',
  `state` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任務執行狀态,可選值為TASK_STAGING, TASK_RUNNING, TASK_FINISHED, TASK_KILLED, TASK_LOST, TASK_FAILED, TASK_ERROR',
  `message` varchar(4000) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '相關資訊',
  `creation_time` timestamp NULL DEFAULT NULL COMMENT '記錄建立時間',
  PRIMARY KEY (`id`),
  KEY `TASK_ID_STATE_INDEX` (`task_id`,`state`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='曆史狀态表';      

3.想執行業務腳本任務如何操作?

elastic-job提供了Script任務類型.

由于Script任務的執行邏輯是在具體的腳本中,是通過scriptCommandLine來指定執行腳本的路徑。我這邊為了統一的去發現項目中的任務清單,還是需要建一個腳本的Java類,加上ElasticJobConf注解,隻是不需要寫邏輯而已,示例如下:

/**
 * 腳本任務不需要寫邏輯,邏輯在被執行的腳本中,這邊隻是定義一個任務而已
 *
 */
@ElasticJobConf(name = "MyScriptJob")
public class MyScriptJob implements ScriptJob {

  public void execute(ShardingContext context) {
    
  }

}
      

配置:

elastic.job.MyScriptJob.cron=0/10 * * * * ?
elastic.job.MyScriptJob.overwrite=true
elastic.job.MyScriptJob.scriptCommandLine=D:\\1.bat      

1.bat腳本内容如下:

@echo ------【腳本任務】Sharding Context: %*      
elastic-job 定時任務內建

4.任務執行前後如何添加監聽事件?

首先書寫自己的監聽事件:

/**
 * 作業監聽器, 執行前後發送釘釘消息進行通知
 * @author hfl
 */
public class MessageElasticJobListener implements ElasticJobListener {

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String msg = date + " 【五征定時任務-" + shardingContexts.getJobName() + "】任務開始執行====" + JsonUtils.toJson(shardingContexts);
//        DingDingMessageUtil.sendTextMessage(msg);
        System.out.println(msg);
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String msg = date + " 【五征定時任務-" + shardingContexts.getJobName() + "】任務執行結束====" + JsonUtils.toJson(shardingContexts);
//        DingDingMessageUtil.sendTextMessage(msg);
        System.out.println(msg);
    }

}      

然後在配置檔案中job參數上添加listener參數:

elastic.job.MySimpleJob.listener=com.wuzheng.job.core.MessageElasticJobListener
      
elastic-job 定時任務內建

運作MySimpleJob看下執行結果:

elastic-job 定時任務內建

可以看到任務前後執行了我們自己的邏輯.

5.任務業務執行失敗如何自定義異常處理?

和監聽器配置一樣,第一步書寫自定義異常,第二步配置job參數指定自定義異常類。

/**
 * 自定義異常處理,在任務異常時使用釘釘發送通知
 * @author hfl
 */
public class CustomJobExceptionHandler implements JobExceptionHandler {

    private Logger logger = LoggerFactory.getLogger(CustomJobExceptionHandler.class);

    @Override
    public void handleException(String jobName, Throwable cause) {
        logger.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
        DingDingMessageUtil.sendTextMessage("【"+jobName+"】任務異常。" + cause.getMessage());
    }

}      

異常參數設定

elastic.job.MySimpleJob.jobExceptionHandler=com.wuzheng.job.core.CustomJobExceptionHandler
      

我們自定義個異常測試下(System.out.println(2/0);)

@ElasticJobConf(name = "MySimpleJob")
public class MySimpleJob implements SimpleJob {

  @Override
  public void execute(ShardingContext context) {
    System.out.println(2/0);
    String shardParamter = context.getShardingParameter();
    System.out.println("分片參數:"+shardParamter);
    for (int i = 0; i < 2; i++) {
        String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
        System.out.println(time + ":開始執行簡單任務" + i);
    }
  }

}      

看下效果:

已經指向了我們自定義的異常的類及其類裡面的異常處理邏輯代碼:

elastic-job 定時任務內建

6.動态作業怎麼實作?

以上的定時任務基本都是這個流程:首先配置job參數,然後編寫業務job,最後啟動定時任務所在的微服務,這樣該微服務下的所有定時任務job都統一注冊進zookeeper注冊中心了,然後自動就會啟動這些定時任務。

如果有個定時任務,不想微服務啟動的時候讓其啟用,而是想動态的注冊任務參數并控制何時啟用。這個需求實作也很簡單,我們封裝的elastic-job的starter就已提供實作。

自定義的starter提供了restful的2個接口,一個注冊任務接口: /job;一個删除任務接口:/job/remove?jobName=任務名。

首先書寫自己的任務
package com.wuzheng.job.demo.dynamic;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

/**
 * 動态添加任務示範
 * @author hfl
 *
 */
public class DynamicJob implements SimpleJob {

  @Override
  public void execute(ShardingContext shardingContext) {
    // 可以根據JobParameter來對不同的資料進行操作
    System.out.println(shardingContext.getJobParameter());
    System.out.println(shardingContext.getShardingParameter());
  }

}      
置好之後,啟動這個定時任務微服務項目并不會啟用這個任務,因為我們沒有任務參數,也沒在job上加@ElasticJobConf注解,掃描不到這個任務,但是通過REST API可以動态的注冊任務,API清單如下:

/job

添加任務是POST請求,資料格式為JSON體送出,格式如下:

{
"jobName":"DynamicJob333",
"cron":"0/10 * * * * ?",
"jobType":"SIMPLE",
"jobClass":"com.wuzheng.job.demo.dynamic.DynamicJob",
"jobParameter":"2222222",
"shardingTotalCount":1
}      

​​點選檢視完整字段位址​​

注意:jobClass必須事先存在于服務中

使用postman測試:

elastic-job 定時任務內建

去控制台參看,顯示已經成功注冊到zookeeper中心,作業狀态正常!

elastic-job 定時任務內建

/job/remove 删除任務是GET請求,參數隻要任務名稱即可,比如:/job/remove?jobName=任務名。可以用于任務完成之後清空注冊中心的任務資訊。

以上所有demo的源碼下載下傳位址請點選:​​https://gitee.com/hufanglei/elastic-job-learn/tree/master/spring-boot-elastic-job-example​​。

個人微信公衆号:

搜尋: 怒放de每一天

不定時推送相關文章,期待和大家一起成長!!