天天看点

ElasticJob‐Lite:自定义作业分片策略

Java SPI机制

在上一篇博客中介绍了​

​ElasticJob​

​的作业分片策略:

  • ​​ElasticJob‐Lite:作业分片策略介绍与源码分析​​

其中提到了​

​ElasticJob​

​​是通过​

​Java​

​​提供的​

​SPI​

​​机制(​

​ServiceLoader​

​类)加载所有作业分片策略。

​ServiceLoader​

​​类就是​

​Java​

​​提供的​

​SPI​

​​,​

​SPI​

​​(​

​Service Provider Interface​

​​)是​

​JDK​

​​内置的一种服务提供发现机制,可以用来启用框架扩展和替换组件,主要是被框架的开发人员使用,不同厂商可以针对同一接口做出不同的实现,比如​

​java.sql.Driver​

​​接口,​

​MySQL​

​​和​

​PostgreSQL​

​​都提供了对应的实现给用户使用,而​

​Java​

​​的​

​SPI​

​​机制可以为某个接口寻找服务实现。​

​Java​

​​中​

​SPI​

​机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。

​ServiceLoader​

​类正常工作的唯一要求是服务提供类必须具有无参构造函数,以便它们可以在加载期间实例化。通过在资源目录的​

​META-INF/services​

​​中放置服务提供者配置文件来标识服务提供者,文件名是服务类型的完全限定名(比如​

​ElasticJobListener​

​​类的完全限定名),该文件包含具体的服务提供者类的完全限定名列表(​

​ElasticJobListener​

​​实现类的完全限定名列表),每行一个,每个名称周围的空格和制表符以及空行都将被忽略,该文件必须以​

​UTF-8​

​编码。

自定义作业分片策略

所有可用的作业分片策略在​

​JobShardingStrategyFactory​

​​类的静态块中被加载(通过​

​ElasticJobServiceLoader​

​​类,该类是​

​ElasticJob​

​​基于​

​Java SPI​

​机制实现的特定于作业的服务加载器)。

static {
        ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
    }      

加载的类型是​

​JobShardingStrategy.class​

​,因此自定义的作业分片策略需要实现该接口。

自定义作业分片策略​

​ShuffleJobShardingStrategy​

​类:

package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.*;

public class ShuffleJobShardingStrategy implements JobShardingStrategy {

    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        // 先将作业分片项装入容器
        List<Integer> shuffleShardingList = new ArrayList<>(shardingTotalCount);
        for (int i = 0; i < shardingTotalCount; i++) {
            shuffleShardingList.add(i);
        }
        // 将容器中的作业分片项顺序打乱(使用容器的shuffle方法)
        Collections.shuffle(shuffleShardingList);
        // 模仿AverageAllocationJobShardingStrategy作业分片策略进行分配
        Map<JobInstance, List<Integer>> result = shardingShuffle(jobInstances, shardingTotalCount, shuffleShardingList);
        addShuffle(jobInstances, shardingTotalCount, result, shuffleShardingList);
        return result;
    }

    private Map<JobInstance, List<Integer>> shardingShuffle(final List<JobInstance> shardingUnits,
                                                            final int shardingTotalCount,
                                                            final List<Integer> shuffleShardingList) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
        // 每个作业服务器最少应该分配的作业分片项数
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            // 每个作业服务器申请的作业分片项列表(容量为itemCountPerSharding + 1)
            // itemCountPerSharding + 1为每个作业服务器最多应该分配的作业分片项数
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                // 给作业分片项列表添加容器中的第i个作业分片项
                shardingItems.add(shuffleShardingList.get(i));
            }
            // 将作业服务器与它执行的作业分片项列表进行关联
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }

    private void addShuffle(final List<JobInstance> shardingUnits, final int shardingTotalCount,
                            final Map<JobInstance, List<Integer>> shardingResults,
                            final List<Integer> shuffleShardingList) {
        // 无法平均分配的分片项数
        int aliquant = shardingTotalCount % shardingUnits.size();
        // 已分配的无法平均分配的分片项数
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            // 是否还有无法平均分配的分片项
            if (count < aliquant) {
                // 分配给序号较小的作业服务器
                entry.getValue().add(
                        shuffleShardingList.get(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count)
                );
            }
            // 已分配数更新
            count++;
        }
    }

    // 作业分片策略的标识符
    @Override
    public String getType() {
        return "Shuffle";
    }
}      

博主自定义的​

​ShuffleJobShardingStrategy​

​​作业分片策略是模仿​

​AverageAllocationJobShardingStrategy​

​​作业分片策略(默认的作业分片策略),只是先将作业分片项装入容器,然后将容器中的作业分片项顺序打乱(使用容器的​

​shuffle​

​​方法),之后再基于该作业分片项容器使用​

​AverageAllocationJobShardingStrategy​

​​作业分片策略给作业服务器分配该容器中的作业分片项,如果不了解​

​AverageAllocationJobShardingStrategy​

​作业分片策略,可以去看看最上面列出的博客。

添加服务实现

在​

​resources​

​​的​

​META-INF/services​

​中放置服务提供者配置文件来标识服务提供者,如下图所示:

ElasticJob‐Lite:自定义作业分片策略

测试

作业定义(​

​Simple​

​作业):

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import java.text.SimpleDateFormat;
import java.util.Date;

public class MySimpleJob implements SimpleJob {

    private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    @Override
    public void execute(ShardingContext shardingContext) {
        String job = shardingContext.getShardingParameter();
        if(job == null || job.trim().equals("")) {
            System.out.println("请指定帮[Kaven]执行的任务名称!");
            throw new RuntimeException();
        }
        System.out.printf("%s 执行任务%d - [%s]!\n", formatter.format(new Date()),
                shardingContext.getShardingItem(), job);
    }
}      

启动类:

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

public class Application {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration())
                .schedule();
    }
    
    // 注册中心
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9000", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }
   
    // 作业配置
    private static JobConfiguration createJobConfiguration() {
        String jobs = "0=看论文,1=做实验,2=打比赛,3=开组会,4=看书,5=写博客,6=看源码";
        return JobConfiguration.newBuilder("MySimpleJob", 7)
                .cron("30 0/2 * * * ?")
                .shardingItemParameters(jobs)
                // 使用自定义的作业分片策略
                .jobShardingStrategyType("Shuffle")
                .overwrite(true)
                .build();
    }
}      

启动三个作业服务器,输出如下图所示:

ElasticJob‐Lite:自定义作业分片策略
ElasticJob‐Lite:自定义作业分片策略
ElasticJob‐Lite:自定义作业分片策略

输出符合预期,因为自定义作业分片策略是模仿​

​AverageAllocationJobShardingStrategy​

​作业分片策略,但自定义作业分片策略中将作业的分片项顺序打乱了,因此给每个作业服务器分配的作业分片项可能不是连续的。

修改作业配置(使用默认的作业分片策略):

private static JobConfiguration createJobConfiguration() {
        String jobs = "0=看论文,1=做实验,2=打比赛,3=开组会,4=看书,5=写博客,6=看源码";
        return JobConfiguration.newBuilder("MySimpleJob", 7)
                .cron("30 0/2 * * * ?")
                .shardingItemParameters(jobs)
//                .jobShardingStrategyType("Shuffle")
                .overwrite(true)
                .build();
    }      

启动三个作业服务器,输出如下图所示:

ElasticJob‐Lite:自定义作业分片策略
ElasticJob‐Lite:自定义作业分片策略
ElasticJob‐Lite:自定义作业分片策略