天天看點

ElasticJob‐Lite:自定義作業分片政策

Java SPI機制

在上一篇部落格中介紹了​

​ElasticJob​

​的作業分片政策:

  • ​​ElasticJob‐Lite:作業分片政策介紹與源碼分析​​

其中提到了​

​ElasticJob​

​​是通過​

​Java​

​​提供的​

​SPI​

​​機制(​

​ServiceLoader​

​類)加載所有作業分片政策。

​ServiceLoader​

​​類就是​

​Java​

​​提供的​

​SPI​

​​,​

​SPI​

​​(​

​Service Provider Interface​

​​)是​

​JDK​

​​内置的一種服務提供發現機制,可以用來啟用架構擴充和替換元件,主要是被架構的開發人員使用,不同廠商可以針對同一接口做出不同的實作,比如​

​java.sql.Driver​

​​接口,​

​MySQL​

​​和​

​PostgreSQL​

​​都提供了對應的實作給使用者使用,而​

​Java​

​​的​

​SPI​

​​機制可以為某個接口尋找服務實作。​

​Java​

​​中​

​SPI​

​機制主要思想是将裝配的控制權移到程式之外,在子產品化設計中這個機制尤其重要,其核心思想就是解耦。

​ServiceLoader​

​類正常工作的唯一要求是服務提供類必須具有無參構造函數,以便它們可以在加載期間執行個體化。通過在資源目錄的​

​META-INF/services​

​​中放置服務提供者配置檔案來辨別服務提供者,檔案名是服務類型的完全限定名(比如​

​ElasticJobListener​

​​類的完全限定名),該檔案包含具體的服務提供者類的完全限定名清單(​

​ElasticJobListener​

​​實作類的完全限定名清單),每行一個,每個名稱周圍的空格和制表符以及空行都将被忽略,該檔案必須以​

​UTF-8​

​編碼。

自定義作業分片政策

所有可用的作業分片政策在​

​JobShardingStrategyFactory​

​​類的靜态塊中被加載(通過​

​ElasticJobServiceLoader​

​​類,該類是​

​ElasticJob​

​​基于​

​Java SPI​

​機制實作的特定于作業的服務加載器)。

static {
        ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
    }      

加載的類型是​

​JobShardingStrategy.class​

​,是以自定義的作業分片政策需要實作該接口。

自定義作業分片政策​

​ShuffleJobShardingStrategy​

​類:

package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.*;

public class ShuffleJobShardingStrategy implements JobShardingStrategy {

    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        // 先将作業分片項裝入容器
        List<Integer> shuffleShardingList = new ArrayList<>(shardingTotalCount);
        for (int i = 0; i < shardingTotalCount; i++) {
            shuffleShardingList.add(i);
        }
        // 将容器中的作業分片項順序打亂(使用容器的shuffle方法)
        Collections.shuffle(shuffleShardingList);
        // 模仿AverageAllocationJobShardingStrategy作業分片政策進行配置設定
        Map<JobInstance, List<Integer>> result = shardingShuffle(jobInstances, shardingTotalCount, shuffleShardingList);
        addShuffle(jobInstances, shardingTotalCount, result, shuffleShardingList);
        return result;
    }

    private Map<JobInstance, List<Integer>> shardingShuffle(final List<JobInstance> shardingUnits,
                                                            final int shardingTotalCount,
                                                            final List<Integer> shuffleShardingList) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
        // 每個作業伺服器最少應該配置設定的作業分片項數
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            // 每個作業伺服器申請的作業分片項清單(容量為itemCountPerSharding + 1)
            // itemCountPerSharding + 1為每個作業伺服器最多應該配置設定的作業分片項數
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                // 給作業分片項清單添加容器中的第i個作業分片項
                shardingItems.add(shuffleShardingList.get(i));
            }
            // 将作業伺服器與它執行的作業分片項清單進行關聯
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }

    private void addShuffle(final List<JobInstance> shardingUnits, final int shardingTotalCount,
                            final Map<JobInstance, List<Integer>> shardingResults,
                            final List<Integer> shuffleShardingList) {
        // 無法平均配置設定的分片項數
        int aliquant = shardingTotalCount % shardingUnits.size();
        // 已配置設定的無法平均配置設定的分片項數
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            // 是否還有無法平均配置設定的分片項
            if (count < aliquant) {
                // 配置設定給序号較小的作業伺服器
                entry.getValue().add(
                        shuffleShardingList.get(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count)
                );
            }
            // 已配置設定數更新
            count++;
        }
    }

    // 作業分片政策的辨別符
    @Override
    public String getType() {
        return "Shuffle";
    }
}      

