天天看点

ElasticJob‐Lite:作业分片策略介绍与源码分析

分片

弹性调度是​

​ElasticJob​

​​最重要的功能,也是这款产品名称的由来。它是一款能够让任务通过分片进行水

平扩展的任务处理系统。

​ElasticJob​

​​中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,​

​ElasticJob​

​​会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

举例说明,如果作业分为​

​4​

​​片,用两台服务器执行,则每个服务器分到​

​2​

​​片,分别负责作业的​

​50%​

​的负载,如下图所示。

ElasticJob‐Lite:作业分片策略介绍与源码分析

​ElasticJob​

​​并不直接提供数据处理的功能,而是将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与业务的对应关系。分片项为数字,始于​

​0​

​​而终于分片总数减​

​1​

​​。以上是​

​ElasticJob​

​​的官方文档对分片的描述,而文档对作业分片策略的介绍非常简单,只给了作业分片策略的​

​SPI​

​名称,如下图所示:

ElasticJob‐Lite:作业分片策略介绍与源码分析

作业分片策略

博主目前使用的是​

​3.0.1​

​​版本的​

​ElasticJob‐Lite​

​(目前最新版本)。

<dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-core</artifactId>
            <version>3.0.1</version>
        </dependency>      
ElasticJob‐Lite:作业分片策略介绍与源码分析

作业分片策略的​

​SPI​

​​名称是​

​JobShardingStrategy​

​,是作业分片策略的顶层设计。

package org.apache.shardingsphere.elasticjob.infra.handler.sharding;

import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;

import java.util.List;
import java.util.Map;

/**
 * 作业分片策略
 */
public interface JobShardingStrategy extends TypedSPI {
    
    /**
     * 作业分片
     * jobInstances – 参与分片的所有作业实例(作业服务器)
     * jobName -作业名称
     * shardingTotalCount – 分片总数
     */
    Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}      

​JobShardingStrategy​

​​接口的​

​sharding​

​​方法就是用来定义作业分片的逻辑,供子类实现,目前有三个实现类:​

​AverageAllocationJobShardingStrategy​

​​、​

​OdevitySortByNameJobShardingStrategy​

​​以及​

​RoundRobinByNameJobShardingStrategy​

​。

ElasticJob‐Lite:作业分片策略介绍与源码分析

AverageAllocationJobShardingStrategy

源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
        addAliquant(jobInstances, shardingTotalCount, result);
        return result;
    }
    
    private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(i);
            }
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }
    
    private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
        int aliquant = shardingTotalCount % shardingUnits.size();
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            count++;
        }
    }
    
    @Override
    public String getType() {
        return "AVG_ALLOCATION";
    }
}      

这是一种尽量平均分配的分片策略,如果作业的分片项无法平均分配给所有的作业服务器,即作业的分片项数​

​%​

​作业服务器数不为零,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。 例如:

  • 如果有​

    ​3​

    ​​个作业服务器,总分片数为​

    ​9​

    ​​,每个作业服务器的分片项为:​

    ​1=[0,1,2]​

    ​​,​

    ​2=[3,4,5]​

    ​​,​

    ​3=[6,7,8]​

    ​。
  • 如果有​

    ​3​

    ​​个作业服务器,总分片数为​

    ​8​

    ​​,每个作业服务器的分片项为:​

    ​1=[0,1,6]​

    ​​,​

    ​2=[2,3,7]​

    ​​,​

    ​3=[4,5]​

    ​。
  • 如果有​

    ​3​

    ​​个作业服务器,总分片数为​

    ​10​

    ​​,每个作业服务器的分片项为:​

    ​1=[0,1,2,9]​

    ​​,​

    ​2=[3,4,5]​

    ​​,​

    ​3=[6,7,8]​

    ​。

先给每个作业服务器分配相同数量的作业分片项(数量为:作业的分片项数​

​/​

​作业服务器数)。

private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
        // 每个作业服务器最少应该分配的作业分片项数
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            // 每个作业服务器申请的作业分片项列表(容量为itemCountPerSharding + 1)
            // itemCountPerSharding + 1为每个作业服务器最多应该分配的作业分片项数
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                // 给作业分片项列表添加第i个作业分片项
                shardingItems.add(i);
            }
            // 将作业服务器与它执行的作业分片项列表进行关联
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }      

如果作业的分片项无法平均分配给所有的作业服务器,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。

private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
        // 无法平均分配的分片项数
        int aliquant = shardingTotalCount % shardingUnits.size();
        // 已分配的无法平均分配的分片项数
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            // 是否还有无法平均分配的分片项
            if (count < aliquant) {
                // 分配给序号较小的作业服务器
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            // 已分配数更新
            count++;
        }
    }      

OdevitySortByNameJobShardingStrategy

源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {
    
    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        long jobNameHash = jobName.hashCode();
        if (0 == jobNameHash % 2) {
            Collections.reverse(jobInstances);
        }
        return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
    }
    
    @Override
    public String getType() {
        return "ODEVITY";
    }
}      

其实还是使用​

​AverageAllocationJobShardingStrategy​

