天天看點

ElasticJob‐Lite:擴充作業類型

​ElasticJob​

​​的作業分類基于​

​class​

​​和​

​type​

​​兩種類型。基于​

​class​

​​的作業需要開發者自行通過實作接口的方式織入業務邏輯;基于​

​type​

​​的作業則無需編碼,隻需要提供相應配置即可。基于​

​class​

​​的作業接口的方法參數​

​shardingContext​

​​包含作業配置、片和運作時資訊。可通過​

​getShardingTotalCount()​

​​、​

​getShardingItem()​

​等方法分别擷取分片總數和運作在本作業伺服器的分片序列号等。

​ElasticJob​

​​目前提供​

​Simple​

​​、​

​Dataflow​

​​這兩種基于​

​class​

​​的作業類型,并提供​

​Script​

​​、​

​HTTP​

​​這兩種基于​

​type​

​​的作業類型,使用者可通過實作​

​SPI​

​接口自行擴充作業類型。

添加依賴(​

​3.0.1​

​​是目前最新的​

​Releases​

​版本):

<dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-core</artifactId>
            <version>3.0.1</version>
        </dependency>      

本篇部落格介紹如何通過實作​

​SPI​

​接口自行擴充作業類型。

擴充作業類型

想要通過實作​

​SPI​

​​接口自行擴充作業類型需要三個步驟(基于​

​class​

​​的作業類型),而基于​

​type​

​的作業類型隻需要後面兩個步驟:

  1. 定義作業邏輯的執行接口(基于​

    ​type​

    ​​的作業類型不需要此步驟,比如​

    ​Script​

    ​​作業的作業邏輯由腳本程式執行,而​

    ​HTTP​

    ​​作業的作業邏輯由請求的服務端執行,是以基于​

    ​type​

    ​的作業類型不需要定義作業邏輯的執行接口)。
  2. 實作作業邏輯執行接口的​

    ​JobItemExecutor​

    ​。
  3. 通過​

    ​Java SPI​

    ​​的方式聲明實作的​

    ​JobItemExecutor​

    ​。

作業邏輯執行接口

​KavenJob​

​​接口(繼承​

​ElasticJob​

​接口,作業邏輯的執行接口):

package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

public interface KavenJob extends ElasticJob {
    void work(ShardingContext shardingContext, String jobExecutorName);
}      

​KavenJob​

​​接口的實作類​

​MyKavenJob​

​(實作作業邏輯的執行):

package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;

import java.text.SimpleDateFormat;
import java.util.Date;

public class MyKavenJob implements KavenJob{

    private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void work(ShardingContext shardingContext, String jobExecutorName) {
        String job = shardingContext.getShardingParameter();
        if(job == null || job.trim().equals("")) {
            System.out.println("請指定幫[Kaven]執行的任務名稱!");
            throw new RuntimeException();
        }
        System.out.printf("%s[%s]:幫[Kaven]執行%s任務!\n", jobExecutorName, formatter.format(new Date()), job);
    }
}      

内置的​

​Simple​

​​和​

​Dataflow​

​作業的作業邏輯執行接口也是這樣定義的:

package org.apache.shardingsphere.elasticjob.simple.job;

import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

public interface SimpleJob extends ElasticJob {
    void execute(ShardingContext var1);
}      
package org.apache.shardingsphere.elasticjob.dataflow.job;

import java.util.List;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

public interface DataflowJob<T> extends ElasticJob {
    List<T> fetchData(ShardingContext var1);

    void processData(ShardingContext var1, List<T> var2);
}      

是以想要定義​

​Simple​

​​或​

​Dataflow​

​​作業就隻需要分别實作​

​SimpleJob​

​​或​

​DataflowJob​

​接口即可。

JobItemExecutor

​ElasticJob​

​​的作業分類基于​

​class​

​​和​

​type​

​​兩種類型,是以​

​JobItemExecutor​

​​必須繼承或實作​

​ClassedJobItemExecutor​

​​或者​

​TypedJobItemExecutor​

​接口。

​KavenJobItemExecutor​

​​接口(繼承了​

​ClassedJobItemExecutor​

​​,如果要繼承​

​TypedJobItemExecutor​

​​接口來擴充​

​type​

​​類型作業的自定義​

​JobItemExecutor​

​,也是類似的):

package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;

public interface KavenJobItemExecutor extends ClassedJobItemExecutor<KavenJob> {
    String getJobExecutorName();
}      

​KavenJobItemExecutor​

​​接口的實作類​

​MyKavenJobExecutor​

​:

package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;

public class MyKavenJobExecutor implements KavenJobItemExecutor {

    public MyKavenJobExecutor() {}
    
    @Override
    public Class<KavenJob> getElasticJobClass() {
        return KavenJob.class;
    }

    @Override
    public void process(KavenJob kavenJob, JobConfiguration jobConfiguration, JobFacade jobFacade, ShardingContext shardingContext) {
        kavenJob.work(shardingContext, getJobExecutorName());
    }

    @Override
    public String getJobExecutorName() {
        return this.getClass().getName();
    }
}      

很顯然​

