文章目錄
- 一、前言
- 二、修改Job預設的分片算法
-
- 1、通過xml配置job時修改分片算法
- 2、如果通過bean注入Job的,那麼添加如下
- 3、修改效果
-
- 1、不指定預設分片政策如下:
- 2、指定OdevitySortByNameJobShardingStrategy
- 3、指定RotateServerByNameJobShardingStrategy
- 4、如果不想調用http進行統計,還可以直接從zk擷取任務節點進行統計
一、前言
elastic-job版本2.1.5
如果大部分應用分片數都是1,如果用預設的分片算法,那麼所有任務都會配置設定到 Ip 最小的那個節點上,這樣會導緻嚴重的負載不均衡。好在elastic-job提供了3種分片算法
1、AverageAllocationJobShardingStrategy平均配置設定分片算法
全路徑:
com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
政策說明:
基于平均配置設定算法的分片政策,也是預設的分片政策。
如果分片不能整除,則不能整除的多餘分片将依次追加到序号小的伺服器。如:
如果有3台伺服器,分成9片,則每台伺服器分到的分片是:
伺服器1=[0,1,2], 伺服器2=[3,4,5], 伺服器3=[6,7,8]
如果有3台伺服器,分成8片,則每台伺服器分到的分片是:
伺服器1=[0,1,6], 伺服器2=[2,3,7], 伺服器3=[4,5]
如果有3台伺服器,分成10片,則每台伺服器分到的分片是:
伺服器1=[0,1,2,9], 伺服器2=[3,4,5], 伺服器3=[6,7,8]
2、OdevitySortByNameJobShardingStrategy 作業名的哈希值奇偶數決定IP升降序算法
3、RotateServerByNameJobShardingStrategy 作業名的哈希值對伺服器清單進行輪轉的分片政策
這裡先不展開這3種分片算法的源碼了,源碼解析可檢視另一篇:
elastic-job源碼分析
二、修改Job預設的分片算法
通過上面三種分片的分析,可以看出,如果用
OdevitySortByNameJobShardingStrategy
那麼任務會根據名字的hash 配置設定置 最大ip節點和最小ip節點上,那麼靠中間位置的節點 也得不到利用。而
RotateServerByNameJobShardingStrategy
作業名的哈希值對伺服器清單進行輪轉的分片政策。
那麼下面修改job的分片政策:
如果注冊job時不指定分片算法,就是預設的分片算法
AverageAllocationJobShardingStrategy
這樣所有任務在一個節點上運作。
1、通過xml配置job時修改分片算法
2、如果通過bean注入Job的,那麼添加如下
在定義LiteJobConfiguration時指定分片參數
LiteJobConfiguration getLiteJobConfiguration(TaskJobEnum taskJobEnum, Class<? extends SimpleJob> jobClass) {
// 定義作業核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(taskJobEnum.getJobName(),
taskJobEnum.getJobCron(), taskJobEnum.getShardingTotalCount()).
shardingItemParameters(taskJobEnum.getShardingItemParameters()).build();
// 定義SIMPLE類型配置
SimpleJobConfiguration simpleJobConfig =
new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
// 定義Lite作業根配置
return LiteJobConfiguration.newBuilder(simpleJobConfig).
//jobShardingStrategyClass("com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy").
jobShardingStrategyClass("com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy").
overwrite(true).build();
}
3、修改效果
1、不指定預設分片政策如下:
部署了4個執行個體,但是以任務都分片到一個節點下去了。
2、指定OdevitySortByNameJobShardingStrategy
因為通過elastic-job的控台不是很好觀察 任務所在節點。可以直接通過浏覽器檢視 elastic-job-console 檢視任務清單和檢視任務詳情時所調用的接口,如下
擷取Job詳情
http://elastic-job-console控台ip:8899/api/jobs/任務名字/sharding?order=asc&offset=0&limit=10&_=2342342423
// 擷取jobs
http://elastic-job-console控台ip/api/jobs?order=asc&offset=0&limit=100&_=2342342342342
//帶條件搜尋
http://elastic-job-console控台ip:8899/api/jobs?search=搜尋關鍵字&order=asc&offset=0&limit=10&_=2342342342342
可以用java程式調用上面接統計即可。
@Test
public void hhtp_test(){
List<JobEEEEEE> jobEEEEEES = new ArrayList<>();
CloseableHttpClient HTTP_CLIENT = HttpClients.createDefault();
//TaskJobEnum 這個枚舉類定義了所有任務,是以直接周遊這個枚舉可得到 這個應用注冊到elastic-job上的任務名
for (TaskJobEnum value : TaskJobEnum.values()) {
JobEEEEEE each = new JobEEEEEE();
jobEEEEEES.add(each);
each.setJobName(value.getJobName());
String jobName = value.getJobName();
//elastic-job控台 檢視Job詳情的位址
String api = "elastic-job-console控台ip:8899/api/jobs/"+jobName+"/sharding?order=asc&offset=0&limit=10&_=34234234234";
HttpGet get = new HttpGet(api);
try {
HttpResponse response = HTTP_CLIENT.execute(get);
String res = EntityUtils.toString(response.getEntity());
List<HashMap> list = JSON.parseArray(res, HashMap.class);
//隻設定了一個分片是以,這個就直接取的list.get(0)。如果任務設定了多個分片,那麼得到 這個任務所執行的所有Ip
System.out.print(" "+ list.get(0).get("serverIp").toString());
System.out.println(jobName+" "+res);
//擷取運作的ip
each.setServiceIP(list.get(0).get("serverIp").toString());
} catch (IOException e) {
e.printStackTrace();
}
}
Map<String,List<String>> map = new HashMap<>();
//将ip相同的 任務,加到同一個 清單中,最後 列印ip 上所有運作的作務名。
for (JobEEEEEE jobEEEEEE : jobEEEEEES) {
if(map.get(jobEEEEEE.getServiceIP())!=null){
map.get(jobEEEEEE.getServiceIP()).add(jobEEEEEE.getJobName());
}else{
List<String> objects = new ArrayList<>();
objects.add(jobEEEEEE.getJobName());
map.put(jobEEEEEE.getServiceIP(),objects);
}
}
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
System.out.print(entry.getKey()+" "+entry.getValue().size()+" ");
System.out.println(entry.getValue());
}
}
得到統計資料:已分片的任務被配置設定到4個節點中的2個上去了。
節點Ip:172.23.7.170 運作任務數:4 任務名清單:[。。。。。。。]
節點Ip:172.23.133.167 運作任務數:10 任務名清單:[。。。。。。。]
3、指定RotateServerByNameJobShardingStrategy
4、如果不想調用http進行統計,還可以直接從zk擷取任務節點進行統計
可以直接登入到zk 去檢視elastic-job任務注冊節點的路徑結構。
用ZooInspector工具,登入zk。
可以直接連zk進行操作,也可以直接用
ZookeeperRegistryCenter
操作,它封裝了擷取路徑下資料的方法
@Autowired
ZookeeperRegistryCenter regCenter;
@Test
public void get_test1111(){
List<String> jobNames = regCenter.getChildrenKeys("/");
List<JobEEEEEE> jobEEEEEES = new ArrayList<>();
for (String jobName : jobNames) {
//過濾不想要的job
List<TaskJobEnum> joblocal = Arrays.asList(TaskJobEnum.values());
Optional<TaskJobEnum> first = joblocal.stream().filter(e -> e.getJobName().equals(jobName)).findFirst();
if(!first.isPresent()){
continue;
}
JobEEEEEE eacheeee = new JobEEEEEE();
jobEEEEEES.add(eacheeee);
eacheeee.setJobName(jobName);
//拼接路徑 /jobName/sharding
String shardingRootPath = String.format("/%s/%s", jobName, "sharding");
//得到 /dd-job/jobName/sharing路徑下的資料,也就是這個任所有分片項 0,1,2,3
List<String> items = regCenter.getChildrenKeys(shardingRootPath);
for (String each : items) {
String p = String.format("%s/%s/%s", shardingRootPath, each, "instance");
//得到 /dd-job/任務名/sharing/分片項/instance 下的執行個體id
String instanceId = regCenter.get(p);
instanceId = instanceId.substring(0,instanceId.indexOf("@"));
eacheeee.setServiceIP(instanceId);
}
}
//統計
Map<String,List<String>> map = new HashMap<>();
for (JobEEEEEE jobEEEEEE : jobEEEEEES) {
if(map.get(jobEEEEEE.getServiceIP())!=null){
map.get(jobEEEEEE.getServiceIP()).add(jobEEEEEE.getJobName());
}else{
List<String> objects = new ArrayList<>();
objects.add(jobEEEEEE.getJobName());
map.put(jobEEEEEE.getServiceIP(),objects);
}
}
//隻要有效ip上的統計,因為有些作務 未執行,狀态是 sharding狀态 顯示還是舊的運作ip
List<String> include = Arrays.asList("172.23.14.1","172.23.7.1","172.23.62.1","172.23.146.1");
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
if(!include.contains(entry.getKey())){
continue;
}
System.out.print(entry.getKey()+" "+entry.getValue().size()+" ");
System.out.println(entry.getValue());
}
}
統計結果:
運作節點:172.23.146.1 任務數:8 任務名稱例表:[]
運作節點:172.23.14.1 任務數:3 任務名稱例表:[]
運作節點:172.23.7.1 任務數:4 任務名稱例表:[]
運作節點:172.23.62.1 任務數:5 任務名稱例表:[]