天天看點

elastic-job靜态任務與動态任務實戰elastic-job

文章目錄

  • elastic-job
    • 分布式任務排程
      • 概念
      • 分類
    • elastic-job簡介
      • 下載下傳安裝使用
    • 實戰案例
      • 靜态任務案例
      • 動态任務案例

elastic-job

分布式任務排程

概念

很多時候,我們需要定時執行一些程式完成一些預定要完成的操作,如果手動處理,一旦任務量過大,就非常麻煩,是以用定時任務去操作是個非常不錯的選項,這種操作就是分布式任務排程。

分類

現在的應用多數是分布式或者微服務,是以我們需要的是分布式任務排程,那麼現在分布式任務排程流行的主要有elastic-job、xxl-job、quartz等,我們這裡做一個對比:

feature quartz elastic-job xxl-job antares opencron
依賴 mysql jdk1.7+, zookeeper 3.4.6+ ,maven3.0.4+ mysql ,jdk1.7+ , maven3.0+ jdk 1.7+ , redis , zookeeper jdk1.7+ , Tomcat8.0+
HA 多節點部署,通過競争資料庫鎖來保證隻有一個節點執行任務 通過zookeeper的注冊與發現,可以動态的添加伺服器。 支援水準擴容 叢集部署 叢集部署
任務分片 支援 支援 支援
文檔完善 完善 完善 完善 文檔略少 文檔略少
管理界面 支援 支援 支援 支援
難易程度 簡單 簡單 簡單 一般 一般
公司 OpenSymphony 當當網 個人 個人 個人
進階功能 彈性擴容,多種作業模式,失效轉移,運作狀态收集,多線程處理資料,幂等性,容錯處理,spring命名空間支援 彈性擴容,分片廣播,故障轉移,Rolling實時日志,GLUE(支援線上編輯代碼,免釋出),任務進度監控,任務依賴,資料加密,郵件報警,運作報表,國際化 任務分片, 失效轉移,彈性擴容 , 時間規則支援quartz和crontab ,kill任務, 現場執行,查詢任務運作狀态
使用企業 大衆化産品,對分布式排程要求不高的公司大面積使用 36氪,當當網,國美,金柚網,聯想,唯品會,亞信,平安,豬八戒 大衆點評,運滿滿,優信二手車,拍拍貸

elastic-job簡介

中文官網:https://shardingsphere.apache.org/elasticjob/index_zh.html

ElasticJob 是一個分布式排程解決方案,由 2 個互相獨立的子項目 ElasticJob-Lite 和 ElasticJob-Cloud 組成。

ElasticJob-Lite 定位為輕量級無中心化解決方案,使用jar的形式提供分布式任務的協調服務;ElasticJob-Cloud 使用 Mesos 的解決方案,額外提供資源治理、應用分發以及程序隔離等服務。

ElasticJob 的各個産品使用統一的作業 API,開發者僅需要一次開發,即可随意部署。

下載下傳安裝使用

可以參考官方文檔:https://shardingsphere.apache.org/elasticjob/current/cn/overview/

實戰案例

靜态任務案例

使用elastic-job很容易,我們接下來學習下elastic-job的使用,這裡的案例我們先實作靜态任務案例,靜态任務案例也就是執行時間事先寫好。

實作步驟:

  1. 引入依賴包
  2. 配置zookeeper節點以及任務名稱命名空間
  3. 實作自定義任務,需要實作SimpleJob接口

1)在

seckill-goods

中引入依賴

<!-- ElasticJobAutoConfiguration自動配置類作用-->
<dependency>
    <groupId>com.github.kuhn-he</groupId>
    <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
    <version>2.1.5</version>
</dependency>
           

2)配置elastic-job

bootstrap.yml

中配置

elastic-job

,如下:

elaticjob:
  zookeeper:
    server-lists: zk-server:3181
    namespace: updatetask
           

配置說明:

  • server-lists:

    zookeeper的位址
  • namespace

    :定時任務命名空間

3)任務建立

建立

com.seckill.goods.task.statictask.ElasticjobTask

,代碼如下:

@ElasticSimpleJob(
        cron = "5/10 * * * * ?",
        jobName = "updateTask",
        shardingTotalCount = 1
)
@Component
public class ElasticjobTask implements SimpleJob {

    /***
     * 執行任務
     * @param shardingContext
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("-----------執行!");
    }
}
           

參數說明:

  • cron:定時表達式
  • jobName:這裡和bootstrap.yml中的namespace保持一緻
  • shardingTotalCount:分片數量

動态任務案例

項目參考位址:https://github.com/LuoLiangDSGA/spring-learning/tree/master/boot-elasticjob

動态任務案例主要是講解程式在運作時,動态添加定時任務,這種場景應用非常廣泛。使用elastic-job實作動态添加定時任務的實作有點複雜,我們接下來實際操作一次。

實作步驟:

  1. 配置初始化的zookeeper位址
  2. 配置的定時任務命名空間(不一定會使用)
  3. 注冊初始化資料
  4. 監聽器->任務執行前後監聽(可有可無)
  5. 動态添加定時任務實作
  6. 自定義任務處理過程-實作SimpleJob

1)監聽器建立

監聽器采用AOP模式,類似前置通知和後置通知,

doBeforeJobExecutedAtLastStarted

doAfterJobExecutedAtLastCompleted

分别會在任務執行前和執行後調用,我們建立一個監聽器實作任務排程前後攔截,

com.seckill.goods.task.dynamic.ElasticJobListener

:

public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {

    /****
     * 構造函數
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */
    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    /***
     * 任務初始化前要做的事情,類似前置通知
     * @param shardingContexts
     */
    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("========doBeforeJobExecutedAtLastStarted========"+ TimeUtil.date2FormatHHmmss(new Date()));
    }

    /***
     * 任務執行完成後要做的事情,類似後置通知
     * @param shardingContexts
     */
    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.out.println("=======doAfterJobExecutedAtLastCompleted============="+ TimeUtil.date2FormatHHmmss(new Date()));
    }
}
           