​MyKavenJob​

​​類中的​

​work​

​​方法将在​

​MyKavenJobExecutor​

​​類的​

​process​

​​方法中調用,這是由​

​ElasticJob​

​​控制的。​

​Simple​

​​、​

​Dataflow​

​​、​

​Script​

​​和​

​HTTP​

​​作業也是如此,在​

​JobItemExecutor​

​​中執行作業的分片(以​

​Simple​

​​和​

​Script​

​作業為例):

package org.apache.shardingsphere.elasticjob.simple.executor;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> {
    public SimpleJobExecutor() {
    }

    public void process(SimpleJob elasticJob, JobConfiguration jobConfig, JobFacade jobFacade, ShardingContext shardingContext) {
        elasticJob.execute(shardingContext);
    }

    public Class<SimpleJob> getElasticJobClass() {
        return SimpleJob.class;
    }
}      
package org.apache.shardingsphere.elasticjob.script.executor;

import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Properties;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.executor.item.impl.TypedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
import org.apache.shardingsphere.elasticjob.infra.json.GsonFactory;

public final class ScriptJobExecutor implements TypedJobItemExecutor {
    public ScriptJobExecutor() {
    }

    public void process(ElasticJob elasticJob, JobConfiguration jobConfig, JobFacade jobFacade, ShardingContext shardingContext) {
        CommandLine commandLine = CommandLine.parse(this.getScriptCommandLine(jobConfig.getProps()));
        commandLine.addArgument(GsonFactory.getGson().toJson(shardingContext), false);

        try {
            (new DefaultExecutor()).execute(commandLine);
        } catch (IOException var7) {
            throw new JobSystemException("Execute script failure.", new Object[]{var7});
        }
    }

    private String getScriptCommandLine(Properties props) {
        String result = props.getProperty("script.command.line");
        if (Strings.isNullOrEmpty(result)) {
            throw new JobConfigurationException("Cannot find script command line, job is not executed.", new Object[0]);
        } else {
            return result;
        }
    }

    public String getType() {
        return "SCRIPT";
    }
}      

聲明JobItemExecutor

通過​

​Java SPI​

​​的方式聲明實作的​

​JobItemExecutor​

​​,那為什麼需要聲明實作的​

​JobItemExecutor​

​​?因為​

​ElasticJob​

​​需要知道作業對應的​

​JobItemExecutor​

​​,以便用它來執行該作業的分片。​

​ElasticJob​

​​通過​

​ScheduleJobBootstrap​

​執行個體來完成定時任務的排程。

​Application​

​類(啟動類):

package com.kaven.job;

import com.kaven.job.my.MyKavenJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 17:05
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
public class Application {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new MyKavenJob(),
                createJobConfiguration()).schedule();
    }
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9999", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }
    private static JobConfiguration createJobConfiguration() {
        String jobs = "0=看論文,1=做實驗,2=打比賽,3=開組會,4=看書,5=其他";
        return JobConfiguration.newBuilder("KavenJob", 6)
                .cron("30 * * * * ?")
                .shardingItemParameters(jobs)
                .overwrite(true)
                .failover(true)
                .build();
    }
}      

建立​

​ScheduleJobBootstrap​

​​執行個體,也會建立​

​JobScheduler​

​執行個體。

public ScheduleJobBootstrap(CoordinatorRegistryCenter regCenter, ElasticJob elasticJob, JobConfiguration jobConfig) {
        this.jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);
    }

    public ScheduleJobBootstrap(CoordinatorRegistryCenter regCenter, String elasticJobType, JobConfiguration jobConfig) {
        this.jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig);
    }      

而在建立​

​JobScheduler​

​​執行個體時,還會建立​

​ElasticJobExecutor​

​執行個體。

public JobScheduler(CoordinatorRegistryCenter regCenter, ElasticJob elasticJob, JobConfiguration jobConfig) {
        ...
        this.jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, this.jobFacade);
        ...
    }

    public JobScheduler(CoordinatorRegistryCenter regCenter, String elasticJobType, JobConfiguration jobConfig) {
        ...
        this.jobExecutor = new ElasticJobExecutor(elasticJobType, this.jobConfig, this.jobFacade);
        ...
    }      

在建立​

​ElasticJobExecutor​

​​執行個體時,會通過​

​JobItemExecutorFactory​

​​類擷取作業對應的​

​JobItemExecutor​

​。

public ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade) {
        this(elasticJob, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(elasticJob.getClass()));
    }
    
    public ElasticJobExecutor(final String type, final JobConfiguration jobConfig, final JobFacade jobFacade) {
        this(null, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(type));
    }      

​JobItemExecutorFactory​

​類:

package org.apache.shardingsphere.elasticjob.executor.item;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.executor.item.impl.TypedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;

/**
 * Job item executor factory.
 */
