ElasticJob jobs are divided into two categories: class-based and type-based. A class-based job requires the developer to weave in business logic by implementing an interface; a type-based job needs no coding, only the corresponding configuration. The method parameter shardingContext of a class-based job interface contains the job configuration, sharding, and runtime information; methods such as getShardingTotalCount() and getShardingItem() return the total number of shards and the shard item numbers running on the current job server, respectively. ElasticJob currently provides two class-based job types, Simple and Dataflow, and two type-based job types, Script and HTTP. Users can extend job types themselves by implementing the SPI interface.
Add the dependency (3.0.1 is the latest release version at the time of writing):
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-core</artifactId>
    <version>3.0.1</version>
</dependency>
This post describes how to extend job types by implementing the SPI interface.
Extending Job Types
Extending a job type by implementing the SPI interface takes three steps for a class-based job type; a type-based job type only needs the last two:
- Define the interface that executes the job logic (type-based job types skip this step: for example, a Script job's logic is executed by a script program and an HTTP job's logic is executed by the server receiving the request, so type-based job types do not need a job logic execution interface).
- Implement the JobItemExecutor for that interface.
- Declare the implemented JobItemExecutor via Java SPI.
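As a rough orientation, the files created in the three steps below might be laid out as follows in a standard Maven project (the tree itself is an assumption; the class and package names are the ones used in this post):

```text
src/main/java/com/kaven/job/my/KavenJob.java              (step 1: job logic execution interface)
src/main/java/com/kaven/job/my/MyKavenJob.java            (step 1: its implementation)
src/main/java/com/kaven/job/my/KavenJobItemExecutor.java  (step 2: executor interface)
src/main/java/com/kaven/job/my/MyKavenJobExecutor.java    (step 2: executor implementation)
src/main/resources/META-INF/services/
    org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor  (step 3)
```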
Job Logic Execution Interface
The KavenJob interface (extends the ElasticJob interface and serves as the job logic execution interface):
package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

public interface KavenJob extends ElasticJob {
    void work(ShardingContext shardingContext, String jobExecutorName);
}
MyKavenJob, the implementation class of the KavenJob interface (implements the actual job logic):
package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;

import java.text.SimpleDateFormat;
import java.util.Date;

public class MyKavenJob implements KavenJob {

    private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void work(ShardingContext shardingContext, String jobExecutorName) {
        String job = shardingContext.getShardingParameter();
        if (job == null || job.trim().equals("")) {
            System.out.println("Please specify the task name to execute for [Kaven]!");
            throw new RuntimeException("missing sharding parameter");
        }
        System.out.printf("%s[%s]: executing task %s for [Kaven]!%n", jobExecutorName, formatter.format(new Date()), job);
    }
}
The job logic execution interfaces of the built-in Simple and Dataflow jobs are defined the same way:
package org.apache.shardingsphere.elasticjob.simple.job;

import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

public interface SimpleJob extends ElasticJob {
    void execute(ShardingContext var1);
}

package org.apache.shardingsphere.elasticjob.dataflow.job;

import java.util.List;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

public interface DataflowJob<T> extends ElasticJob {
    List<T> fetchData(ShardingContext var1);

    void processData(ShardingContext var1, List<T> var2);
}
So to define a Simple or Dataflow job, you only need to implement the SimpleJob or DataflowJob interface, respectively.
JobItemExecutor
Since ElasticJob jobs are divided into class-based and type-based categories, a JobItemExecutor must extend or implement either the ClassedJobItemExecutor or the TypedJobItemExecutor interface.
The KavenJobItemExecutor interface (extends ClassedJobItemExecutor; extending TypedJobItemExecutor to build a custom JobItemExecutor for a type-based job works analogously):
package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;

public interface KavenJobItemExecutor extends ClassedJobItemExecutor<KavenJob> {
    String getJobExecutorName();
}
MyKavenJobExecutor, the implementation class of the KavenJobItemExecutor interface:
package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;

public class MyKavenJobExecutor implements KavenJobItemExecutor {

    public MyKavenJobExecutor() {}

    @Override
    public Class<KavenJob> getElasticJobClass() {
        return KavenJob.class;
    }

    @Override
    public void process(KavenJob kavenJob, JobConfiguration jobConfiguration, JobFacade jobFacade, ShardingContext shardingContext) {
        kavenJob.work(shardingContext, getJobExecutorName());
    }

    @Override
    public String getJobExecutorName() {
        return this.getClass().getName();
    }
}
Clearly, the work method of the MyKavenJob class is invoked inside the process method of the MyKavenJobExecutor class, and this invocation is driven by ElasticJob. The Simple, Dataflow, Script, and HTTP jobs work the same way: the job's shards are executed in a JobItemExecutor (taking the Simple and Script jobs as examples):
package org.apache.shardingsphere.elasticjob.simple.executor;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> {

    public SimpleJobExecutor() {
    }

    public void process(SimpleJob elasticJob, JobConfiguration jobConfig, JobFacade jobFacade, ShardingContext shardingContext) {
        elasticJob.execute(shardingContext);
    }

    public Class<SimpleJob> getElasticJobClass() {
        return SimpleJob.class;
    }
}
package org.apache.shardingsphere.elasticjob.script.executor;

import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Properties;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.executor.item.impl.TypedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
import org.apache.shardingsphere.elasticjob.infra.json.GsonFactory;

public final class ScriptJobExecutor implements TypedJobItemExecutor {

    public ScriptJobExecutor() {
    }

    public void process(ElasticJob elasticJob, JobConfiguration jobConfig, JobFacade jobFacade, ShardingContext shardingContext) {
        CommandLine commandLine = CommandLine.parse(this.getScriptCommandLine(jobConfig.getProps()));
        commandLine.addArgument(GsonFactory.getGson().toJson(shardingContext), false);
        try {
            (new DefaultExecutor()).execute(commandLine);
        } catch (IOException var7) {
            throw new JobSystemException("Execute script failure.", new Object[]{var7});
        }
    }

    private String getScriptCommandLine(Properties props) {
        String result = props.getProperty("script.command.line");
        if (Strings.isNullOrEmpty(result)) {
            throw new JobConfigurationException("Cannot find script command line, job is not executed.", new Object[0]);
        } else {
            return result;
        }
    }

    public String getType() {
        return "SCRIPT";
    }
}
Declaring the JobItemExecutor
The implemented JobItemExecutor is declared via Java SPI. Why does it need to be declared at all? Because ElasticJob has to know which JobItemExecutor corresponds to a job in order to use it to execute that job's shards. ElasticJob schedules timed tasks through a ScheduleJobBootstrap instance.
The Application class (startup class):
package com.kaven.job;

import com.kaven.job.my.MyKavenJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 17:05
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
public class Application {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new MyKavenJob(),
                createJobConfiguration()).schedule();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9999", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }

    private static JobConfiguration createJobConfiguration() {
        String jobs = "0=read papers,1=run experiments,2=competitions,3=group meeting,4=reading,5=other";
        return JobConfiguration.newBuilder("KavenJob", 6)
                .cron("30 * * * * ?")
                .shardingItemParameters(jobs)
                .overwrite(true)
                .failover(true)
                .build();
    }
}
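The shardingItemParameters string above maps each shard item number to the parameter that shard later receives through shardingContext.getShardingParameter(). A minimal standalone sketch of that mapping (a simplified parser written for illustration, not ElasticJob's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class ShardingParametersDemo {

    // Parse a "0=a,1=b,2=c" style string into shard item -> parameter.
    static Map<Integer, String> parse(String itemParameters) {
        Map<Integer, String> result = new HashMap<>();
        for (String pair : itemParameters.split(",")) {
            String[] kv = pair.split("=", 2);
            result.put(Integer.parseInt(kv[0].trim()), kv[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> params = parse("0=read papers,1=run experiments,2=competitions");
        // Shard item 1 would see "run experiments" as its sharding parameter.
        System.out.println(params.get(1));
    }
}
```

With shardingTotalCount set to 6 as above, each of the six shard items receives one of the six parameters, and failover(true) lets another server take over a shard if its server goes down.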
Creating the ScheduleJobBootstrap instance also creates a JobScheduler instance.
public ScheduleJobBootstrap(CoordinatorRegistryCenter regCenter, ElasticJob elasticJob, JobConfiguration jobConfig) {
    this.jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);
}

public ScheduleJobBootstrap(CoordinatorRegistryCenter regCenter, String elasticJobType, JobConfiguration jobConfig) {
    this.jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig);
}
And when the JobScheduler instance is created, an ElasticJobExecutor instance is created as well.
public JobScheduler(CoordinatorRegistryCenter regCenter, ElasticJob elasticJob, JobConfiguration jobConfig) {
    ...
    this.jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, this.jobFacade);
    ...
}

public JobScheduler(CoordinatorRegistryCenter regCenter, String elasticJobType, JobConfiguration jobConfig) {
    ...
    this.jobExecutor = new ElasticJobExecutor(elasticJobType, this.jobConfig, this.jobFacade);
    ...
}
When the ElasticJobExecutor instance is created, the JobItemExecutor corresponding to the job is obtained through the JobItemExecutorFactory class.
public ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade) {
    this(elasticJob, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(elasticJob.getClass()));
}

public ElasticJobExecutor(final String type, final JobConfiguration jobConfig, final JobFacade jobFacade) {
    this(null, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(type));
}
The JobItemExecutorFactory class:
package org.apache.shardingsphere.elasticjob.executor.item;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.executor.item.impl.TypedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;

/**
 * Job item executor factory.
 */
@SuppressWarnings("rawtypes")
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobItemExecutorFactory {

    private static final Map<Class, ClassedJobItemExecutor> CLASSED_EXECUTORS = new HashMap<>();

    static {
        ElasticJobServiceLoader.registerTypedService(TypedJobItemExecutor.class);
        ServiceLoader.load(ClassedJobItemExecutor.class).forEach(each -> CLASSED_EXECUTORS.put(each.getElasticJobClass(), each));
    }

    /**
     * Get executor.
     *
     * @param elasticJobClass elastic job class
     * @return job item executor
     */
    @SuppressWarnings("unchecked")
    public static JobItemExecutor getExecutor(final Class<? extends ElasticJob> elasticJobClass) {
        for (Entry<Class, ClassedJobItemExecutor> entry : CLASSED_EXECUTORS.entrySet()) {
            if (entry.getKey().isAssignableFrom(elasticJobClass)) {
                return entry.getValue();
            }
        }
        throw new JobConfigurationException("Can not find executor for elastic job class `%s`", elasticJobClass.getName());
    }

    /**
     * Get executor.
     *
     * @param elasticJobType elastic job type
     * @return job item executor
     */
    public static JobItemExecutor getExecutor(final String elasticJobType) {
        return ElasticJobServiceLoader.getCachedTypedServiceInstance(TypedJobItemExecutor.class, elasticJobType)
            .orElseThrow(() -> new JobConfigurationException("Cannot find executor for elastic job type `%s`", elasticJobType));
    }
}
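The class-based lookup above can be illustrated standalone. The sketch below uses hypothetical stand-in types (no ElasticJob dependency) to show why an executor registered under the KavenJob interface still resolves a concrete job class: isAssignableFrom matches any class implementing the registered interface.

```java
import java.util.HashMap;
import java.util.Map;

public class ExecutorLookupDemo {

    // Stand-ins for the real ElasticJob types (assumptions for illustration).
    interface ElasticJob {}
    interface KavenJob extends ElasticJob {}
    static class MyKavenJob implements KavenJob {}

    // Executors are registered under the job *interface* class,
    // as in JobItemExecutorFactory's CLASSED_EXECUTORS map.
    static final Map<Class<?>, String> CLASSED_EXECUTORS = new HashMap<>();

    static {
        CLASSED_EXECUTORS.put(KavenJob.class, "MyKavenJobExecutor");
    }

    // Mirrors the factory's loop: isAssignableFrom matches any concrete
    // job class that implements the registered interface.
    static String getExecutor(Class<? extends ElasticJob> jobClass) {
        for (Map.Entry<Class<?>, String> entry : CLASSED_EXECUTORS.entrySet()) {
            if (entry.getKey().isAssignableFrom(jobClass)) {
                return entry.getValue();
            }
        }
        throw new IllegalArgumentException("Cannot find executor for " + jobClass.getName());
    }

    public static void main(String[] args) {
        // MyKavenJob implements KavenJob, so the interface-keyed lookup succeeds.
        System.out.println(getExecutor(MyKavenJob.class));
    }
}
```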
The ServiceLoader class is the SPI facility provided by Java. SPI (Service Provider Interface) is a service provider discovery mechanism built into the JDK that can be used to enable framework extension and component replacement. It is mainly used by framework developers: different vendors can supply different implementations of the same interface. For example, both MySQL and PostgreSQL provide implementations of the java.sql.Driver interface, and Java's SPI mechanism locates the service implementations for that interface. The main idea of Java's SPI is to move control over assembly out of the program, which is especially important in modular design; its core idea is decoupling.
The only requirement for the ServiceLoader class to work is that service provider classes must have a no-argument constructor so they can be instantiated during loading. A service provider is identified by placing a provider-configuration file in the META-INF/services resource directory. The file name is the fully qualified name of the service type, and the file contains a list of fully qualified names of concrete provider classes, one per line; spaces and tabs around each name, as well as blank lines, are ignored, and the file must be encoded in UTF-8.
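For the KavenJob example in this post, the provider-configuration file would be named after the ClassedJobItemExecutor interface and would list the executor implementation (the path assumes the standard Maven resources layout):

```text
# src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor
com.kaven.job.my.MyKavenJobExecutor
```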
Once these three steps are complete, the JobItemExecutor we implemented can be discovered by ElasticJob and used to execute the shards of the corresponding job.
static {
    // load the JobItemExecutors of type-based jobs
    ElasticJobServiceLoader.registerTypedService(TypedJobItemExecutor.class);
    // load the JobItemExecutors of class-based jobs
    ServiceLoader.load(ClassedJobItemExecutor.class).forEach(each -> CLASSED_EXECUTORS.put(each.getElasticJobClass(), each));
}
The static block of the JobItemExecutorFactory class loads the JobItemExecutors for both class-based and type-based jobs. When loading a class-based job's JobItemExecutor, each.getElasticJobClass() is used as the key and each as the value. Since the getElasticJobClass method of the MyKavenJobExecutor class returns KavenJob.class, the job and its JobItemExecutor are linked together.
@Override
public Class<KavenJob> getElasticJobClass() {
    return KavenJob.class;
}
Loading the JobItemExecutors of type-based jobs is done in the ElasticJobServiceLoader class (also using the ServiceLoader class), with instance.getType() as the key and instance as the value.
public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {
    if (TYPED_SERVICES.containsKey(typedService)) {
        return;
    }
    ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));
}

private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {
    TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
    TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
}
This is also why the elasticJobType parameter of the ScheduleJobBootstrap constructor has to be all uppercase (for example SCRIPT and HTTP).
new ScheduleJobBootstrap(createRegistryCenter(), "SCRIPT",
        createJobConfiguration()).schedule();
This is because the JobItemExecutors of these two job types are stored under the return value of getType() as the key:

public String getType() {
    return "SCRIPT";
}

public String getType() {
    return "HTTP";
}
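The case-sensitivity of this lookup can be sketched with stand-in types (hypothetical, no ElasticJob dependency): instances are stored under the exact string returned by getType(), so only an exact key such as "SCRIPT" is found.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TypedRegistryDemo {

    // Stand-in for ElasticJob's TypedSPI (assumption for illustration).
    interface TypedSPI {
        String getType();
    }

    static class ScriptExecutor implements TypedSPI {
        public String getType() {
            return "SCRIPT";
        }
    }

    static final Map<String, TypedSPI> TYPED_SERVICES = new HashMap<>();

    static {
        register(new ScriptExecutor());
    }

    // Mirrors registerTypedServiceClass: keyed by the exact getType() string.
    static void register(TypedSPI instance) {
        TYPED_SERVICES.putIfAbsent(instance.getType(), instance);
    }

    static Optional<TypedSPI> lookup(String elasticJobType) {
        return Optional.ofNullable(TYPED_SERVICES.get(elasticJobType));
    }

    public static void main(String[] args) {
        System.out.println(lookup("SCRIPT").isPresent()); // exact, uppercase key matches
        System.out.println(lookup("script").isPresent()); // lowercase key misses
    }
}
```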
This completes extending a job type by implementing the SPI interface.