
ElasticJob-Lite: Dataflow Jobs

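ElasticJob divides jobs into two categories: class-based and type-based. For class-based jobs, developers weave in their business logic by implementing an interface. Type-based jobs need no code at all, only the corresponding configuration. The shardingContext parameter passed to the methods of a class-based job interface carries the job configuration, sharding, and runtime information. For example, getShardingTotalCount() returns the total number of shards, and getShardingItem() returns the sharding item number running on this job server.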
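A common way to use these two values is to let each job server handle only the records whose ID modulo the total shard count equals its own sharding item. The sketch below shows that idea in plain Java. ShardFilter and idsForShard are hypothetical helpers for illustration, not part of the ElasticJob API. In a real job the two int arguments would come from shardingContext.getShardingTotalCount() and shardingContext.getShardingItem().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: picks the records owned by one sharding item (not an ElasticJob API). */
final class ShardFilter {
    private ShardFilter() {
    }

    /**
     * Returns the IDs for which id mod shardingTotalCount == shardingItem.
     * In a real job, pass shardingContext.getShardingTotalCount() and
     * shardingContext.getShardingItem().
     */
    static List<Long> idsForShard(List<Long> ids, int shardingTotalCount, int shardingItem) {
        if (shardingTotalCount <= 0 || shardingItem < 0 || shardingItem >= shardingTotalCount) {
            throw new IllegalArgumentException("invalid sharding item or total count");
        }
        List<Long> result = new ArrayList<>();
        for (long id : ids) {
            // floorMod keeps the remainder non-negative even for negative IDs
            if (Math.floorMod(id, (long) shardingTotalCount) == shardingItem) {
                result.add(id);
            }
        }
        return result;
    }
}
```

With three shards and IDs 0 to 6, item 0 gets 0, 3 and 6, item 1 gets 1 and 4, and item 2 gets 2 and 5, so every record is owned by exactly one shard.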

ElasticJob currently provides two class-based job types, Simple and Dataflow, and two type-based job types, Script and HTTP. Users can also add their own job types by implementing the SPI interfaces.

This post covers Dataflow jobs.

Add the dependency (3.0.1 was the latest release at the time of writing):

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-core</artifactId>
    <version>3.0.1</version>
</dependency>

A Dataflow job processes a stream of data and must implement the DataflowJob interface. The interface provides two methods: fetchData, which fetches the data, and processData, which processes it.
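To make the division of labour between the two methods concrete, here is a simplified, stdlib-only model of how one execution drives them in the default (non-streaming) mode: fetch once, then process only if something was fetched. SimpleDataflow and DataflowRunner are hypothetical stand-ins that mirror the two-method shape of DataflowJob, with the sharding context reduced to an int. They are not ElasticJob classes, and the real executor also deals with sharding contexts, job events and streaming mode.

```java
import java.util.List;

/** Hypothetical stand-in mirroring the two-method shape of DataflowJob (sharding context reduced to an int). */
interface SimpleDataflow<T> {
    List<T> fetchData(int shardingItem);

    void processData(int shardingItem, List<T> data);
}

/** Simplified model of one non-streaming execution; not ElasticJob's real executor. */
final class DataflowRunner {
    private DataflowRunner() {
    }

    /** Fetches once and passes the batch to processData only if it is non-empty; returns the batch size. */
    static <T> int runOnce(SimpleDataflow<T> job, int shardingItem) {
        List<T> data = job.fetchData(shardingItem);
        if (data == null || data.isEmpty()) {
            return 0; // nothing fetched, so processData is skipped
        }
        job.processData(shardingItem, data);
        return data.size();
    }
}
```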

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import reactor.core.publisher.Flux;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 17:02
 * @Blog: https://kaven.blog.csdn.net
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
public class MyDataflowJob implements DataflowJob<Flux<String>> {
    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat;
    // several sharding items on one server may be processed on different threads
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // One string array per sharding item (0, 1, 2)
    private static final String[][] MESSAGE = {
            {"java", "c", "c++", "python", "go"},
            {"docker", "k8s"},
            {"elastic-job", "elasticsearch", "zookeeper", "spring cloud alibaba"}
    };

    // Fetch: return the data belonging to the sharding item assigned to this server
    @Override
    public List<Flux<String>> fetchData(ShardingContext shardingContext) {
        int item = shardingContext.getShardingItem();
        return Collections.singletonList(Flux.fromArray(MESSAGE[item]));
    }

    // Process: print a timestamp and the sharding parameter, then the fetched data
    @Override
    public void processData(ShardingContext shardingContext, List<Flux<String>> list) {
        System.out.println("-------------------------------------------------------------");
        System.out.println(LocalDateTime.now().format(FORMATTER));
        System.out.println(shardingContext.getShardingParameter());
        list.forEach(MyDataflowJob::printData);
    }

    // Sort the strings in ascending lexicographic order and print one per line
    private static void printData(Flux<String> data) {
        data.sort().toStream().forEach(System.out::println);
    }
}

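The abstract class Flux comes from the Reactor project, which also underpins Spring's reactive programming support. Reactive programming is a declarative programming paradigm built around data streams and the propagation of change. I won't go into the details here; the code should be easy to follow: it fetches the string array that corresponds to the sharding item, then prints the strings in that array in ascending lexicographic order.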
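If you want to check what each shard should print without starting ZooKeeper, the per-shard logic can be reproduced with plain Java collections. ShardOutputPreview is a hypothetical test helper, not part of the original project. It uses the same three arrays and java.util sorting instead of Flux.sort(), which for strings also sorts by natural (lexicographic) order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper mirroring MyDataflowJob's per-shard logic without Reactor or ZooKeeper. */
final class ShardOutputPreview {
    // Same data as MyDataflowJob: one array per sharding item
    private static final String[][] MESSAGE = {
            {"java", "c", "c++", "python", "go"},
            {"docker", "k8s"},
            {"elastic-job", "elasticsearch", "zookeeper", "spring cloud alibaba"}
    };

    private ShardOutputPreview() {
    }

    /** Returns the strings for the given sharding item in ascending lexicographic order. */
    static List<String> sortedFor(int shardingItem) {
        List<String> data = new ArrayList<>(Arrays.asList(MESSAGE[shardingItem]));
        Collections.sort(data); // String's natural ordering
        return data;
    }
}
```

For sharding item 0 this yields c, c++, go, java, python; for item 2, elastic-job comes before elasticsearch because '-' sorts before 's'.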

The getShardingParameter method returns the sharding parameter assigned to the current sharding item.

The Reactor dependency:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.8.RELEASE</version>
</dependency>

Next, create the registry center and the job configuration, then start the job:

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 17:05
 * @Blog: https://kaven.blog.csdn.net
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
public class Application {
    public static void main(String[] args) {
        // Schedule MyDataflowJob with the registry center and job configuration below
        new ScheduleJobBootstrap(createRegistryCenter(), new MyDataflowJob(),
                createJobConfiguration()).schedule();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        // ZooKeeper server list and namespace (adjust the address to your environment)
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }

    private static JobConfiguration createJobConfiguration() {
        // Job name "MyDataflowJob", split into 3 sharding items (0, 1, 2)
        return JobConfiguration.newBuilder("MyDataflowJob", 3)
                .shardingItemParameters("0=programming languages,1=container technology,2=frameworks")
                .description("Dataflow job")
                .cron("30 * * * * ?")   // fire at second 30 of every minute
                .overwrite(true)        // local configuration overwrites the copy in the registry center
                .monitorExecution(false)
                .misfire(true)
                .build();
    }
}

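The shardingItemParameters method sets the mapping from sharding items to sharding parameters: each sharding item is separated from its parameter by an equals sign, multiple item/parameter pairs are separated by commas, and sharding items are numbered from zero.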
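To make the format concrete, here is a minimal parser for such a string. ShardingItemParametersParser is a hypothetical helper: ElasticJob parses this value internally, and this is not its implementation. The sketch splits the string on commas into item/parameter pairs, then splits each pair on its first equals sign.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for strings such as "0=a,1=b,2=c" (not ElasticJob's own implementation). */
final class ShardingItemParametersParser {
    private ShardingItemParametersParser() {
    }

    static Map<Integer, String> parse(String value) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return result;
        }
        for (String pair : value.split(",")) {
            // Split on the first '=' only: left is the sharding item, right is its parameter
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("expected item=parameter but got: " + pair);
            }
            int item = Integer.parseInt(pair.substring(0, eq).trim());
            if (item < 0) {
                throw new IllegalArgumentException("sharding items start from zero: " + pair);
            }
            result.put(item, pair.substring(eq + 1).trim());
        }
        return result;
    }
}
```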

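The monitorExecution method sets whether execution monitoring is enabled (it is enabled by default, i.e. the default value is true). For short-interval jobs it is best to disable monitorExecution to improve performance: while it is enabled, data is written to ZooKeeper on every cron interval to ensure that the same data is not fetched twice, which puts extra load on ZooKeeper and degrades its performance. If monitorExecution is disabled, there is no guarantee that data will not be fetched more than once, and failover is not available, so the job must be idempotent. For long-interval jobs it is best to enable monitorExecution so that each piece of data is fetched only once.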
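With monitorExecution disabled, the same record may be fetched and processed more than once, so processData should be safe to repeat. One common approach is to remember which record IDs have already been handled and skip them. The sketch below is a hypothetical, in-memory illustration of that idea (IdempotentProcessor is not an ElasticJob class); a real deployment would keep the processed IDs, or a status column, in durable shared storage such as a database rather than in one JVM's memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory sketch: handles each record ID at most once, even if it is fetched repeatedly. */
final class IdempotentProcessor {
    // IDs already handled; a concurrent set because shards may be processed on different threads
    private final Set<Long> processed = ConcurrentHashMap.newKeySet();

    /** Handles the batch and returns only the IDs processed for the first time in this call. */
    List<Long> process(List<Long> batch) {
        List<Long> newlyProcessed = new ArrayList<>();
        for (Long id : batch) {
            // Set.add returns false if the ID is already present, so duplicates are skipped
            if (processed.add(id)) {
                // the real business logic for this record would go here
                newlyProcessed.add(id);
            }
        }
        return newlyProcessed;
    }
}
```

If the batch [1, 2, 3] is followed by a re-fetched batch [2, 3, 4], only 4 is handled the second time.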

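The misfire method sets whether misfired executions are re-run (also enabled by default, with a default value of true). ElasticJob does not allow executions of the same job to overlap. If, for whatever reason, an execution takes longer than the job's interval, misfire handling ensures that the overdue execution runs as soon as the previous one finishes.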
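The following is a deliberately simplified, single-threaded model of that rule, driven by explicit timestamps instead of a real clock. MisfireModel and its methods are hypothetical and only illustrate the behaviour described above (no overlapping runs; a missed fire is replayed once the current run finishes); they are not ElasticJob's scheduler code. That several misses during one long run collapse into a single catch-up run is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of misfire handling; not ElasticJob's scheduler implementation. */
final class MisfireModel {
    private final boolean misfireEnabled;
    private final List<String> log = new ArrayList<>();
    private boolean running;
    private boolean misfired;

    MisfireModel(boolean misfireEnabled) {
        this.misfireEnabled = misfireEnabled;
    }

    /** Called at each cron fire time. */
    void onTrigger(int time) {
        if (running) {
            // Executions never overlap: only remember that this fire was missed
            misfired = true;
            log.add(time + ": misfired");
            return;
        }
        running = true;
        log.add(time + ": start");
    }

    /** Called when the current execution completes. */
    void onFinish(int time) {
        log.add(time + ": finish");
        if (misfireEnabled && misfired) {
            // Replay the overdue execution immediately; the job stays running
            misfired = false;
            log.add(time + ": start (misfire)");
            return;
        }
        // Without misfire handling, missed fires are simply dropped
        misfired = false;
        running = false;
    }

    List<String> log() {
        return log;
    }
}
```

With misfire enabled, a run that starts at 0 and finishes at 25 while fires arrive at 10 and 20 is followed by one catch-up run starting at 25; with misfire disabled, those fires are dropped and the next run waits for the following fire time.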

The results are shown below:

[Figures: console output of the Dataflow job]