@SuppressWarnings("rawtypes")
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobItemExecutorFactory {
    
    private static final Map<Class, ClassedJobItemExecutor> CLASSED_EXECUTORS = new HashMap<>();
    
    static {
        ElasticJobServiceLoader.registerTypedService(TypedJobItemExecutor.class);
        ServiceLoader.load(ClassedJobItemExecutor.class).forEach(each -> CLASSED_EXECUTORS.put(each.getElasticJobClass(), each));
    }
    
    /**
     * Get executor.
     * 
     * @param elasticJobClass elastic job class
     * @return job item executor
     */
    @SuppressWarnings("unchecked")
    public static JobItemExecutor getExecutor(final Class<? extends ElasticJob> elasticJobClass) {
        for (Entry<Class, ClassedJobItemExecutor> entry : CLASSED_EXECUTORS.entrySet()) {
            if (entry.getKey().isAssignableFrom(elasticJobClass)) {
                return entry.getValue();
            }
        }
        throw new JobConfigurationException("Can not find executor for elastic job class `%s`", elasticJobClass.getName());
    }
    
    /**
     * Get executor.
     *
     * @param elasticJobType elastic job type
     * @return job item executor
     */
    public static JobItemExecutor getExecutor(final String elasticJobType) {
        return ElasticJobServiceLoader.getCachedTypedServiceInstance(TypedJobItemExecutor.class, elasticJobType)
                .orElseThrow(() -> new JobConfigurationException("Cannot find executor for elastic job type `%s`", elasticJobType));
    }
}      

​ServiceLoader​

​​類就是​

​Java​

​​提供的​

​SPI​

​​,​

​SPI​

​​(​

​Service Provider Interface​

​​)是​

​JDK​

​​内置的一種服務提供發現機制,可以用來啟用架構擴充和替換元件,主要是被架構的開發人員使用,不同廠商可以針對同一接口做出不同的實作,比如​

​java.sql.Driver​

​​接口,​

​MySQL​

​​和​

​PostgreSQL​

​​都提供了對應的實作給使用者使用,而​

​Java​

​​的​

​SPI​

​​機制可以為某個接口尋找服務實作。​

​Java​

​​中​

​SPI​

​機制主要思想是将裝配的控制權移到程式之外,在子產品化設計中這個機制尤其重要,其核心思想就是解耦。

​ServiceLoader​

​類正常工作的唯一要求是服務提供類必須具有無參構造函數,以便它們可以在加載期間執行個體化。通過在資源目錄的​

​META-INF/services​

​​中放置服務提供者配置檔案來辨別服務提供者,檔案名是服務類型的完全限定名,該檔案包含具體的服務提供者類的完全限定名清單,每行一個,每個名稱周圍的空格和制表符以及空行都将被忽略,該檔案必須以​

​UTF-8​

​編碼。如下圖所示:

ElasticJob‐Lite:擴充作業類型

完成這三個步驟,我們實作的​

​JobItemExecutor​

​​就可以被​

​ElasticJob​

​發現,以便将它們用于對應作業分片的執行。

static {
        // 加載type作業的JobItemExecutor
        ElasticJobServiceLoader.registerTypedService(TypedJobItemExecutor.class);
        // 加載class作業的JobItemExecutor
        ServiceLoader.load(ClassedJobItemExecutor.class).forEach(each -> CLASSED_EXECUTORS.put(each.getElasticJobClass(), each));
    }      

在​

​JobItemExecutorFactory​

​​類的靜态塊中會加載​

​class​

​​和​

​type​

​​作業的​

​JobItemExecutor​

​​,加載​

​class​

​​作業的​

​JobItemExecutor​

​​時,以​

​each.getElasticJobClass()​

​​為​

​key​

​​,​

​each​

​​為​

​value​

​​。而​

​MyKavenJobExecutor​

​​類的​

​getElasticJobClass​

​​方法傳回​

​KavenJob.class​

​​,這樣作業和​

​JobItemExecutor​

​就對應起來了。

@Override
    public Class<KavenJob> getElasticJobClass() {
        return KavenJob.class;
    }      

加載​

​type​

​​作業的​

​JobItemExecutor​

​​是在​

​ElasticJobServiceLoader​

​​類中完成的(也是使用​

​ServiceLoader​

​​類來加載),以​

​instance.getType()​

​​為​

​key​

​​,​

​instance​

​​為​

​value​

​。

public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {
        if (TYPED_SERVICES.containsKey(typedService)) {
            return;
        }
        ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));
    }
    private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {
        TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
        TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
    }      

這也是為什麼​

​ScheduleJobBootstrap​

​​構造器的​

​elasticJobType​

​​參數需要全部大寫(比如​

​SCRIPT​

​​和​

​HTTP​

​)的原因。

new ScheduleJobBootstrap(createRegistryCenter(), "SCRIPT",
        createJobConfiguration()).schedule();      

因為這兩種作業對應的​

​JobItemExecutor​

​​就是使用​

​getType()​

​​的傳回值作為​

​key​

​進行存儲的。

public String getType() {
        return "SCRIPT";
    }
    public String getType() {
        return "HTTP";
    }      

這樣就通過實作​

​SPI​

​接口自行擴充了作業類型,輸出如下圖所示:

ElasticJob‐Lite:擴充作業類型
ElasticJob‐Lite:擴充作業類型
ElasticJob‐Lite:擴充作業類型