Java SPI機制
在上一篇部落格中介紹了
ElasticJob
的作業分片政策:
- ElasticJob‐Lite:作業分片政策介紹與源碼分析
其中提到了
ElasticJob
是通過
Java
提供的
SPI
機制(
ServiceLoader
類)加載所有作業分片政策。
ServiceLoader
類就是
Java
提供的
SPI
,
SPI
(
Service Provider Interface
)是
JDK
内置的一種服務提供發現機制,可以用來啟用架構擴充和替換元件,主要是被架構的開發人員使用,不同廠商可以針對同一接口做出不同的實作,比如
java.sql.Driver
接口,
MySQL
和
PostgreSQL
都提供了對應的實作給使用者使用,而
Java
的
SPI
機制可以為某個接口尋找服務實作。
Java
中
SPI
機制主要思想是将裝配的控制權移到程式之外,在子產品化設計中這個機制尤其重要,其核心思想就是解耦。
ServiceLoader
類正常工作的唯一要求是服務提供類必須具有無參構造函數,以便它們可以在加載期間執行個體化。通過在資源目錄的
META-INF/services
中放置服務提供者配置檔案來辨別服務提供者,檔案名是服務類型的完全限定名(比如
ElasticJobListener
類的完全限定名),該檔案包含具體的服務提供者類的完全限定名清單(
ElasticJobListener
實作類的完全限定名清單),每行一個,每個名稱周圍的空格和制表符以及空行都将被忽略,該檔案必須以
UTF-8
編碼。
自定義作業分片政策
所有可用的作業分片政策在
JobShardingStrategyFactory
類的靜态塊中被加載(通過
ElasticJobServiceLoader
類,該類是
ElasticJob
基于
Java SPI
機制實作的特定于作業的服務加載器)。
static {
ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
}
加載的類型是
JobShardingStrategy.class
,是以自定義的作業分片政策需要實作該接口。
自定義作業分片政策
ShuffleJobShardingStrategy
類:
package com.kaven.job.my;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import java.util.*;
public class ShuffleJobShardingStrategy implements JobShardingStrategy {
@Override
public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
if (jobInstances.isEmpty()) {
return Collections.emptyMap();
}
// 先将作業分片項裝入容器
List<Integer> shuffleShardingList = new ArrayList<>(shardingTotalCount);
for (int i = 0; i < shardingTotalCount; i++) {
shuffleShardingList.add(i);
}
// 将容器中的作業分片項順序打亂(使用容器的shuffle方法)
Collections.shuffle(shuffleShardingList);
// 模仿AverageAllocationJobShardingStrategy作業分片政策進行配置設定
Map<JobInstance, List<Integer>> result = shardingShuffle(jobInstances, shardingTotalCount, shuffleShardingList);
addShuffle(jobInstances, shardingTotalCount, result, shuffleShardingList);
return result;
}
private Map<JobInstance, List<Integer>> shardingShuffle(final List<JobInstance> shardingUnits,
final int shardingTotalCount,
final List<Integer> shuffleShardingList) {
Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
// 每個作業伺服器最少應該配置設定的作業分片項數
int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
int count = 0;
for (JobInstance each : shardingUnits) {
// 每個作業伺服器申請的作業分片項清單(容量為itemCountPerSharding + 1)
// itemCountPerSharding + 1為每個作業伺服器最多應該配置設定的作業分片項數
List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
// 給作業分片項清單添加容器中的第i個作業分片項
shardingItems.add(shuffleShardingList.get(i));
}
// 将作業伺服器與它執行的作業分片項清單進行關聯
result.put(each, shardingItems);
count++;
}
return result;
}
private void addShuffle(final List<JobInstance> shardingUnits, final int shardingTotalCount,
final Map<JobInstance, List<Integer>> shardingResults,
final List<Integer> shuffleShardingList) {
// 無法平均配置設定的分片項數
int aliquant = shardingTotalCount % shardingUnits.size();
// 已配置設定的無法平均配置設定的分片項數
int count = 0;
for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
// 是否還有無法平均配置設定的分片項
if (count < aliquant) {
// 配置設定給序号較小的作業伺服器
entry.getValue().add(
shuffleShardingList.get(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count)
);
}
// 已配置設定數更新
count++;
}
}
// 作業分片政策的辨別符
@Override
public String getType() {
return "Shuffle";
}
}
部落客自定義的
ShuffleJobShardingStrategy
作業分片政策是模仿
AverageAllocationJobShardingStrategy
作業分片政策(預設的作業分片政策),隻是先将作業分片項裝入容器,然後将容器中的作業分片項順序打亂(使用容器的
shuffle
方法),之後再基于該作業分片項容器使用
AverageAllocationJobShardingStrategy
作業分片政策給作業伺服器配置設定該容器中的作業分片項,如果不了解
AverageAllocationJobShardingStrategy
作業分片政策,可以去看看最上面列出的部落格。
添加服務實作
在
resources
的
META-INF/services
中放置服務提供者配置檔案來辨別服務提供者,如下圖所示:
測試
作業定義(
Simple
作業):
package com.kaven.job;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MySimpleJob implements SimpleJob {
private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void execute(ShardingContext shardingContext) {
String job = shardingContext.getShardingParameter();
if(job == null || job.trim().equals("")) {
System.out.println("請指定幫[Kaven]執行的任務名稱!");
throw new RuntimeException();
}
System.out.printf("%s 執行任務%d - [%s]!\n", formatter.format(new Date()),
shardingContext.getShardingItem(), job);
}
}
啟動類:
package com.kaven.job;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
public class Application {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration())
.schedule();
}
// 注冊中心
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9000", "my-job");
zc.setConnectionTimeoutMilliseconds(40000);
zc.setMaxRetries(5);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
regCenter.init();
return regCenter;
}
// 作業配置
private static JobConfiguration createJobConfiguration() {
String jobs = "0=看論文,1=做實驗,2=打比賽,3=開組會,4=看書,5=寫部落格,6=看源碼";
return JobConfiguration.newBuilder("MySimpleJob", 7)
.cron("30 0/2 * * * ?")
.shardingItemParameters(jobs)
// 使用自定義的作業分片政策
.jobShardingStrategyType("Shuffle")
.overwrite(true)
.build();
}
}
啟動三個作業伺服器,輸出如下圖所示:
輸出符合預期,因為自定義作業分片政策是模仿
AverageAllocationJobShardingStrategy
作業分片政策,但自定義作業分片政策中将作業的分片項順序打亂了,是以給每個作業伺服器配置設定的作業分片項可能不是連續的。
修改作業配置(使用預設的作業分片政策):
private static JobConfiguration createJobConfiguration() {
String jobs = "0=看論文,1=做實驗,2=打比賽,3=開組會,4=看書,5=寫部落格,6=看源碼";
return JobConfiguration.newBuilder("MySimpleJob", 7)
.cron("30 0/2 * * * ?")
.shardingItemParameters(jobs)
// .jobShardingStrategyType("Shuffle")
.overwrite(true)
.build();
}
啟動三個作業伺服器,輸出如下圖所示: