Kylin Source Code Analysis, Part 1: Job Scheduling
Note: this Kylin source code analysis series is based on the Kylin 2.5.0 source; other versions are analogous.
1. Overview
When a Cube operation is triggered in the Kylin web UI, it is not executed immediately. Instead, a build job is submitted to the job scheduling service, which periodically picks up submitted but not-yet-executed jobs and schedules them for execution. The default polling interval is 30 seconds and can be changed via the config option kylin.job.scheduler.poll-interval-second.
The scheduling service class is JobService (package org.apache.kylin.rest.service.JobService). JobService implements the InitializingBean interface and therefore the afterPropertiesSet method, and is initialized when Spring loads it as a bean. The bean wiring is configuration-driven: ./tomcat/webapps/kylin/WEB-INF/web.xml references ./tomcat/webapps/kylin/WEB-INF/classes/applicationContext.xml, which contains:
<context:component-scan base-package="org.apache.kylin.rest"/>
Spring then scans the package org.apache.kylin.rest for classes annotated with @Component and registers them as beans. Because JobService implements InitializingBean, Spring invokes afterPropertiesSet once the bean's properties have been set, and that call performs JobService's initialization. Other Kylin services are initialized the same way.
2. Source Code Analysis
Now to the source. First, initialization of the job scheduling service:
public void afterPropertiesSet() throws Exception {
String timeZone = getConfig().getTimeZone();
TimeZone tzone = TimeZone.getTimeZone(timeZone);
TimeZone.setDefault(tzone);
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
// Get the configured job scheduler; the default is org.apache.kylin.job.impl.threadpool.DefaultScheduler
final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory
.scheduler(kylinConfig.getSchedulerType());
new Thread(new Runnable() {
@Override
public void run() {
try {
// Initialize the scheduler
scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
if (!scheduler.hasStarted()) {
logger.info("scheduler has not been started");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}).start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
try {
scheduler.shutdown();
} catch (SchedulerException e) {
logger.error("error occurred to shutdown scheduler", e);
}
}
}));
}
Kylin ships three job schedulers:
public Map<Integer, String> getSchedulers() {
Map<Integer, String> r = Maps.newLinkedHashMap();
r.put(0, "org.apache.kylin.job.impl.threadpool.DefaultScheduler");
r.put(2, "org.apache.kylin.job.impl.threadpool.DistributedScheduler");
r.put(77, "org.apache.kylin.job.impl.threadpool.NoopScheduler");
r.putAll(convertKeyToInteger(getPropertiesByPrefix("kylin.job.scheduler.provider.")));
return r;
}
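The id-to-class table above is consumed by SchedulerFactory, which instantiates the configured class. A minimal sketch of that lookup, assuming a reflective factory (the class and method names here are illustrative, not Kylin's actual SchedulerFactory code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchedulerLookupSketch {
    // Mirrors the id -> class-name table returned by getSchedulers()
    static final Map<Integer, String> SCHEDULERS = new LinkedHashMap<>();
    static {
        SCHEDULERS.put(0, "org.apache.kylin.job.impl.threadpool.DefaultScheduler");
        SCHEDULERS.put(2, "org.apache.kylin.job.impl.threadpool.DistributedScheduler");
        SCHEDULERS.put(77, "org.apache.kylin.job.impl.threadpool.NoopScheduler");
    }

    // Resolve the configured scheduler type (kylin.job.scheduler.default) to a class name
    static String resolve(int schedulerType) {
        String className = SCHEDULERS.get(schedulerType);
        if (className == null) {
            throw new IllegalArgumentException("Unknown scheduler type: " + schedulerType);
        }
        // A real factory would follow up with Class.forName(className).newInstance()
        return className;
    }

    public static void main(String[] args) {
        System.out.println(resolve(0)); // the default: DefaultScheduler
    }
}
```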
The scheduler is selected via the config option kylin.job.scheduler.default; the default value is 0, i.e. DefaultScheduler. Returning to the scheduling service's initialization, DefaultScheduler's init method is called:
public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException {
jobLock = lock;
String serverMode = jobEngineConfig.getConfig().getServerMode();
// Only server modes "job" and "all" run the job scheduler; "query" mode does not
if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) {
logger.info("server mode: " + serverMode + ", no need to run job scheduler");
return;
}
logger.info("Initializing Job Engine ....");
if (!initialized) {
initialized = true;
} else {
return;
}
this.jobEngineConfig = jobEngineConfig;
if (jobLock.lockJobEngine() == false) {
throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
}
executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
//load all executable, set them to a consistent status
fetcherPool = Executors.newScheduledThreadPool(1);
int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS,
new SynchronousQueue<Runnable>());
context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
logger.info("Staring resume all running jobs.");
executableManager.resumeAllRunningJobs();
logger.info("Finishing resume all running jobs.");
// Get the polling interval
int pollSecond = jobEngineConfig.getPollIntervalSecond();
logger.info("Fetching jobs every {} seconds", pollSecond);
JobExecutor jobExecutor = new JobExecutor() {
@Override
public void execute(AbstractExecutable executable) {
jobPool.execute(new JobRunner(executable));
}
};
// Decide whether scheduling honors job priority; by default it does not, i.e. DefaultFetcherRunner is used
fetcher = jobEngineConfig.getJobPriorityConsidered()
? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
: new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher));
// Fetch jobs once every pollSecond seconds
fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
hasStarted = true;
}
The scheduler then periodically executes DefaultFetcherRunner's run method:
synchronized public void run() {
try (SetThreadName ignored = new SetThreadName(//
"FetcherRunner %s", System.identityHashCode(this))) {//
// logger.debug("Job Fetcher is running...");
Map<String, Executable> runningJobs = context.getRunningJobs();
// Is the job pool full? By default at most 10 jobs run concurrently
if (isJobPoolFull()) {
return;
}
......
// Iterate over all job ids
for (final String id : executableManager.getAllJobIds()) {
......
// Fetch the concrete job by its id
final AbstractExecutable executable = executableManager.getJob(id);
......
// Add the job to the job pool
addToJobPool(executable, executable.getDefaultPriority());
}
......
}
}
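The isJobPoolFull() guard matters because jobPool was built with a SynchronousQueue, which holds no pending tasks: once all corePoolSize worker threads are busy, a further execute call is rejected outright rather than queued. A standalone sketch of that behavior (the pool size and task bodies are made up for the demo):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class JobPoolSketch {
    // Try to submit a task; with a SynchronousQueue there is no queue slack,
    // so a busy pool rejects immediately instead of buffering the task
    public static boolean tryExecute(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // pool is "full"
        }
    }

    public static void main(String[] args) throws Exception {
        // Same shape as DefaultScheduler's jobPool, with a tiny core size for the demo
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, Long.MAX_VALUE, TimeUnit.DAYS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try { release.await(); } catch (InterruptedException ignored) { }
        };
        boolean first = tryExecute(pool, blocker);  // occupies worker 1
        boolean second = tryExecute(pool, blocker); // occupies worker 2
        boolean third = tryExecute(pool, blocker);  // rejected: both workers busy, no queue
        System.out.println(first + " " + second + " " + third);
        release.countDown();
        pool.shutdown();
    }
}
```

This is why the fetcher checks pool capacity up front instead of relying on queueing.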
The key question is where all these jobs come from. Above, executableManager.getAllJobIds() is called to fetch all job ids; underneath, it delegates to the following method:
public List<String> getJobIds() throws PersistentException {
try {
NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
if (resources == null) {
return Collections.emptyList();
}
ArrayList<String> result = Lists.newArrayListWithExpectedSize(resources.size());
for (String path : resources) {
result.add(path.substring(path.lastIndexOf("/") + 1));
}
return result;
} catch (IOException e) {
logger.error("error get all Jobs:", e);
throw new PersistentException(e);
}
}
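The id extraction at the end of getJobIds() is just a path-basename operation. A quick illustration with a made-up resource path (the uuid below is fabricated for the example):

```java
public class JobIdSketch {
    // Same logic as getJobIds(): keep everything after the last '/'
    static String jobIdFromPath(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        // Entries live under the "/execute" metadata root; the uuid is illustrative
        System.out.println(jobIdFromPath("/execute/069eae6e-0000-0000-0000-8f3e65a57fc6"));
    }
}
```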
store.listResources queries the database that stores Kylin's metadata for entries whose path starts with "/execute", and the job id is then cut out of each path. Next, executableManager.getJob(id) loads the concrete job information, again from the metadata database. The job metadata entries in the database look as follows (HBase is used as the metadata store):
(Screenshot of the job metadata entries in the HBase-backed metadata table omitted.)
Finally, addToJobPool adds the job to the job pool:
protected void addToJobPool(AbstractExecutable executable, int priority) {
String jobDesc = executable.toString();
logger.info(jobDesc + " prepare to schedule and its priority is " + priority);
try {
context.addRunningJob(executable);
// Submit the job to the pool for execution
jobExecutor.execute(executable);
logger.info(jobDesc + " scheduled");
} catch (Exception ex) {
context.removeRunningJob(executable);
logger.warn(jobDesc + " fail to schedule", ex);
}
}
Back in DefaultScheduler's init, the jobExecutor ultimately invokes JobRunner's run method to execute a job, which mainly calls executable.execute(context). Every concrete job in Kylin extends AbstractExecutable. If a subclass overrides execute, that override runs the task; otherwise AbstractExecutable's execute runs and in turn calls doWork. Spark-related jobs use the job type SparkExecutable, a subclass of AbstractExecutable that implements doWork itself to submit the Spark job. The main class for submitting and running the Spark job is SparkEntry: its main method calls AbstractApplication's execute, which finally calls the concrete task class's execute method. That covers the scheduling side; next, let's see how jobs are submitted to the scheduling service.
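The execute/doWork split described above is the classic template-method pattern. A highly simplified sketch, assuming nothing beyond what the text states (these classes are reduced stand-ins, not Kylin's actual AbstractExecutable or SparkExecutable):

```java
public class TemplateSketch {
    // Stand-in for AbstractExecutable: execute() is the template method,
    // doWork() is the step subclasses override (as SparkExecutable does)
    static abstract class AbstractExecutableSketch {
        final String execute() {
            // the real Kylin code also updates job state and handles errors here
            return doWork();
        }
        protected abstract String doWork();
    }

    // Stand-in for SparkExecutable: only doWork is implemented
    static class SparkExecutableSketch extends AbstractExecutableSketch {
        @Override protected String doWork() {
            return "submit spark job"; // the real class builds and submits a spark-submit command
        }
    }

    static String demo() {
        return new SparkExecutableSketch().execute();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```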
Job submission ultimately goes through JobService's submitJobInternal method, which ends by calling getExecutableManager().addJob(job) to submit the job (the job here is a DefaultChainedExecutable instance containing various Executable tasks). getExecutableManager returns the ExecutableManager singleton; its addJob calls executableDao.addJob(parse(executable)), which then calls writeJobResource(pathOfJob(job), job) to serialize the job and store it in the metadata database table.
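The DefaultChainedExecutable mentioned here is essentially an executable whose work is to run its sub-tasks in order. A minimal sketch of that chain structure (interfaces and step names are simplified for illustration and do not match Kylin's real classes):

```java
import java.util.ArrayList;
import java.util.List;

public class ChainSketch {
    interface Task { void run(); }

    // Simplified stand-in for DefaultChainedExecutable: executes sub-tasks sequentially
    static class ChainedExecutable implements Task {
        private final List<Task> tasks = new ArrayList<>();
        void addTask(Task t) { tasks.add(t); }
        @Override public void run() {
            for (Task t : tasks) {
                t.run(); // the real code also persists per-step state between tasks
            }
        }
    }

    static String runDemo() {
        StringBuilder log = new StringBuilder();
        ChainedExecutable job = new ChainedExecutable();
        job.addTask(() -> log.append("step1;")); // e.g. build dictionary
        job.addTask(() -> log.append("step2;")); // e.g. build cube
        job.run();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // steps run in submission order
    }
}
```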