部落客自定義的​

​ShuffleJobShardingStrategy​

​​作業分片政策是模仿​

​AverageAllocationJobShardingStrategy​

​​作業分片政策(預設的作業分片政策),隻是先将作業分片項裝入容器,然後将容器中的作業分片項順序打亂(使用容器的​

​shuffle​

​​方法),之後再基于該作業分片項容器使用​

​AverageAllocationJobShardingStrategy​

​​作業分片政策給作業伺服器配置設定該容器中的作業分片項,如果不了解​

​AverageAllocationJobShardingStrategy​

​作業分片政策,可以去看看最上面列出的部落格。

添加服務實作

在​

​resources​

​​的​

​META-INF/services​

​中放置服務提供者配置檔案來辨別服務提供者,如下圖所示:

ElasticJob‐Lite:自定義作業分片政策

測試

作業定義(​

​Simple​

​作業):

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import java.text.SimpleDateFormat;
import java.util.Date;

public class MySimpleJob implements SimpleJob {

    private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    @Override
    public void execute(ShardingContext shardingContext) {
        String job = shardingContext.getShardingParameter();
        if(job == null || job.trim().equals("")) {
            System.out.println("請指定幫[Kaven]執行的任務名稱!");
            throw new RuntimeException();
        }
        System.out.printf("%s 執行任務%d - [%s]!\n", formatter.format(new Date()),
                shardingContext.getShardingItem(), job);
    }
}      

啟動類:

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

public class Application {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration())
                .schedule();
    }
    
    // 注冊中心
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9000", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }
   
    // 作業配置
    private static JobConfiguration createJobConfiguration() {
        String jobs = "0=看論文,1=做實驗,2=打比賽,3=開組會,4=看書,5=寫部落格,6=看源碼";
        return JobConfiguration.newBuilder("MySimpleJob", 7)
                .cron("30 0/2 * * * ?")
                .shardingItemParameters(jobs)
                // 使用自定義的作業分片政策
                .jobShardingStrategyType("Shuffle")
                .overwrite(true)
                .build();
    }
}      

啟動三個作業伺服器,輸出如下圖所示:

ElasticJob‐Lite:自定義作業分片政策
ElasticJob‐Lite:自定義作業分片政策
ElasticJob‐Lite:自定義作業分片政策

輸出符合預期,因為自定義作業分片政策是模仿​

​AverageAllocationJobShardingStrategy​

​作業分片政策,但自定義作業分片政策中将作業的分片項順序打亂了,是以給每個作業伺服器配置設定的作業分片項可能不是連續的。

修改作業配置(使用預設的作業分片政策):

private static JobConfiguration createJobConfiguration() {
        String jobs = "0=看論文,1=做實驗,2=打比賽,3=開組會,4=看書,5=寫部落格,6=看源碼";
        return JobConfiguration.newBuilder("MySimpleJob", 7)
                .cron("30 0/2 * * * ?")
                .shardingItemParameters(jobs)
//                .jobShardingStrategyType("Shuffle")
                .overwrite(true)
                .build();
    }      

啟動三個作業伺服器,輸出如下圖所示:

ElasticJob‐Lite:自定義作業分片政策
ElasticJob‐Lite:自定義作業分片政策
ElasticJob‐Lite:自定義作業分片政策