天天看點

elastic-job源碼分析之分片算法應用一、前言二、修改Job預設的分片算法

文章目錄

  • 一、前言
  • 二、修改Job預設的分片算法
    • 1、通過xml配置job時修改分片算法
    • 2、如果通過bean注入Job的,那麼添加如下
    • 3、修改效果
      • 1、不指定預設分片政策如下:
      • 2、指定OdevitySortByNameJobShardingStrategy
      • 3、指定RotateServerByNameJobShardingStrategy
      • 4、如果不想調用http進行統計,還可以直接從zk擷取任務節點進行統計

一、前言

elastic-job版本2.1.5

如果大部分應用分片數都是1,如果用預設的分片算法,那麼所有任務都會配置設定到 Ip 最小的那個節點上,這樣會導緻嚴重的負載不均衡。好在elastic-job提供了3種分片算法

1、AverageAllocationJobShardingStrategy平均配置設定分片算法
全路徑:
com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
政策說明:
基于平均配置設定算法的分片政策,也是預設的分片政策。
如果分片不能整除,則不能整除的多餘分片将依次追加到序号小的伺服器。如:

如果有3台伺服器,分成9片,則每台伺服器分到的分片是:
伺服器1=[0,1,2], 伺服器2=[3,4,5], 伺服器3=[6,7,8]
如果有3台伺服器,分成8片,則每台伺服器分到的分片是:
伺服器1=[0,1,6], 伺服器2=[2,3,7], 伺服器3=[4,5]
如果有3台伺服器,分成10片,則每台伺服器分到的分片是:
伺服器1=[0,1,2,9], 伺服器2=[3,4,5], 伺服器3=[6,7,8]

2、OdevitySortByNameJobShardingStrategy 作業名的哈希值奇偶數決定IP升降序算法
3、RotateServerByNameJobShardingStrategy 作業名的哈希值對伺服器清單進行輪轉的分片政策
           

這裡先不展開這3種分片算法的源碼了,源碼解析可檢視另一篇:

elastic-job源碼分析

二、修改Job預設的分片算法

通過上面三種分片的分析,可以看出,如果用

OdevitySortByNameJobShardingStrategy

那麼任務會根據名字的hash 配置設定置 最大ip節點和最小ip節點上,那麼靠中間位置的節點 也得不到利用。而

RotateServerByNameJobShardingStrategy

作業名的哈希值對伺服器清單進行輪轉的分片政策。

那麼下面修改job的分片政策:

如果注冊job時不指定分片算法,就是預設的分片算法

AverageAllocationJobShardingStrategy

這樣所有任務在一個節點上運作。

1、通過xml配置job時修改分片算法

2、如果通過bean注入Job的,那麼添加如下

在定義LiteJobConfiguration時指定分片參數

LiteJobConfiguration getLiteJobConfiguration(TaskJobEnum taskJobEnum, Class<? extends SimpleJob> jobClass) {
        // 定義作業核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(taskJobEnum.getJobName(),
                taskJobEnum.getJobCron(), taskJobEnum.getShardingTotalCount()).
                shardingItemParameters(taskJobEnum.getShardingItemParameters()).build();
        // 定義SIMPLE類型配置
        SimpleJobConfiguration simpleJobConfig =
                new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
        // 定義Lite作業根配置
        return LiteJobConfiguration.newBuilder(simpleJobConfig).
                //jobShardingStrategyClass("com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy").
                jobShardingStrategyClass("com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy").
                overwrite(true).build();
    }
           

3、修改效果

1、不指定預設分片政策如下:

部署了4個執行個體,但是以任務都分片到一個節點下去了。

2、指定OdevitySortByNameJobShardingStrategy

因為通過elastic-job的控台不是很好觀察 任務所在節點。可以直接通過浏覽器檢視 elastic-job-console 檢視任務清單和檢視任務詳情時所調用的接口,如下

擷取Job詳情
http://elastic-job-console控台ip:8899/api/jobs/任務名字/sharding?order=asc&offset=0&limit=10&_=2342342423
// 擷取jobs
 http://elastic-job-console控台ip/api/jobs?order=asc&offset=0&limit=100&_=2342342342342
 //帶條件搜尋
http://elastic-job-console控台ip:8899/api/jobs?search=搜尋關鍵字&order=asc&offset=0&limit=10&_=2342342342342
           

可以用java程式調用上面接統計即可。

