
Kylin Source Code Analysis, Part 1: Job Scheduling

Note: this series is based on the Kylin 2.5.0 source code. Other versions work in a similar way.

1. Introduction

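When a Cube operation is triggered from the Kylin web UI, it does not run immediately. Instead, the build job is submitted to the job scheduling service. This service periodically picks up jobs that have been submitted but not yet run, and schedules them for execution. By default it polls every 30 seconds. The interval can be changed with the configuration property kylin.job.scheduler.poll-interval-second.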
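The following sketch makes the polling parameters concrete. It reads kylin.job.scheduler.poll-interval-second from a plain java.util.Properties object and falls back to 30 seconds. It then derives the initial delay as pollSecond / 10, the same expression that DefaultScheduler.init (shown later) passes to scheduleAtFixedRate. Using Properties as a stand-in for KylinConfig is an assumption, and the class name PollSchedule is hypothetical.

```java
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Kylin code: Properties stands in for KylinConfig.
final class PollSchedule {
    static final String POLL_KEY = "kylin.job.scheduler.poll-interval-second";
    static final int DEFAULT_POLL_SECONDS = 30;

    final int pollSeconds;
    final int initialDelaySeconds;

    private PollSchedule(int pollSeconds) {
        if (pollSeconds <= 0) {
            // scheduleAtFixedRate rejects a non-positive period anyway; fail early.
            throw new IllegalArgumentException("poll interval must be positive: " + pollSeconds);
        }
        this.pollSeconds = pollSeconds;
        // Same integer division as DefaultScheduler.init: 30 -> 3, anything below 10 -> 0.
        this.initialDelaySeconds = pollSeconds / 10;
    }

    static PollSchedule fromProperties(Properties props) {
        String raw = props.getProperty(POLL_KEY, String.valueOf(DEFAULT_POLL_SECONDS));
        return new PollSchedule(Integer.parseInt(raw.trim()));
    }

    // Runs the given fetcher periodically, mirroring the fetcherPool call in init.
    ScheduledFuture<?> scheduleOn(ScheduledExecutorService pool, Runnable fetcher) {
        return pool.scheduleAtFixedRate(fetcher, initialDelaySeconds, pollSeconds, TimeUnit.SECONDS);
    }
}
```

With the 30-second default, the first fetch happens about 3 seconds after startup. Because of the integer division, any interval below 10 seconds gives an initial delay of 0.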

The job scheduling service is implemented by the class JobService (fully qualified name org.apache.kylin.rest.service.JobService). JobService implements Spring's InitializingBean interface and its afterPropertiesSet method, and it is created as a Spring bean. The beans are wired through configuration files: ./tomcat/webapps/kylin/WEB-INF/web.xml references ./tomcat/webapps/kylin/WEB-INF/classes/applicationContext.xml, and applicationContext.xml contains:

<context:component-scan base-package="org.apache.kylin.rest"/>

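Spring then scans the org.apache.kylin.rest package for classes annotated with @Component and registers them as beans. JobService implements InitializingBean, so Spring calls its afterPropertiesSet method when the bean is initialized, and that call performs JobService's initialization. Kylin's other services are initialized the same way.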

2. Source Code Analysis

Let's walk through the source code.

Initialization of the job scheduling service:

public void afterPropertiesSet() throws Exception {
    String timeZone = getConfig().getTimeZone();
    TimeZone tzone = TimeZone.getTimeZone(timeZone);
    TimeZone.setDefault(tzone);
    final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); 

    //Get the configured job scheduler; the default is org.apache.kylin.job.impl.threadpool.DefaultScheduler

    final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory
            .scheduler(kylinConfig.getSchedulerType());
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                //Initialize the scheduler
                scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
                if (!scheduler.hasStarted()) {
                    logger.info("scheduler has not been started");
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }).start();

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                scheduler.shutdown();
            } catch (SchedulerException e) {
                logger.error("error occurred to shutdown scheduler", e);
            }
        }
    }));
}
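afterPropertiesSet follows a reusable pattern. It runs scheduler initialization on a separate thread so that bean creation is not blocked, and it registers a JVM shutdown hook that stops the scheduler on exit. The plain-Java sketch below reproduces that pattern. SketchScheduler and SchedulerBootstrap are hypothetical names; they are not Kylin's Scheduler interface.

```java
// Hypothetical stand-in for Kylin's Scheduler interface (not the real one).
interface SketchScheduler {
    void init() throws Exception;
    boolean hasStarted();
    void shutdown();
}

final class SchedulerBootstrap {
    // Same shape as JobService.afterPropertiesSet: init off the caller's thread,
    // then a shutdown hook that stops the scheduler when the JVM exits.
    static Thread start(SketchScheduler scheduler) {
        Thread initThread = new Thread(() -> {
            try {
                scheduler.init();
                if (!scheduler.hasStarted()) {
                    System.out.println("scheduler has not been started");
                }
            } catch (Exception e) {
                // Thrown on the background thread, so it does not reach the caller of start().
                throw new RuntimeException(e);
            }
        }, "scheduler-init");
        initThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdown, "scheduler-shutdown"));
        return initThread;
    }
}
```

This design has one consequence. If init fails, the RuntimeException is thrown on the background thread, so it does not abort the caller (in Kylin's case, the Spring context startup). By default, it is only reported by that thread's uncaught-exception handler.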
           

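Kylin has three built-in job schedulers. More can be registered through properties prefixed with kylin.job.scheduler.provider.: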

public Map<Integer, String> getSchedulers() {
    Map<Integer, String> r = Maps.newLinkedHashMap();
    r.put(0, "org.apache.kylin.job.impl.threadpool.DefaultScheduler");
    r.put(2, "org.apache.kylin.job.impl.threadpool.DistributedScheduler");
    r.put(77, "org.apache.kylin.job.impl.threadpool.NoopScheduler");
    r.putAll(convertKeyToInteger(getPropertiesByPrefix("kylin.job.scheduler.provider.")));
    return r;
}
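The map returned by getSchedulers() is keyed by the integer that kylin.job.scheduler.default refers to. The sketch below resolves the configured scheduler's class name, with java.util.Properties standing in for KylinConfig (an assumption). SchedulerRegistry is a hypothetical name. The sketch stops at the class name and does not guess how SchedulerFactory instantiates it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch, not Kylin code: Properties stands in for KylinConfig.
final class SchedulerRegistry {
    static final String PROVIDER_PREFIX = "kylin.job.scheduler.provider.";
    static final String DEFAULT_KEY = "kylin.job.scheduler.default";

    static Map<Integer, String> schedulers(Properties props) {
        Map<Integer, String> r = new LinkedHashMap<>();
        r.put(0, "org.apache.kylin.job.impl.threadpool.DefaultScheduler");
        r.put(2, "org.apache.kylin.job.impl.threadpool.DistributedScheduler");
        r.put(77, "org.apache.kylin.job.impl.threadpool.NoopScheduler");
        // Providers are merged last (like putAll in getSchedulers), so they can override a built-in id.
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(PROVIDER_PREFIX)) {
                int id = Integer.parseInt(key.substring(PROVIDER_PREFIX.length()));
                r.put(id, props.getProperty(key));
            }
        }
        return r;
    }

    // Class name of the scheduler selected by kylin.job.scheduler.default (0 if unset).
    static String selectedClassName(Properties props) {
        int id = Integer.parseInt(props.getProperty(DEFAULT_KEY, "0").trim());
        String className = schedulers(props).get(id);
        if (className == null) {
            throw new IllegalArgumentException("No scheduler registered for id " + id);
        }
        return className;
    }
}
```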
           

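The scheduler is selected with the configuration property kylin.job.scheduler.default. Its default value is 0, which selects DefaultScheduler. Returning to the initialization of the job scheduling service, the next step is DefaultScheduler's init method: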

public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException {

    jobLock = lock;
    String serverMode = jobEngineConfig.getConfig().getServerMode();
    //Only servers in "job" or "all" mode run the job scheduler; "query" servers do not
    if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) {
        logger.info("server mode: " + serverMode + ", no need to run job scheduler");
        return;

    }
    logger.info("Initializing Job Engine ....");

    if (!initialized) {
        initialized = true;
    } else {
        return;
    }

    this.jobEngineConfig = jobEngineConfig;

    if (jobLock.lockJobEngine() == false) {
        throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
    }
    executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());

    //load all executable, set them to a consistent status
    fetcherPool = Executors.newScheduledThreadPool(1);
    int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
    jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS,
            new SynchronousQueue<Runnable>());
    context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
    logger.info("Staring resume all running jobs.");
    executableManager.resumeAllRunningJobs();
    logger.info("Finishing resume all running jobs.");

    //Get the polling interval
    int pollSecond = jobEngineConfig.getPollIntervalSecond();
    logger.info("Fetching jobs every {} seconds", pollSecond);
    JobExecutor jobExecutor = new JobExecutor() {

        @Override
        public void execute(AbstractExecutable executable) {

            jobPool.execute(new JobRunner(executable));
        }

    };
    //Check whether job priority is considered; by default it is not, so DefaultFetcherRunner is used
    fetcher = jobEngineConfig.getJobPriorityConsidered()
            ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
            : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
    logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher));

    //Fetch jobs every pollSecond seconds, after an initial delay of pollSecond / 10 seconds
    fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
    hasStarted = true;

}
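One detail of init is easy to miss. jobPool is a ThreadPoolExecutor whose core and maximum sizes are both getMaxConcurrentJobLimit(), and its work queue is a SynchronousQueue, which holds no elements. A job is therefore accepted only if a thread can take it; otherwise execute() throws RejectedExecutionException. The self-contained demo below counts those rejections. BoundedJobPoolDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the jobPool configuration used in DefaultScheduler.init.
final class BoundedJobPoolDemo {
    // Submits `tasks` blocking tasks to a pool of `threads` threads; returns how many were rejected.
    static int countRejected(int threads, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads,
                Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task blocks until released, keeping its thread busy.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // No idle thread and a zero-capacity queue: the task is refused.
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }
}
```

For example, countRejected(2, 3) returns 1, because the third task finds both threads busy. This explains why the fetcher checks isJobPoolFull() before scheduling, and why addToJobPool catches the exception and removes the job from the context again. fetcherPool has a single thread, and per the ScheduledExecutorService contract, executions of a fixed-rate task never overlap.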
           

The fetcherPool then runs DefaultFetcherRunner's run method at that fixed interval:

synchronized public void run() {

    try (SetThreadName ignored = new SetThreadName(//
            "FetcherRunner %s", System.identityHashCode(this))) {//
        // logger.debug("Job Fetcher is running...");
        Map<String, Executable> runningJobs = context.getRunningJobs();
        // Return early if the job pool is full; by default at most 10 jobs can run at the same time
        if (isJobPoolFull()) {
            return;
        }
        ......
        //Iterate over all jobs
        for (final String id : executableManager.getAllJobIds()) {
            ......
            //Get the concrete job by its id
            final AbstractExecutable executable = executableManager.getJob(id);
            ......
            //Add the job to the job pool
            addToJobPool(executable, executable.getDefaultPriority());
        }
      ......
    }
}
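The visible parts of run() are the isJobPoolFull() check, the runningJobs map, and the loop over all job ids. Based on those, one fetcher round can be modeled as follows: stop when the pool is full, skip jobs that are already running or not runnable, and schedule the rest. The sketch below models only that skeleton. FetcherSketch, its Status enum, the in-memory job map, and the rule that only READY jobs are scheduled are hypothetical. The sketch does not reproduce the checks elided with "......" in Kylin's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one fetcher round; not Kylin's DefaultFetcherRunner.
final class FetcherSketch {
    enum Status { READY, RUNNING, SUCCEED, ERROR }

    private final int maxConcurrentJobs;
    private final Set<String> runningJobs = new LinkedHashSet<>();

    FetcherSketch(int maxConcurrentJobs) {
        this.maxConcurrentJobs = maxConcurrentJobs;
    }

    boolean isJobPoolFull() {
        return runningJobs.size() >= maxConcurrentJobs;
    }

    // One polling round over all known jobs; returns the ids scheduled in this round.
    List<String> fetchOnce(Map<String, Status> allJobs) {
        List<String> scheduled = new ArrayList<>();
        for (Map.Entry<String, Status> job : allJobs.entrySet()) {
            if (isJobPoolFull()) {
                break; // leave the remaining jobs for the next round
            }
            String id = job.getKey();
            if (runningJobs.contains(id) || job.getValue() != Status.READY) {
                continue; // already running, or not in a schedulable state
            }
            runningJobs.add(id);
            scheduled.add(id);
        }
        return scheduled;
    }

    // Called when a job ends, freeing a slot in the pool.
    void finish(String id) {
        runningJobs.remove(id);
    }
}
```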
           

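The key question is where the full list of jobs comes from. The code above calls executableManager.getAllJobIds() to get every job id, and the ids are read by the following getJobIds() method: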

public List<String> getJobIds() throws PersistentException {

    try {
        NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
        if (resources == null) {
            return Collections.emptyList();
        }

        ArrayList<String> result = Lists.newArrayListWithExpectedSize(resources.size());
        for (String path : resources) {
            result.add(path.substring(path.lastIndexOf("/") + 1));
        }
        return result;
    } catch (IOException e) {
        logger.error("error get all Jobs:", e);
        throw new PersistentException(e);
    }
}
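getJobIds() relies on listResources returning every path under the /execute root. The sketch below imitates that with a sorted in-memory map. A range scan over the sorted keys finds the children of a folder, and the job id is the last path segment, as in getJobIds(). InMemoryResourceStoreSketch is a hypothetical name, and the sketch does not claim to reflect how Kylin's real ResourceStore implementations work. Returning null for an empty folder mirrors the null check in getJobIds().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory metadata store; not a Kylin class.
final class InMemoryResourceStoreSketch {
    static final String EXECUTE_RESOURCE_ROOT = "/execute";

    private final TreeMap<String, String> rows = new TreeMap<>();

    void put(String path, String content) {
        rows.put(path, content);
    }

    // Direct children of `folder`, or null when there are none.
    NavigableSet<String> listResources(String folder) {
        String prefix = folder.endsWith("/") ? folder : folder + "/";
        NavigableSet<String> children = new TreeSet<>();
        // Keys are sorted, so all keys sharing this prefix form one contiguous range.
        for (String path : rows.tailMap(prefix, true).keySet()) {
            if (!path.startsWith(prefix)) {
                break; // past the end of the prefix range
            }
            String rest = path.substring(prefix.length());
            int slash = rest.indexOf('/');
            // Deeper paths collapse to their immediate sub-folder.
            children.add(slash < 0 ? path : prefix + rest.substring(0, slash));
        }
        return children.isEmpty() ? null : children;
    }

    // Same path-to-id logic as getJobIds(): the id is the last path segment.
    List<String> getJobIds() {
        NavigableSet<String> resources = listResources(EXECUTE_RESOURCE_ROOT);
        if (resources == null) {
            return Collections.emptyList();
        }
        List<String> result = new ArrayList<>(resources.size());
        for (String path : resources) {
            result.add(path.substring(path.lastIndexOf('/') + 1));
        }
        return result;
    }
}
```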
           

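store.listResources queries the database that holds Kylin's metadata for entries whose path starts with /execute, and the job id is cut from each path. executableManager.getJob(id) then loads each job's details, again from the metadata store. The job metadata entries look like this (here the metadata is stored in HBase):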

[Figure: job metadata entries in the HBase metadata store]

Finally, addToJobPool adds the job to the job pool:

protected void addToJobPool(AbstractExecutable executable, int priority) {

    String jobDesc = executable.toString();
    logger.info(jobDesc + " prepare to schedule and its priority is " + priority);
    try {
        context.addRunningJob(executable);
        //Submit the job to the pool for execution
        jobExecutor.execute(executable);
        logger.info(jobDesc + " scheduled");
    } catch (Exception ex) {
        context.removeRunningJob(executable);
        logger.warn(jobDesc + " fail to schedule", ex);
    }
}
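addToJobPool registers the job in the context before handing it to jobExecutor, and removes it again if submission throws. The following sketch reproduces that register-then-rollback pattern with a ConcurrentHashMap and any java.util.concurrent.Executor. JobPoolSubmitter is a hypothetical name. Clearing the entry when the job finishes is an added assumption that keeps the map from growing without bound.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical sketch of the addToJobPool pattern; not Kylin code.
final class JobPoolSubmitter {
    private final Map<String, Runnable> runningJobs = new ConcurrentHashMap<>();
    private final Executor executor;

    JobPoolSubmitter(Executor executor) {
        this.executor = executor;
    }

    boolean addToJobPool(String jobId, Runnable job) {
        // Register before submitting: if registration came afterwards, a very fast job
        // could finish (and remove itself) first, leaving a stale "running" entry.
        runningJobs.put(jobId, job);
        try {
            executor.execute(() -> {
                try {
                    job.run();
                } finally {
                    // Assumption for this sketch: the entry is cleared when the job ends.
                    runningJobs.remove(jobId);
                }
            });
            return true;
        } catch (RuntimeException ex) {
            // Submission failed (e.g. RejectedExecutionException): roll back the registration.
            runningJobs.remove(jobId);
            return false;
        }
    }

    boolean isRunning(String jobId) {
        return runningJobs.containsKey(jobId);
    }
}
```

A rejected job is no longer recorded as running, so a later polling round can pick it up again.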
           

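Back to the jobExecutor defined in DefaultScheduler's init method: each job is ultimately run by JobRunner's run method, which mainly calls executable.execute(context). Every concrete Kylin task extends AbstractExecutable. If a task overrides execute, its own execute method runs. Otherwise AbstractExecutable's execute method runs, and it calls doWork to perform the task.

Spark tasks are one example. Their type is SparkExecutable, a subclass of AbstractExecutable that implements its own doWork to submit the Spark job. The main class of the submitted Spark application is SparkEntry. Its main method calls AbstractApplication's execute method, which finally calls the execute method of the concrete job class. That covers the job scheduling code in Kylin. Next, let's look at how jobs are submitted to the scheduling service.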
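The execute/doWork split described above is the template method pattern. The sketch below shows it in isolation. SketchExecutable, SketchSparkStep, and SketchCustomStep are hypothetical classes. The status updates, retries, and execution context that Kylin's AbstractExecutable handles are left out.

```java
// Hypothetical, stripped-down model of the AbstractExecutable call chain.
abstract class SketchExecutable {
    private final StringBuilder log = new StringBuilder();

    // Entry point called by the runner: shared bookkeeping around the task-specific step.
    // Subclasses may still override it, but most only implement doWork().
    public String execute() {
        log.append("start;");
        String result = doWork();
        log.append("end;");
        return result;
    }

    protected abstract String doWork();

    String log() {
        return log.toString();
    }
}

// Stand-in for a task like SparkExecutable that only supplies doWork().
final class SketchSparkStep extends SketchExecutable {
    @Override
    protected String doWork() {
        // The real SparkExecutable submits a Spark job at this point.
        return "submitted";
    }
}

// A task that overrides execute() itself, so the base-class bookkeeping is skipped.
final class SketchCustomStep extends SketchExecutable {
    @Override
    public String execute() {
        return "custom";
    }

    @Override
    protected String doWork() {
        return "unused";
    }
}
```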

Job submission ends up in JobService's submitJobInternal method. That method submits the job by calling getExecutableManager().addJob(job). Here job is an instance of DefaultChainedExecutable that contains tasks of various Executable types, and getExecutableManager returns the ExecutableManager singleton. addJob calls executableDao.addJob(parse(executable)), which in turn calls writeJobResource(pathOfJob(job), job) to serialize the job and write it into the metadata table.
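This call chain shows that submitting a job only persists it. Nothing runs until a later fetcher round lists /execute and finds the job. The following sketch models that with an in-memory map. The "/execute/<id>" path layout, the line-based serialization, and the names SubmissionSketch and ChainedJob are assumptions for illustration, not Kylin's actual format or classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the submit path; not Kylin's ExecutableManager or ExecutableDao.
final class SubmissionSketch {
    static final String EXECUTE_ROOT = "/execute";

    // Minimal stand-in for a chained job holding ordered sub-task names.
    static final class ChainedJob {
        final String id;
        final List<String> tasks = new ArrayList<>();

        ChainedJob(String id) {
            this.id = id;
        }
    }

    private final Map<String, String> store = new TreeMap<>();

    // Assumed path layout: one entry per job directly under the execute root.
    static String pathOfJob(String id) {
        return EXECUTE_ROOT + "/" + id;
    }

    // Counterpart of addJob -> writeJobResource: serialize and persist, do not execute.
    void addJob(ChainedJob job) {
        String body = "id=" + job.id + "\ntasks=" + String.join(",", job.tasks);
        store.put(pathOfJob(job.id), body);
    }

    String read(String path) {
        return store.get(path);
    }

    // What a later fetcher round would see: ids taken from paths under the execute root.
    List<String> jobIds() {
        List<String> ids = new ArrayList<>();
        for (String path : store.keySet()) {
            if (path.startsWith(EXECUTE_ROOT + "/")) {
                ids.add(path.substring(path.lastIndexOf('/') + 1));
            }
        }
        return ids;
    }
}
```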
