A while ago I wrote an article,《一文揭秘定時任務排程架構quartz》, on the Quartz job scheduling framework. A reader suggested that I also cover elastic-job, another job scheduling framework. Things are less hectic at the end of the year, so let's dig into elastic-job.
First of all, elastic-job is built on top of Quartz, so understanding how Quartz works makes it much easier to pick up elastic-job quickly.
Let's start with the architecture of elastic-job-lite:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISM9AnYldnJwAzN9c3Pn5GcuQ0MlMWbidXNp5keJpmT5FERPJzZU1EdJpXT3dGVNNTTE5EeFRUT5FEVPhXQq1EdRpnT3lFRPBDOp10drRVT3lkeMBzYE1kMnRkT2NmMiNnSywEd5ITW110MaZHetlVdO1GT0UERNl3YXJGc5kHT20ESjBjUIF2Lc12bj5SYphXa5VWen5WY35iclN3Ztl2Lc9CX6MHc0RHaiojIsJye.png)
As we know, Quartz has three key concepts: Job, Trigger, and Scheduler. How do these three concepts show up in elastic-job?
1. Job
LiteJob implements Quartz's Job interface:
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * Lite scheduling job.
 *
 * @author zhangliang
 */
public final class LiteJob implements Job {
    
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
Here:
1.1 ElasticJob is the interface behind the different job types (a small business-job sketch follows the feature list below).
1.2 JobFacade is the internal facade that exposes job services to the executor.
Quartz's default job factory populates both fields from the JobDataMap via the setters generated by @Setter; the two entries are put into the JobDataMap in createJobDetail(), shown later in section 5.
Note: elastic-job's distinctive features already surface here, for example:
Job sharding (a small example follows this list):
splits the overall job into multiple sub-tasks
processing capacity scales elastically as servers are added or removed
distributed coordination, with automatic detection and handling of job servers going online or offline
Fault tolerance:
periodic self fault detection and automatic recovery
uniqueness guarantee for each sharding item across the distributed job servers
support for failover and re-triggering of missed (misfired) executions
Job tracing
Job scheduling
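To make the sharding feature concrete, here is a minimal business job, as a sketch only: the class name and data slices are made up, and the io.elasticjob.lite package paths assume the 2.x naming used elsewhere in this article. A simple job implements SimpleJob and uses the sharding item it receives to decide which slice of data to process:

import io.elasticjob.lite.api.ShardingContext;
import io.elasticjob.lite.api.simple.SimpleJob;

public class MyOrderSyncJob implements SimpleJob {
    
    @Override
    public void execute(final ShardingContext shardingContext) {
        // shardingContext.getShardingItem() is a number in [0, sharding-total-count),
        // and getShardingParameter() carries the value configured in sharding-item-parameters.
        switch (shardingContext.getShardingItem()) {
            case 0:
                // process the first slice of data
                break;
            case 1:
                // process the second slice of data
                break;
            default:
                // remaining slices
                break;
        }
    }
}

The JobFacade handed to LiteJob is the interface below; sharding contexts, misfire handling, failover, and event publishing all go through it: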
public interface JobFacade {
    
    /**
     * Load the job configuration.
     *
     * @param fromCache whether to read from the cache
     * @return job configuration
     */
    JobRootConfiguration loadJobRootConfiguration(boolean fromCache);
    
    /**
     * Check the job execution environment.
     *
     * @throws JobExecutionEnvironmentException if the job execution environment is invalid
     */
    void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException;
    
    /**
     * Perform job failover if failover is required.
     */
    void failoverIfNecessary();
    
    /**
     * Register that the job has started.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobBegin(ShardingContexts shardingContexts);
    
    /**
     * Register that the job has completed.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobCompleted(ShardingContexts shardingContexts);
    
    /**
     * Get the sharding contexts for the current job server.
     *
     * @return sharding contexts
     */
    ShardingContexts getShardingContexts();
    
    /**
     * Set the misfire flag for sharding items whose previous execution is still running.
     *
     * @param shardingItems sharding items to mark as misfired
     * @return whether the misfire condition is met
     */
    boolean misfireIfRunning(Collection<Integer> shardingItems);
    
    /**
     * Clear the misfire flag of the given sharding items.
     *
     * @param shardingItems sharding items whose misfire flag should be cleared
     */
    void clearMisfire(Collection<Integer> shardingItems);
    
    /**
     * Judge whether the job needs to execute misfired tasks.
     *
     * @param shardingItems sharding items
     * @return whether misfired tasks should be executed
     */
    boolean isExecuteMisfired(Collection<Integer> shardingItems);
    
    /**
     * Judge whether the job is eligible to keep running.
     *
     * <p>If the job is stopped, needs resharding, or is not a streaming job, it will not keep running.</p>
     *
     * @return whether the job is eligible to keep running
     */
    boolean isEligibleForJobRunning();
    
    /**
     * Judge whether resharding is needed.
     *
     * @return whether resharding is needed
     */
    boolean isNeedSharding();
    
    /**
     * Called before the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void beforeJobExecuted(ShardingContexts shardingContexts);
    
    /**
     * Called after the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void afterJobExecuted(ShardingContexts shardingContexts);
    
    /**
     * Post a job execution event.
     *
     * @param jobExecutionEvent job execution event
     */
    void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);
    
    /**
     * Post a job status trace event.
     *
     * @param taskId task id
     * @param state job execution state
     * @param message job execution message
     */
    void postJobStatusTraceEvent(String taskId, State state, String message);
}
2. JobDetail
The common job attributes are defined in job.xsd:
<xsd:complexType name="base">
    <xsd:complexContent>
        <xsd:extension base="beans:identifiedType">
            <xsd:all>
                <xsd:element ref="listener" minOccurs="0" maxOccurs="1" />
                <xsd:element ref="distributed-listener" minOccurs="0" maxOccurs="1" />
            </xsd:all>
            <xsd:attribute name="class" type="xsd:string" />
            <xsd:attribute name="job-ref" type="xsd:string" />
            <xsd:attribute name="registry-center-ref" type="xsd:string" use="required" />
            <xsd:attribute name="cron" type="xsd:string" use="required" />
            <xsd:attribute name="sharding-total-count" type="xsd:string" use="required" />
            <xsd:attribute name="sharding-item-parameters" type="xsd:string" />
            <xsd:attribute name="job-parameter" type="xsd:string" />
            <xsd:attribute name="monitor-execution" type="xsd:string" default="true"/>
            <xsd:attribute name="monitor-port" type="xsd:string" default="-1"/>
            <xsd:attribute name="max-time-diff-seconds" type="xsd:string" default="-1"/>
            <xsd:attribute name="failover" type="xsd:string" default="false"/>
            <xsd:attribute name="reconcile-interval-minutes" type="xsd:int" default="10"/>
            <xsd:attribute name="misfire" type="xsd:string" default="true"/>
            <xsd:attribute name="job-sharding-strategy-class" type="xsd:string" />
            <xsd:attribute name="description" type="xsd:string" />
            <xsd:attribute name="disabled" type="xsd:string" default="false"/>
            <xsd:attribute name="overwrite" type="xsd:string" default="false"/>
            <xsd:attribute name="executor-service-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultExecutorServiceHandler"/>
            <xsd:attribute name="job-exception-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultJobExceptionHandler"/>
            <xsd:attribute name="event-trace-rdb-data-source" type="xsd:string" />
        </xsd:extension>
    </xsd:complexContent>
</xsd:complexType>
Simple jobs inherit the common attributes unchanged; dataflow jobs add a streaming-process attribute, and script jobs add a script-command-line attribute.
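To show what the dataflow type looks like in code (a sketch only; the class name is made up and the package paths assume the io.elasticjob.lite 2.x naming), a DataflowJob first fetches a batch of data and then processes it; with streaming-process="true" the fetch/process loop repeats until fetchData() returns no data:

import java.util.Collections;
import java.util.List;

import io.elasticjob.lite.api.ShardingContext;
import io.elasticjob.lite.api.dataflow.DataflowJob;

public class MyLogDataflowJob implements DataflowJob<String> {
    
    @Override
    public List<String> fetchData(final ShardingContext shardingContext) {
        // Fetch the next batch belonging to this sharding item;
        // an empty result ends the streaming loop.
        return Collections.emptyList();
    }
    
    @Override
    public void processData(final ShardingContext shardingContext, final List<String> data) {
        // Process the batch fetched above.
        for (String each : data) {
            System.out.println(shardingContext.getShardingItem() + " -> " + each);
        }
    }
}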
The namespace handlers that parse these XML elements are declared in spring.handlers:
http\://www.dangdang.com/schema/ddframe/reg=io.elasticjob.lite.spring.reg.handler.RegNamespaceHandler
http\://www.dangdang.com/schema/ddframe/job=io.elasticjob.lite.spring.job.handler.JobNamespaceHandler
JobNamespaceHandler
/**
 * Namespace handler for the distributed job elements.
 *
 * @author caohao
 */
public final class JobNamespaceHandler extends NamespaceHandlerSupport {
    
    @Override
    public void init() {
        registerBeanDefinitionParser("simple", new SimpleJobBeanDefinitionParser());
        registerBeanDefinitionParser("dataflow", new DataflowJobBeanDefinitionParser());
        registerBeanDefinitionParser("script", new ScriptJobBeanDefinitionParser());
    }
}
When the elastic distributed job executor, AbstractElasticJobExecutor, is constructed, it loads the job configuration and resolves the corresponding handlers:
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
    this.jobFacade = jobFacade;
    jobRootConfig = jobFacade.loadJobRootConfiguration(true);
    jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
    executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
    jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
    itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
}
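Both handlers are pluggable through the executor-service-handler and job-exception-handler attributes from the XSD above. Judging from how it is called in the executor, JobExceptionHandler exposes handleException(String jobName, Throwable cause); a custom handler might look roughly like this (the package path is inferred from the default handler named in the XSD, and the alerting logic is a placeholder):

import io.elasticjob.lite.executor.handler.JobExceptionHandler;

public final class AlertingJobExceptionHandler implements JobExceptionHandler {
    
    @Override
    public void handleException(final String jobName, final Throwable cause) {
        // Placeholder: hook in your own alerting / logging here.
        System.err.println("Job '" + jobName + "' failed: " + cause.getMessage());
    }
}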
3. The job execution flow
The elastic distributed job executor, AbstractElasticJobExecutor.java:
/**
 * Execute the job.
 */
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment(); //1
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    ShardingContexts shardingContexts = jobFacade.getShardingContexts(); //2
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName)); //3
    }
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
                    shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    try {
        jobFacade.beforeJobExecuted(shardingContexts); //4
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); //5
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    jobFacade.failoverIfNecessary(); //6
    try {
        jobFacade.afterJobExecuted(shardingContexts); //7
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}
3.1 Environment check
Checks whether the clock difference in seconds between this machine and the registry center is within the allowed range. For example, with max-time-diff-seconds="10", a 12-second clock skew makes this check throw a JobExecutionEnvironmentException, which is handed to the job's exception handler.
/**
 * Check whether the clock difference in seconds between this machine and the registry center is within the allowed range.
 *
 * @throws JobExecutionEnvironmentException if the clock difference exceeds the allowed range
 */
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
    int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
    if (-1 == maxTimeDiffSeconds) {
        return;
    }
    long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
    if (timeDiff > maxTimeDiffSeconds * 1000L) {
        throw new JobExecutionEnvironmentException(
                "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
    }
}
3.2 Sharding according to the sharding strategy
If sharding is needed and the current node is the leader, the job is sharded.
If there are no available job instances, no sharding is performed.
/**
 * Shard the job if sharding is needed and the current node is the leader.
 *
 * <p>
 * If there are no available job instances, no sharding is performed.
 * </p>
 */
public void shardingIfNecessary() {
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }
    waitingOtherShardingItemCompleted();
    LiteJobConfiguration liteJobConfig = configService.load(false);
    int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(shardingTotalCount);
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
    jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}
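The sharding strategy is pluggable via the job-sharding-strategy-class attribute. As a rough sketch (assuming JobShardingStrategy takes the available job instances, the job name, and the total sharding count, and returns the items assigned to each instance, which matches how it is called above), a naive round-robin strategy could look like this:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// elastic-job imports (JobShardingStrategy, JobInstance) omitted; their package names vary by version.
public final class RoundRobinShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(jobInstances.size(), 1);
        for (JobInstance each : jobInstances) {
            result.put(each, new ArrayList<Integer>());
        }
        // Deal sharding items 0 .. shardingTotalCount-1 to the instances in turn.
        for (int i = 0; i < shardingTotalCount; i++) {
            result.get(jobInstances.get(i % jobInstances.size())).add(i);
        }
        return result;
    }
}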
3.3 Notification via EventBus
Job events are published through Guava's com.google.common.eventbus.EventBus:
/**
 * Posts an event to all registered subscribers. This method will return
 * successfully after the event has been posted to all subscribers, and
 * regardless of any exceptions thrown by subscribers.
 *
 * <p>If no subscribers have been subscribed for {@code event}'s class, and
 * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
 * DeadEvent and reposted.
 *
 * @param event event to post.
 */
public void post(Object event) {
    Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());
    boolean dispatched = false;
    for (Class<?> eventType : dispatchTypes) {
        subscribersByTypeLock.readLock().lock();
        try {
            Set<EventSubscriber> wrappers = subscribersByType.get(eventType);
            if (!wrappers.isEmpty()) {
                dispatched = true;
                for (EventSubscriber wrapper : wrappers) {
                    enqueueEvent(event, wrapper);
                }
            }
        } finally {
            subscribersByTypeLock.readLock().unlock();
        }
    }
    if (!dispatched && !(event instanceof DeadEvent)) {
        post(new DeadEvent(this, event));
    }
    dispatchQueuedEvents();
}
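For readers unfamiliar with Guava's EventBus, here is a minimal, self-contained usage sketch (unrelated to elastic-job's own event classes): subscribers are plain objects with @Subscribe methods, and post() delivers an event to every matching subscriber.

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class EventBusDemo {
    
    static class JobFinishedEvent {
        final String jobName;
        
        JobFinishedEvent(final String jobName) {
            this.jobName = jobName;
        }
    }
    
    static class TraceListener {
        
        @Subscribe
        public void onJobFinished(final JobFinishedEvent event) {
            System.out.println("Job finished: " + event.jobName);
        }
    }
    
    public static void main(final String[] args) {
        EventBus eventBus = new EventBus();
        eventBus.register(new TraceListener());      // subscribe
        eventBus.post(new JobFinishedEvent("myJob")); // dispatch to all subscribers
    }
}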
3.4 Before execution: notifying the ElasticJobListeners
@Override
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.beforeJobExecuted(shardingContexts);
    }
}
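A listener implements ElasticJobListener with a before hook and an after hook (sketch below; the logging is illustrative and the package paths assume the io.elasticjob.lite 2.x naming). It is attached to the job through the listener sub-element shown in the XSD earlier.

import io.elasticjob.lite.api.listener.ElasticJobListener;
import io.elasticjob.lite.executor.ShardingContexts;

public class LoggingJobListener implements ElasticJobListener {
    
    @Override
    public void beforeJobExecuted(final ShardingContexts shardingContexts) {
        System.out.println("about to run: " + shardingContexts.getJobName());
    }
    
    @Override
    public void afterJobExecuted(final ShardingContexts shardingContexts) {
        System.out.println("finished: " + shardingContexts.getJobName());
    }
}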
3.5 Executing the sharded job
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
        }
        return;
    }
    jobFacade.registerJobBegin(shardingContexts); //1
    String taskId = shardingContexts.getTaskId();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    }
    try {
        process(shardingContexts, executionSource); //2
    } finally {
        // TODO consider adding a job-failure state, and how to close the loop on overall job failure
        jobFacade.registerJobCompleted(shardingContexts);
        if (itemErrorMessages.isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
            }
        } else {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            }
        }
    }
}
>> 1. Register the running job with the registry center.
>> 2. Submit each sharding item to the thread pool for execution (a conceptual sketch follows).
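This is not the library's actual process(...) implementation, only a conceptual fragment of step 2: each sharding item is handed to the executor service created in the constructor, and the caller waits for all items with a java.util.concurrent.CountDownLatch. The method and processItem() are made-up names.

// Conceptual sketch only; relies on the executorService field shown in the constructor above.
private void processAllItems(final ShardingContexts shardingContexts) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    final CountDownLatch latch = new CountDownLatch(items.size());
    for (final Integer each : items) {
        executorService.submit(new Runnable() {
            
            @Override
            public void run() {
                try {
                    processItem(shardingContexts, each); // run the business logic for one sharding item
                } finally {
                    latch.countDown();
                }
            }
        });
    }
    try {
        latch.await(); // wait until every sharding item has completed
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}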
3.6 Failover
If failover is required, job failover is performed. The failover logic runs through executeInLeader(), which uses a Curator LeaderLatch so that only one instance at a time executes the callback:
/**
 * Execute an operation on the leader node.
 *
 * @param latchNode name of the job node used for the distributed latch
 * @param callback callback that performs the operation
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        handleException(ex);
    }
}
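A hypothetical caller looks like this (the latch node name is made up for illustration). Callers are serialized: each one blocks on the LeaderLatch until it becomes leader, runs the callback, and then releases the latch when the try-with-resources block closes it.

// Illustrative usage only; "failover/items/latch" is a made-up node name.
jobNodeStorage.executeInLeader("failover/items/latch", new LeaderExecutionCallback() {
    
    @Override
    public void execute() {
        // take over one crashed sharding item and mark it as owned by this instance
    }
});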
3.7 Post-execution handling
After the job has run, the listeners' after-execution hooks are invoked:
@Override
public void afterJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.afterJobExecuted(shardingContexts);
    }
}
4. Trigger
elastic-job uses a cron trigger by default; the cron expression is a required attribute in the job definition:
<xsd:attribute name="cron" type="xsd:string" use="required" />
5. The job scheduler: JobScheduler
/**
 * Initialize the job.
 */
public void init() {
    LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig); //1
    JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName()); //2
    JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter); //3
    schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
    jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); //4
}
private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        try {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
        } catch (final ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
        }
    }
    return result;
}

protected Optional<ElasticJob> createElasticJobInstance() {
    return Optional.absent();
}

private Scheduler createScheduler() {
    Scheduler result;
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getBaseQuartzProperties());
        result = factory.getScheduler();
        result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
    return result;
}

private Properties getBaseQuartzProperties() {
    Properties result = new Properties();
    result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
    result.put("org.quartz.threadPool.threadCount", "1");
    result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
    result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
    return result;
}
5.1 Update the job configuration
/**
 * Update the job configuration.
 *
 * @param liteJobConfig job configuration
 * @return the updated job configuration
 */
public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
    configService.persist(liteJobConfig);
    return configService.load(false);
}
5.2 A series of initialization steps
5.2.1 Create the Quartz scheduler
This is the createScheduler() method already shown above: a StdSchedulerFactory is initialized with the base Quartz properties, and the trigger listener supplied by schedulerFacade is registered on the scheduler.
5.2.2 Create the JobDetail
This is the createJobDetail() method already shown above: the JobDetail is built around LiteJob, and jobFacade plus the ElasticJob instance are placed into its JobDataMap.
5.2.3 Register the job schedule controller
/**
 * Register a job schedule controller.
 *
 * @param jobName job name
 * @param jobScheduleController job schedule controller
 * @param regCenter registry center
 */
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
    schedulerMap.put(jobName, jobScheduleController);
    regCenterMap.put(jobName, regCenter);
    regCenter.addCacheData("/" + jobName);
}
5.2.4 Schedule the job
/**
 * Schedule the job.
 *
 * @param cron CRON expression
 */
public void scheduleJob(final String cron) {
    try {
        if (!scheduler.checkExists(jobDetail.getKey())) {
            scheduler.scheduleJob(jobDetail, createTrigger(cron));
        }
        scheduler.start();
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}
6. Summary
>> elastic-job reuses Quartz's scheduling mechanism, so the core scheduling principle is the same, while adding scalability and availability features on top.
>> elastic-job replaces Quartz's JDBC job store with a registry center (ZooKeeper), which brings a significant performance improvement.
>> elastic-job adds job tracing (via listeners and trace events), which makes monitoring easier.
>> elastic-job adds a sharding mechanism: a job can be split into multiple sharding items that are executed on different servers.
>> elastic-job only supports cron triggers, whereas Quartz offers more trigger implementations.