2)注冊中心配置

在bootstrap.yml中配置zk和namespace

#配置動态任務案例的zk和namespace
zk: zk-server:3181
namesp: autotask
           

建立配置類配置注冊中心資訊,

com.seckill.goods.task.dynamic.ElasticJobConfig

:

@Configuration
public class ElasticJobConfig {

    //配置檔案中的zookeeper的ip和端口
    @Value(value = "${zk}")
    private String serverlists;
    //指定一個命名空間
    @Value("${namesp}")
    private String namespace;

    /***
     * 配置Zookeeper和namespace
     * @return
     */
    @Bean
    public ZookeeperConfiguration zkConfig() {
        return new ZookeeperConfiguration(serverlists, namespace);
    }

    /***
     * 向zookeeper注冊初始化資訊
     * @param config
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }

    /****
     * 建立ElasticJob的監聽器執行個體
     * @return
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        //初始化要給定逾時多少秒重連
        return new ElasticJobListener(100L,100L);
    }
}
           

3)任務建構

我們建立一個動态配置任務的類,任何邏輯代碼需要建立定時任務,可以直接調用該類的指定方法即可。建立類:

com.seckill.goods.task.dynamic.ElasticJobHandler

,代碼如下:

@Component
public class ElasticJobHandler {

    @Resource
    private ZookeeperRegistryCenter registryCenter;

    @Resource
    private ElasticJobListener elasticJobListener;

    /**
     * @param jobName:任務的命名空間
     * @param jobClass:執行的定時任務對象
     * @param shardingTotalCount:分片個數
     * @param cron:定時周期表達式
     * @param id:自定義參數
     * @return
     */
    private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName,
                                                                       Class<? extends SimpleJob> jobClass,
                                                                       int shardingTotalCount,
                                                                       String cron,
                                                                       String id) {
        //建立任務建構對象
        LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.
                        //任務命名空間名字、任務執行周期表達式、分片個數
                        newBuilder(jobName, cron, shardingTotalCount).
                        //自定義參數
                        jobParameter(id).
                        build(),
                jobClass.getCanonicalName()));
        //本地配置是否可覆寫注冊中心配置
        builder.overwrite(true);
        return builder;
    }

    /**
     * 添加一個定時任務
     * @param cron:周期執行表達式
     * @param id:自定義參數
     * @param jobName:命名空間
     * @param instance:任務對象
     */
    public void addPublishJob(String cron,String id,String jobName,SimpleJob instance) {
        LiteJobConfiguration jobConfig = simpleJobConfigBuilder(
                jobName,
                instance.getClass(),
                1,
                cron,
                id).overwrite(true).build();
        //DynamicTask為具體的任務執行邏輯類
        new SpringJobScheduler(instance, registryCenter, jobConfig, elasticJobListener).init();
    }

    /***
     * Date轉cron表達式
     */
    public static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";

    /**
     * 獲得定時
     * @param date
     * @return
     */
    public static String getCron(final Date date) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(CRON_DATE_FORMAT);
        return simpleDateFormat.format(date);
    }
}
           

4)執行邏輯

我們接着建立一個類,用于執行自己所需要操作的邏輯,

com.seckill.goods.task.dynamic.DynamicTask

,代碼如下:

public class DynamicTask implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        //傳遞的參數
        String id = shardingContext.getJobParameter();
        try {
            //具體任務邏輯
            System.out.println("執行你的邏輯代碼!param:"+id);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
           

5)調用測試

建立

com.seckill.goods.controller.TaskController

動态調用建立任務的方法,代碼如下:

@RestController
@RequestMapping(value = "/task")
public class TaskController {

    @Autowired
    ElasticJobHandler elasticJobHandler;

    /***
     * 動态建立任務
     * @param times:延遲時間,為了測試到效果,是以在目前時間往後延遲
     * @param jobname:任務名字
     * @param param:自定義參數
     * @return
     */
    @GetMapping
    public Result add(Long times,String jobname,String param){
        //在目前指定時間内延遲times毫秒執行任務
        Date date = new Date(System.currentTimeMillis()+times);
        //需要傳遞給定時任務的參數
        String cron = ElasticJobHandler.getCron(date);

        //執行任務
        elasticJobHandler.addPublishJob(cron,param,jobname,new DynamicTask());
        return new Result(true, StatusCode.OK,"添加任務成功!");
    }
}
           

6)測試

通路:

http://localhost:18081/task?times=15000&jobname=asyncname&param=No001

背景執行效果如下:

elastic-job靜态任務與動态任務實戰elastic-job