@Test
 public void hhtp_test(){

     List<JobEEEEEE> jobEEEEEES = new ArrayList<>();

     CloseableHttpClient HTTP_CLIENT = HttpClients.createDefault();
     //TaskJobEnum 這個枚舉類定義了所有任務,是以直接周遊這個枚舉可得到 這個應用注冊到elastic-job上的任務名
     for (TaskJobEnum value : TaskJobEnum.values()) {
         JobEEEEEE each = new JobEEEEEE();
         jobEEEEEES.add(each);
         each.setJobName(value.getJobName());
         String jobName = value.getJobName();
         //elastic-job控台 檢視Job詳情的位址
         String api = "elastic-job-console控台ip:8899/api/jobs/"+jobName+"/sharding?order=asc&offset=0&limit=10&_=34234234234";
         HttpGet get = new HttpGet(api);
         try {
             HttpResponse response = HTTP_CLIENT.execute(get);
             String res = EntityUtils.toString(response.getEntity());

             List<HashMap> list = JSON.parseArray(res, HashMap.class);
             //隻設定了一個分片是以,這個就直接取的list.get(0)。如果任務設定了多個分片,那麼得到 這個任務所執行的所有Ip
             System.out.print("  "+  list.get(0).get("serverIp").toString());
             System.out.println(jobName+"  "+res);
             //擷取運作的ip
             each.setServiceIP(list.get(0).get("serverIp").toString());
         } catch (IOException e) {
             e.printStackTrace();
         }
     }

     Map<String,List<String>>  map = new HashMap<>();
     //将ip相同的 任務,加到同一個 清單中,最後 列印ip 上所有運作的作務名。
     for (JobEEEEEE jobEEEEEE : jobEEEEEES) {
         if(map.get(jobEEEEEE.getServiceIP())!=null){
             map.get(jobEEEEEE.getServiceIP()).add(jobEEEEEE.getJobName());
         }else{
             List<String> objects = new ArrayList<>();
             objects.add(jobEEEEEE.getJobName());
             map.put(jobEEEEEE.getServiceIP(),objects);
         }
     }
     for (Map.Entry<String, List<String>> entry : map.entrySet()) {
         System.out.print(entry.getKey()+"  "+entry.getValue().size()+"  ");
         System.out.println(entry.getValue());
     }
 }
           

得到統計資料:已分片的任務被配置設定到4個節點中的2個上去了。

節點Ip:172.23.7.170   運作任務數:4  任務名清單:[。。。。。。。]
節點Ip:172.23.133.167  運作任務數:10 任務名清單:[。。。。。。。]
           

3、指定RotateServerByNameJobShardingStrategy

4、如果不想調用http進行統計,還可以直接從zk擷取任務節點進行統計

可以直接登入到zk 去檢視elastic-job任務注冊節點的路徑結構。

用ZooInspector工具,登入zk。

elastic-job源碼分析之分片算法應用一、前言二、修改Job預設的分片算法
elastic-job源碼分析之分片算法應用一、前言二、修改Job預設的分片算法

可以直接連zk進行操作,也可以直接用

ZookeeperRegistryCenter

操作,它封裝了擷取路徑下資料的方法

@Autowired
ZookeeperRegistryCenter regCenter;
@Test
public void get_test1111(){
    List<String> jobNames = regCenter.getChildrenKeys("/");
    List<JobEEEEEE> jobEEEEEES = new ArrayList<>();
    for (String jobName : jobNames) {
        //過濾不想要的job
        List<TaskJobEnum> joblocal = Arrays.asList(TaskJobEnum.values());
        Optional<TaskJobEnum> first = joblocal.stream().filter(e -> e.getJobName().equals(jobName)).findFirst();
        if(!first.isPresent()){
            continue;
        }
        JobEEEEEE eacheeee = new JobEEEEEE();
        jobEEEEEES.add(eacheeee);
        eacheeee.setJobName(jobName);
        //拼接路徑 /jobName/sharding
        String shardingRootPath = String.format("/%s/%s", jobName, "sharding");
        //得到 /dd-job/jobName/sharing路徑下的資料,也就是這個任所有分片項 0,1,2,3
        List<String> items = regCenter.getChildrenKeys(shardingRootPath);
        for (String each : items) {
            String p = String.format("%s/%s/%s", shardingRootPath, each, "instance");
            //得到 /dd-job/任務名/sharing/分片項/instance 下的執行個體id
            String instanceId = regCenter.get(p);
            instanceId = instanceId.substring(0,instanceId.indexOf("@"));
            eacheeee.setServiceIP(instanceId);
        }

    }
    //統計
    Map<String,List<String>>  map = new HashMap<>();
    for (JobEEEEEE jobEEEEEE : jobEEEEEES) {
        if(map.get(jobEEEEEE.getServiceIP())!=null){
            map.get(jobEEEEEE.getServiceIP()).add(jobEEEEEE.getJobName());
        }else{
            List<String> objects = new ArrayList<>();
            objects.add(jobEEEEEE.getJobName());
            map.put(jobEEEEEE.getServiceIP(),objects);
        }
    }
    //隻要有效ip上的統計,因為有些作務 未執行,狀态是 sharding狀态 顯示還是舊的運作ip
    List<String> include = Arrays.asList("172.23.14.1","172.23.7.1","172.23.62.1","172.23.146.1");
    for (Map.Entry<String, List<String>> entry : map.entrySet()) {
        if(!include.contains(entry.getKey())){
            continue;
        }
        System.out.print(entry.getKey()+"  "+entry.getValue().size()+"  ");
        System.out.println(entry.getValue());
    }

}
           

統計結果:

運作節點:172.23.146.1  任務數:8  任務名稱例表:[]
運作節點:172.23.14.1  任務數:3  任務名稱例表:[]
運作節點:172.23.7.1  任務數:4  任務名稱例表:[]
運作節點:172.23.62.1  任務數:5  任務名稱例表:[]

           

繼續閱讀