​​作业分片策略进行分配,只是会先根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行​

​reverse​

​操作。例如:

  • 如果有​

    ​3​

    ​​个作业服务器,总分片数为​

    ​2​

    ​​,作业名称的哈希码为奇数(对作业服务器列表不进行​

    ​reverse​

    ​​操作),每个作业服务器的分片项为:​

    ​1=[0]​

    ​​,​

    ​2=[1]​

    ​​,​

    ​3=[]​

    ​。
  • 如果有​

    ​3​

    ​​个作业服务器,总分片数为​

    ​2​

    ​​,作业名的哈希码是偶数(对作业服务器列表进行​

    ​reverse​

    ​​操作),每个作业服务器的分片项为:​

    ​3=[0]​

    ​​,​

    ​2=[1]​

    ​​,​

    ​1=[]​

    ​。

RoundRobinByNameJobShardingStrategy

源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public final class RoundRobinByNameJobShardingStrategy implements JobShardingStrategy {
    
    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount);
    }
    
    private List<JobInstance> rotateServerList(final List<JobInstance> shardingUnits, final String jobName) {
        int shardingUnitsSize = shardingUnits.size();
        int offset = Math.abs(jobName.hashCode()) % shardingUnitsSize;
        if (0 == offset) {
            return shardingUnits;
        }
        List<JobInstance> result = new ArrayList<>(shardingUnitsSize);
        for (int i = 0; i < shardingUnitsSize; i++) {
            int index = (i + offset) % shardingUnitsSize;
            result.add(shardingUnits.get(index));
        }
        return result;
    }
    
    @Override
    public String getType() {
        return "ROUND_ROBIN";
    }
}      

其实跟​

​OdevitySortByNameJobShardingStrategy​

​​作业分片策略类似,都是使用​

​AverageAllocationJobShardingStrategy​

​​作业分片策略进行分配,并且在分配前都会根据作业名称的哈希码将作业服务器列表中的作业服务器项改变顺序,只是变序规则不一样而已,​

​OdevitySortByNameJobShardingStrategy​

​​作业分片策略根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行​

​reverse​

​​操作,而​

​RoundRobinByNameJobShardingStrategy​

​​作业分片策略根据作业名称的哈希码的绝对值​

​%​

​​作业服务器数的值对作业服务器列表进行​

​rotate​

​操作。例如:

  • 如果有​

    ​3​

    ​​个作业服务器,总分片数为​

    ​2​

    ​​,作业名称的哈希码的绝对值​

    ​%​

    ​​作业服务器数的值为​

    ​0​

    ​​,每个作业服务器的分片项为:​

    ​1=[0]​

    ​​,​

    ​2=[1]​

    ​​,​

    ​3=[]​

    ​。
  • 如果有​

    ​3​

    ​​个作业服务器,总分片数为​

    ​2​

    ​​,作业名称的哈希码的绝对值​

    ​%​

    ​​作业服务器数的值为​

    ​1​

    ​​,每个作业服务器的分片项为:​

    ​2=[0]​

    ​​,​

    ​3=[1]​

    ​​,​

    ​1=[]​

    ​。
  • 如果有​

    ​3​

    ​​个作业服务器,总分片数为​

    ​2​

    ​​,作业名称的哈希码的绝对值​

    ​%​

    ​​作业服务器数的值为​

    ​2​

    ​​,每个作业服务器的分片项为:​

    ​3=[0]​

    ​​,​

    ​1=[1]​

    ​​,​

    ​2=[]​

    ​。

JobShardingStrategyFactory

作业的分片策略通过​

​JobShardingStrategyFactory​

​​类(作业分片策略工厂类)的​

​getStrategy​

​方法获取,源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding;

import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobShardingStrategyFactory {
    
    private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";
    
    static {
        ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
    }
    
    public static JobShardingStrategy getStrategy(final String type) {
        if (Strings.isNullOrEmpty(type)) {
            return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, DEFAULT_STRATEGY).get();
        }
        return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, type)
                .orElseThrow(() -> new JobConfigurationException("Cannot find sharding strategy using type '%s'.", type));
    }
}      

在​

​JobShardingStrategyFactory​

​​类的静态块中使用​

​ElasticJobServiceLoader​

​​类的​

​registerTypedService​

​方法加载所有作业分片策略。

static {
        ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
    }      

​ElasticJobServiceLoader​

​​类的相关代码如下所示,通过​

​Java​

​​提供的​

​SPI​

​​机制(​

​ServiceLoader​

​类)加载所有作业分片策略。

private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, TypedSPI>> TYPED_SERVICES = new ConcurrentHashMap<>();
    
    private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, Class<? extends TypedSPI>>> TYPED_SERVICE_CLASSES = new ConcurrentHashMap<>();
 
    public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {
        if (TYPED_SERVICES.containsKey(typedService)) {
            return;
        }
        ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));
    }
    
    private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {
        TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
        TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
    }      

默认为​

​AverageAllocationJobShardingStrategy​

​作业分片策略,和官方文档给的示意图是对应的。

private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";      
@Override
    public String getType() {
        return "AVG_ALLOCATION";
    }