DataX Series: Learning the Distributed Task Scheduling Framework xxl-job

References:

https://juejin.cn/post/6938034809197297694

Preface

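The DataX-Web UI manages and schedules DataX plugins, and the framework DataX-Web relies on for that scheduling is the distributed task scheduling framework XXL-Job. These notes draw on another author's write-up (see the reference above) and are for study only.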

1. Job Types

xxl-job supports seven job types:

Bean

GLUE(Java)

GLUE(Shell)

GLUE(Python)

GLUE(PHP)

GLUE(Nodejs)

GLUE(PowerShell)

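GLUE jobs have their business code edited in the admin console, whereas a Bean job integrates the user's business logic into xxl-job for scheduling: its source code lives in the user's project, not in the xxl-job admin module.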

xxl-job abstracts an IJobHandler component to execute jobs. It has three implementations:

[Figure: the three IJobHandler implementations]

MethodJobHandler: the handler for Bean jobs. The logic of a Bean job is actually wrapped in a Method annotated with @XxlJob;

ScriptJobHandler: the handler for script jobs. Shell, Python, PHP, Nodejs, PowerShell and the like can all be regarded as script jobs and use this handler;

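GlueJobHandler: this handler is dedicated to GLUE(Java) jobs. As analyzed in the previous part of this series, a Java job is compiled and instantiated by GlueFactory, and the instance is then wrapped in a GlueJobHandler for execution.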
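The idea behind MethodJobHandler, wrapping a user method behind a common handler abstraction, can be sketched with plain reflection. This is only an illustration: DemoJobHandler, DemoMethodJobHandler and DemoJobs are hypothetical names, not xxl-job classes, and the real handler also deals with init and destroy methods.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the IJobHandler abstraction (not the xxl-job class).
abstract class DemoJobHandler {
    abstract String execute(String param);
}

// Wraps a target object and one of its methods, in the spirit of MethodJobHandler.
final class DemoMethodJobHandler extends DemoJobHandler {
    private final Object target;
    private final Method method;

    DemoMethodJobHandler(Object target, String methodName) {
        Method m;
        try {
            // Look up a public method that takes the job parameter as a String.
            m = target.getClass().getMethod(methodName, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such job method: " + methodName, e);
        }
        m.setAccessible(true);
        this.target = target;
        this.method = m;
    }

    @Override
    String execute(String param) {
        try {
            // Delegate to the user's business method via reflection.
            return (String) method.invoke(target, param);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("job method failed", e);
        }
    }
}

// Hypothetical user code: the business logic stays in the user's project.
class DemoJobs {
    public String greet(String param) {
        return "hello " + param;
    }
}
```

A scheduler only needs to hold a DemoJobHandler reference; whether the logic comes from a bean method, a script, or compiled source is hidden behind execute.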

2. Execution Flow

Server-Side Flow

On the server side, job execution is triggered through JobTriggerPoolHelper#addTrigger:

public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

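    // Pick one of two thread pools for this trigger according to the rule below:
    // fastTriggerPool: the default pool
    // slowTriggerPool: the pool for slow jobs
    // Slow job: dispatch took longer than 500 ms more than 10 times within one minute (the counters are cleared every minute); later triggers of that job use slowTriggerPool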
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {

            long start = System.currentTimeMillis();

            try {
                // trigger the job
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

                // clear the slow-job counters once per minute
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

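                // If the dispatch took longer than 500 ms, add 1 to the job's slow count.
                // The executor is asynchronous: it queues the job and returns at once, so this time excludes the job's own run time.
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // job-timeout threshold 500ms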
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }
            }

        }
    });
}
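The slow-job rule in addTrigger can be isolated into a small class to make the counting behavior easier to see. This is a simplified sketch, not the xxl-job code: TriggerPoolSelector and its method names are made up, the pools are represented by the strings "fast" and "slow", and the current time is passed in as a parameter so the logic can be exercised without waiting.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that mirrors the fast/slow pool decision described above.
final class TriggerPoolSelector {
    static final long SLOW_THRESHOLD_MS = 500;
    static final int SLOW_COUNT_LIMIT = 10;

    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute = -1;

    /** Returns "slow" once the job exceeded the limit within the current minute, else "fast". */
    String choosePool(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return (count != null && count.get() > SLOW_COUNT_LIMIT) ? "slow" : "fast";
    }

    /** Records how long one dispatch took; nowMs is the current time in milliseconds. */
    void recordDispatch(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) {
            // A new minute has started: forget all counters.
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMs > SLOW_THRESHOLD_MS) {
            timeoutCounts.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

Because the comparison is "greater than 10", a job needs 11 slow dispatches inside one minute before it moves to the slow pool, and it moves back as soon as the counters are cleared.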

           

Following XxlJobTrigger#trigger further leads to processTrigger:

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // block strategy
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
    // route strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    // sharding parameter
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1、save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    // insert a run log row into xxl_job_log
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2、init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // resolve the executor address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            // sharding broadcast mode
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // let the route strategy pick an executor address
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4、trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        // run the job
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // collect trigger information
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
                .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6、save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());

    // update the trigger information in the xxl_job_log table
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
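The address-selection step in processTrigger is the part most worth isolating. The sketch below reproduces the sharding-broadcast rule from the code above and pairs it with a round-robin selector as a stand-in for a route strategy. AddressSelector and both method names are hypothetical, and round robin is used here only as an example of a pluggable strategy.

```java
import java.util.List;

// Hypothetical helper illustrating how an executor address may be chosen.
final class AddressSelector {

    /** Sharding broadcast: shard i goes to the i-th registered executor, else to the first one. */
    static String forBroadcast(int index, List<String> registry) {
        if (registry == null || registry.isEmpty()) {
            return null; // no executor registered, the trigger fails
        }
        return index < registry.size() ? registry.get(index) : registry.get(0);
    }

    /** Stand-in route strategy: rotate through the registered executors. */
    static String roundRobin(int triggerCount, List<String> registry) {
        if (registry == null || registry.isEmpty()) {
            return null;
        }
        return registry.get(Math.floorMod(triggerCount, registry.size()));
    }
}
```

In both cases a null address corresponds to the failure branch in processTrigger, where no remote call is made and a FAIL_CODE result is logged.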

           

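This method is long, but its logic is simple. The core flow is:

select an executor address by broadcast or by route strategy -> run the job -> collect the trigger information and update it in the xxl_job_log table.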

Route strategies are analyzed separately in the next part. Next, continue following the execution flow into XxlJobTrigger#runExecutor:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        // obtain the ExecutorBiz for this address
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    // format the result
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

           

The method obtains the executor proxy ExecutorBiz for the given address and then calls its run method to dispatch the job to the executor.

As analyzed in the previous part, the executor uses netty to start an http server at startup, so the dispatch logic here is simple: it just calls an http interface:

XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
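To make the shape of that call concrete, here is a sketch that builds an equivalent POST request with the JDK's java.net.http API (Java 11 or later). It is not how XxlJobRemotingUtil is implemented. RunRequestBuilder is a hypothetical name, the header name XXL-JOB-ACCESS-TOKEN is an assumption about how the token is carried, the timeout is assumed to be in seconds, and the JSON body is passed in as a ready-made string.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical builder for a "run" request sent from the scheduler to an executor.
final class RunRequestBuilder {

    static HttpRequest build(String addressUrl, String accessToken, int timeoutSeconds, String jsonBody) {
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(URI.create(addressUrl + "run"))          // e.g. http://host:port/run
                .timeout(Duration.ofSeconds(timeoutSeconds))
                .header("Content-Type", "application/json;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody));
        if (accessToken != null && !accessToken.trim().isEmpty()) {
            // Assumed header name; the executor compares it with its own token.
            builder.header("XXL-JOB-ACCESS-TOKEN", accessToken);
        }
        return builder.build();
    }
}
```

Sending the request would be a matter of passing it to an HttpClient; the sketch stops at building it so that it does not depend on a running executor.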

Executor-Side Flow

As covered in the analysis of executor startup, the executor uses netty to start an http server that receives commands from admin. Each received command is handed to EmbedHttpServerHandler#process:

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        
    // valid
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    if (accessToken!=null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) { // is the executor alive (online)? used by the failover route strategy
            return executorBiz.beat();
        } else if ("/idleBeat".equals(uri)) {// is the executor idle? used by the busy-over route strategy
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        } else if ("/run".equals(uri)) {
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) { // handles the kill-job command
            logger.info("receive kill, data:{}", requestData);
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {// handles requests to read the executor's job log
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}

           

Continue into ExecutorBizImpl#run:

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    // load the cached JobThread and IJobHandler by jobId
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

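    // match the job type and validate the IJobHandler
    // e.g. if the job's IJobHandler has changed or the GLUE source has been edited, the cached JobThread can no longer be used and a new JobThread is created with the latest IJobHandler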
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {// Bean job
		......
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {// GLUE(Java) job
		......
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {// script job
		......
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    if (jobThread != null) {
        // a non-null JobThread may still be running a job, so apply the block strategy
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard later: if the JobThread is running a job or has triggers queued in triggerQueue, drop the current trigger
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // cover early: if the JobThread is running a job or has triggers queued in triggerQueue, destroy the old JobThread and create a new one to run the current trigger
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // serial execution: simply push the trigger onto the JobThread's triggerQueue
        }
    }

    if (jobThread == null) {
        // create a JobThread and cache it; if one is already cached for this jobId, destroy the old one
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    logger.debug("jobThread.pushTriggerQueue hash:{}, data:{}", System.identityHashCode(jobThread), GsonTool.toJson(triggerParam));
    // push the trigger onto the JobThread's triggerQueue; the JobThread's loop takes it from there and runs it
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
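The three block strategies in ExecutorBizImpl#run reduce to a small decision table. The sketch below states it as a pure function, under the assumption that the only input that matters is whether the existing JobThread is busy. BlockStrategyDemo, Strategy and Action are illustrative names, not xxl-job types.

```java
// Hypothetical decision table for the block strategies described above.
final class BlockStrategyDemo {

    enum Strategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    enum Action { ENQUEUE, REJECT, REPLACE_THREAD_THEN_ENQUEUE }

    /**
     * @param runningOrHasQueue true if the job's thread is executing or has queued triggers
     */
    static Action decide(Strategy strategy, boolean runningOrHasQueue) {
        if (strategy == Strategy.DISCARD_LATER && runningOrHasQueue) {
            return Action.REJECT;                       // drop the new trigger
        }
        if (strategy == Strategy.COVER_EARLY && runningOrHasQueue) {
            return Action.REPLACE_THREAD_THEN_ENQUEUE;  // kill the old thread, start fresh
        }
        return Action.ENQUEUE;                          // serial execution, or the thread is idle
    }
}
```

When the thread is idle, all three strategies behave the same way: the trigger is simply queued.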

           

The dispatched job is pushed onto the triggerQueue of its JobThread, and JobThread#run consumes it:

@Override
public void run() {

    try {
    	// call IJobHandler.init; a method named by @XxlJob(init=xxx) is invoked here
		handler.init();
	} catch (Throwable e) {
    	logger.error(e.getMessage(), e);
	}

	while(!toStop){
        // running=false means this JobThread is not processing a job right now
		// isRunningOrHasQueue() uses this flag together with triggerQueue to decide whether the JobThread is busy
		running = false;
		// add 1 to the idle counter
		idleTimes++;

        TriggerParam triggerParam = null;
            ReturnT<String> executeResult = null;
            try {
				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
				if (triggerParam!=null) {
                    // running=true means this JobThread is processing a job
					running = true;
					// reset the idle counter
					idleTimes = 0;
					triggerLogIdSet.remove(triggerParam.getLogId());

					// log filename, like "logPath/yyyy-MM-dd/9999.log"
					// initialize the log file
					String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
					XxlJobFileAppender.contextHolder.set(logFileName);
					// put the sharding info into the thread context (InheritableThreadLocal)
					ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

					// execute
					XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

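					// executorTimeout: execution timeout control
					// Normally the job runs as handler.execute(triggerParam.getExecutorParams());
					// with a timeout, it is wrapped in a FutureTask and run on a separate thread; on timeout that thread is interrupted and a timeout result is returned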
					if (triggerParam.getExecutorTimeout() > 0) {
						// limit timeout
						Thread futureThread = null;
						try {
							final TriggerParam triggerParamTmp = triggerParam;
							FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
								@Override
								public ReturnT<String> call() throws Exception {
									return handler.execute(triggerParamTmp.getExecutorParams());
								}
							});
							futureThread = new Thread(futureTask);
							futureThread.start();

							executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
						} catch (TimeoutException e) {

							XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
							XxlJobLogger.log(e);

							executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
						} finally {
							futureThread.interrupt();
						}
					} else {
						// call the matching IJobHandler to run the job
						executeResult = handler.execute(triggerParam.getExecutorParams());
					}

					if (executeResult == null) {
						executeResult = IJobHandler.FAIL;
					} else {
						executeResult.setMsg(
								(executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
										?executeResult.getMsg().substring(0, 50000).concat("...")
										:executeResult.getMsg());
						executeResult.setContent(null);	// limit obj size
					}
					XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

				} else {
					// more than 30 consecutive empty polls (3 seconds each), i.e. the JobThread has been idle for about 90 seconds: destroy it
					if (idleTimes > 30) {
						if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
							XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
						}
					}
				}
			} catch (Throwable e) {
				if (toStop) {
					XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
				}

				StringWriter stringWriter = new StringWriter();
				e.printStackTrace(new PrintWriter(stringWriter));
				String errorMsg = stringWriter.toString();
				executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

				// the job threw an exception: write it to the log
				XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
			} finally {
                if(triggerParam != null) {
                    if (!toStop) {
						// JobThread not stopped: push the result to admin through the asynchronous callback mechanism
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                    } else {
						// JobThread stopped: push the kill error to admin through the asynchronous callback mechanism
                        ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                    }
                }
            }
        }

		// the JobThread was killed: if triggerQueue still holds pending triggers, report them to admin as failed
		while(triggerQueue !=null && triggerQueue.size()>0){
			TriggerParam triggerParam = triggerQueue.poll();
			if (triggerParam!=null) {
				// is killed
				ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
				TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
			}
		}

		// destroy
		try {
			// destroy the IJobHandler by calling IJobHandler.destroy; a method named by @XxlJob(destroy=xxx) is invoked here
			handler.destroy();
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

		logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
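The timeout branch of JobThread#run follows a common pattern: run the work in a FutureTask on its own thread, wait with a deadline, and interrupt the worker afterwards. A minimal standalone version is sketched below. TimeoutRunner is a hypothetical name, results are plain strings rather than ReturnT, and the timeout is in milliseconds here (the original uses seconds) so that the example finishes quickly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper showing the FutureTask-based timeout pattern.
final class TimeoutRunner {

    static String runWithTimeout(Callable<String> job, long timeoutMs) {
        FutureTask<String> task = new FutureTask<>(job);
        Thread worker = new Thread(task, "demo-job-worker");
        worker.setDaemon(true);
        worker.start();
        try {
            // Wait for the result, but no longer than the timeout.
            return task.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            // Ask the worker to stop; harmless if it has already finished.
            worker.interrupt();
        }
    }
}
```

Note that interrupt is only a request: a job that ignores interruption keeps running in the background even though the caller has already reported a timeout.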

           

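There is a lot of code above, but the logic is not complicated and the comments make it easy to follow. The last core component in the execution flow is IJobHandler, which encapsulates how the job logic is invoked. xxl-job ships three built-in implementations, for Bean, Java, and script jobs respectively. They are straightforward, so they are not analyzed further here.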

Core Abstractions

ExecutorRouter: the routing component; it selects the executor address;

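ExecutorBizClient: once the routing component has selected an executor address, the address is wrapped in an ExecutorBizClient. ExecutorBizClient can be seen as the executor's proxy on the scheduler side, hiding the low-level details of the remote RPC call;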

EmbedHttpServerHandler: the executor implements its http server with netty, and EmbedHttpServerHandler is the extension component that handles the received commands;

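ExecutorBizImpl: ExecutorBizClient, as the executor's proxy on the scheduler side, mainly forwards commands to the executor over RPC and acts as a pass-through, while ExecutorBizImpl holds the real implementation on the executor. That is why ExecutorBizClient and ExecutorBizImpl both implement the same interface, ExecutorBiz;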

JobThread: every job running on an executor has its own JobThread, so jobs are independent of one another. JobThread controls the job's concurrency model on the executor;

IJobHandler: encapsulates how the job logic is invoked. xxl-job has three built-in implementations, one for each kind of job.
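The relationship between ExecutorBizClient and ExecutorBizImpl is a plain remote-proxy arrangement: both sides implement one interface, and the client side only forwards. The sketch below shows that shape with hypothetical Demo* types. The network hop is replaced by a Function so the example runs in one process.

```java
import java.util.function.Function;

// Hypothetical shared interface, standing in for ExecutorBiz.
interface DemoExecutorBiz {
    String run(String triggerParam);
}

// Executor side: the real logic (here it only pretends to queue the trigger).
final class DemoExecutorBizImpl implements DemoExecutorBiz {
    @Override
    public String run(String triggerParam) {
        return "queued:" + triggerParam;
    }
}

// Scheduler side: a pass-through proxy that hides how the call is transported.
final class DemoExecutorBizClient implements DemoExecutorBiz {
    private final Function<String, String> transport;

    DemoExecutorBizClient(Function<String, String> transport) {
        this.transport = transport;
    }

    @Override
    public String run(String triggerParam) {
        // In a real setup this would be an HTTP call to the executor.
        return transport.apply(triggerParam);
    }
}
```

Wiring the two together is a single line, `new DemoExecutorBizClient(new DemoExecutorBizImpl()::run)`, which is roughly what the HTTP layer does across machines.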

Summary

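The sections above walk through the key code behind job execution in xxl-job. It is fairly simple overall, if somewhat dry. The rough flow of job execution is summarized below to give a general picture of how xxl-job schedules jobs:

[Figure: overall job execution flow]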

In outline:

  • xxl-job uses a centralized architecture with two parts: the scheduling center (Admin) and the executors;
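  • The Admin module exposes a trigger interface for scheduling jobs, and it assigns each trigger to one of two thread pools according to the job's recent dispatch-time statistics;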
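  • Before running, Admin writes a job start record to the xxl_job_log table, then uses the routing component to select an executor address and dispatches the job to that executor through the executor proxy ExecutorBiz. The proxy is simple: it just sends an http request;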
  • At startup the executor uses netty to start an embedded http server. When a command arrives from the scheduling center, it is handed to EmbedHttpServerHandler for processing;
  • When handling a run command, EmbedHttpServerHandler (through ExecutorBizImpl#run) looks up the JobThread for the jobId in a cache and pushes the command onto that JobThread's triggerQueue;
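  • The JobThread loops continuously, taking pending jobs from triggerQueue and handing them to the IJobHandler, which performs the actual invocation. The JobThread then wraps the IJobHandler result and pushes it onto the callBackQueue of TriggerCallbackThread;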
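  • TriggerCallbackThread likewise runs a thread that loops over callBackQueue, takes each callback task and passes it to the doCallback method, which uses the Admin proxy class AdminBizClient to send the result to the scheduling center's callback interface. That completes the job-finished notification.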
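The last two steps in the list, pushing results onto a queue and having a dedicated thread deliver them, form a standard producer-consumer arrangement. The sketch below shows it in isolation. CallbackPump is a hypothetical name, results are plain strings, and "delivering to admin" is simulated by appending to a list.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical callback pump in the spirit of TriggerCallbackThread.
final class CallbackPump {
    private final BlockingQueue<String> callBackQueue = new LinkedBlockingQueue<>();
    private final List<String> delivered = new CopyOnWriteArrayList<>();
    private volatile boolean toStop = false;
    private final Thread worker = new Thread(this::loop, "demo-callback");

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    /** Called by job threads: returns immediately, delivery happens asynchronously. */
    void pushCallBack(String result) {
        callBackQueue.add(result);
    }

    private void loop() {
        try {
            // Keep draining until asked to stop and nothing is left in the queue.
            while (!toStop || !callBackQueue.isEmpty()) {
                String result = callBackQueue.poll(100, TimeUnit.MILLISECONDS);
                if (result != null) {
                    delivered.add(result); // stand-in for the remote callback to admin
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the pump after the queue is drained and returns what was delivered. */
    List<String> stopAndDrain() {
        toStop = true;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered;
    }
}
```

Decoupling execution from reporting this way means a slow or unreachable scheduling center does not block the thread that runs jobs.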

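That is the overall flow of job execution in xxl-job. Stringing the core abstractions together makes the whole logic clear. The key to understanding it is the JobThread component: each job has one JobThread instance on each executor, and when a job is dispatched to an executor, the matching JobThread is looked up to handle it.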

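JobThread is designed around lazy creation and caching: a JobThread is created and cached only when a job is dispatched to the executor and no matching JobThread is found, and the next run of the same job reuses it.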

When does an executor fail to find a JobThread?

  • The job is dispatched to this executor for the first time;

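  • A JobThread's internal thread loops continuously, taking jobs from triggerQueue, and each job has one JobThread on the executor. If a job runs once and never again, or runs very rarely, many threads could be wasted, so JobThread has an idle-timeout self-destruction mechanism: when no job has run for 30 * 3 = 90 seconds, the JobThread is deemed idle and is destroyed. If a command for that job arrives later, a new JobThread is created.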
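Lazy creation, caching, and idle eviction fit together as sketched below. This is a simplified model with hypothetical names (JobThreadRegistryDemo, Worker): the worker here is a plain object rather than a thread, and each call to onPoll stands for one 3-second poll of the trigger queue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry modelling the lazy-create, cache, and idle-evict lifecycle.
final class JobThreadRegistryDemo {
    static final int MAX_IDLE_POLLS = 30;

    static final class Worker {
        private int idleTimes = 0;

        /** Called once per poll; returns true when the worker should be destroyed. */
        boolean onPoll(boolean gotWork) {
            if (gotWork) {
                idleTimes = 0; // any work resets the idle counter
                return false;
            }
            idleTimes++;
            return idleTimes > MAX_IDLE_POLLS;
        }
    }

    private final ConcurrentMap<Integer, Worker> workers = new ConcurrentHashMap<>();

    /** Returns the cached worker for the job, creating it on first use. */
    Worker loadOrCreate(int jobId) {
        return workers.computeIfAbsent(jobId, id -> new Worker());
    }

    void remove(int jobId) {
        workers.remove(jobId);
    }

    int size() {
        return workers.size();
    }
}
```

With the "greater than 30" comparison, eviction happens on the 31st consecutive empty poll, so the idle window is slightly longer than the nominal 90 seconds.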