天天看点

xxx-job 源码解读(一)

1.  调度中心启动源码分析

 首先从spring的配置看起, 从以下配置可以看出,xxl内部使用的是quartz

spring配置

<

bean

id

=

"quartzScheduler"

lazy-init

=

"false"

class

=

"org.springframework.scheduling.quartz.SchedulerFactoryBean"

>

<

property

name

=

"dataSource"

ref

=

"dataSource"

/>

<

property

name

=

"autoStartup"

value

=

"true"

/>         

<!--自动启动 -->

<

property

name

=

"startupDelay"

value

=

"20"

/>             

<!--延时启动,应用启动成功后在启动 -->

<

property

name

=

"overwriteExistingJobs"

value

=

"true"

/> 

<!--覆盖DB中JOB:true、以数据库中已经存在的为准:false -->

<

property

name

=

"applicationContextSchedulerContextKey"

value

=

"applicationContextKey"

/>

<

property

name

=

"configLocation"

value

=

"classpath:quartz.properties"

/>

</

bean

>

<!-- 这个调度中心,在启动的时候,会做很多初始化的工作 ,比如:执行器信息,注册机器列表等信息 -->

<

bean

id

=

"xxlJobDynamicScheduler"

class

=

"com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler"

init-method

=

"init"

destroy-method

=

"destroy"

>

<!-- 配置调度中心的名称 -->

<

property

name

=

"scheduler"

ref

=

"quartzScheduler"

/>

<!-- 用于调度中心和执行器之间通信的时候做数据加密 -->

<

property

name

=

"accessToken"

value

=

"${xxl.job.accessToken}"

/>

</

bean

>

com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler  在启动的时候会做如下工作: 
           

XxlJobDynamicScheduler

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

public

void

init() 

throws

Exception {

// 启动自动注册线程, 获取类型为自动注册的执行器信息,完成机器的自动注册与发现

JobRegistryMonitorHelper.getInstance().start();

// 启动失败日志监控线程

JobFailMonitorHelper.getInstance().start();

// admin-server(spring-mvc)

NetComServerFactory.putService(AdminBiz.

class

, XxlJobDynamicScheduler.adminBiz);

NetComServerFactory.setAccessToken(accessToken);

// valid

Assert.notNull(scheduler, 

"quartz scheduler is null"

);

logger.info(

">>>>>>>>> init xxl-job admin success."

);

}

JobRegistryMonitorHelper.getInstance().start()   详细代码如下: 
           

JobRegistryMonitorHelper

public

void

start(){

//创建一个线程

registryThread = 

new

Thread(

new

Runnable() {

@Override

public

void

run() {

// 当toStop 为false时进入该循环。

while

(!toStop) {

try

{

// 获取类型为自动注册的执行器地址列表

List<XxlJobGroup> groupList = XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(

);

if

(CollectionUtils.isNotEmpty(groupList)) {

// 删除 90秒之内没有更新信息的注册机器, 90秒没有心跳信息返回,代表机器已经出现问题,故移除

XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);

// fresh online address (admin/executor)

HashMap<String, List<String>> appAddressMap = 

new

HashMap<String, List<String>>();

// 查询在90秒之内有过更新的机器列表

List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);

if

(list != 

null

) {

//循环注册机器列表,  根据执行器不同,将这些机器列表区分拿出来

for

(XxlJobRegistry item: list) {

// 判断该机器注册信息RegistryGroup ,RegistType 是否是EXECUTOR , EXECUTOR 代表该机器是注册到执行器上面的

// RegistType  分为两种, ADMIN 和EXECUTOR

if

(RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {

// 获取注册的执行器 KEY  (也就是执行器)

String appName = item.getRegistryKey();

List<String> registryList = appAddressMap.get(appName);

if

(registryList == 

null

) {

registryList = 

new

ArrayList<String>();

}

if

(!registryList.contains(item.getRegistryValue())) {

registryList.add(item.getRegistryValue());

}

// 收集 机器信息,根据执行器做区分

appAddressMap.put(appName, registryList);

}

}

}

//  遍历执行器列表

for

(XxlJobGroup group: groupList) {

// 通过执行器的APP_NAME  拿出他下面的集群机器地址

List<String> registryList = appAddressMap.get(group.getAppName());

String addressListStr = 

null

;

if

(CollectionUtils.isNotEmpty(registryList)) {

Collections.sort(registryList);

// 转为为String, 通过逗号分隔

addressListStr = StringUtils.join(registryList, 

","

);

}

group.setAddressList(addressListStr);

// 将 这个执行器的 集群机器地址列表,写入到数据库

XxlJobDynamicScheduler.xxlJobGroupDao.update(group);

}

}

catch

(Exception e) {

logger.error(

"job registry instance error:{}"

, e);

}

try

{

TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);

catch

(InterruptedException e) {

logger.error(

"job registry instance error:{}"

, e);

}

}

}

});

registryThread.setDaemon(

true

);

//启动线程

registryThread.start();

}

JobFailMonitorHelper.getInstance().start(); 详细代码如下: 
           

JobFailMonitorHelper

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

public

void

start(){

// 启动线程

monitorThread = 

new

Thread(

new

Runnable() {

@Override

public

void

run() {

// monitor

while

(!toStop) {

try

{

List<Integer> jobLogIdList = 

new

ArrayList<Integer>();

// 从队列中拿出所有可用的 jobLogIds

int

drainToNum = JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);

if

(CollectionUtils.isNotEmpty(jobLogIdList)) {

for

(Integer jobLogId : jobLogIdList) {

if

(jobLogId==

null

|| jobLogId==

) {

continue

;

}

//从数据库跟以前有日志信息

XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);

if

(log == 

null

) {

continue

;

}

//任务触发成功, 但是JobHandle 还没有返回结果

if

(IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 

) {

//将 JobLogId 放入队列 , 继续监控

JobFailMonitorHelper.monitor(jobLogId);

logger.info(

">>>>>>>>>>> job monitor, job running, JobLogId:{}"

, jobLogId);

else

if

(IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {

// job success, pass

logger.info(

">>>>>>>>>>> job monitor, job success, JobLogId:{}"

, jobLogId);

else

if

(IJobHandler.FAIL.getCode() == log.getTriggerCode()

|| IJobHandler.FAIL.getCode() == log.getHandleCode()

|| IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() ) {

// 任务执行失败, 执行发送邮件等预警措施

failAlarm(log);

logger.info(

">>>>>>>>>>> job monitor, job fail, JobLogId:{}"

, jobLogId);

else

{

JobFailMonitorHelper.monitor(jobLogId);

logger.info(

">>>>>>>>>>> job monitor, job status unknown, JobLogId:{}"

, jobLogId);

}

}

}

// 停顿一下

TimeUnit.SECONDS.sleep(

10

);

catch

(Exception e) {

logger.error(

"job monitor error:{}"

, e);

}

}

});

monitorThread.setDaemon(

true

);

monitorThread.start();

}

以上 是xxl-job 在启动的时候做的操作,  主要是启动两个线程,  

  1. 用来监控自动注册上来的机器,达到自动注册的目的
  2. 监控任务的执行状态, 如若失败,则发送邮件预警

xxl-job 是基于quartz 进行的二次开发,在系统启动的时候,quartz框架会自动去数据库读取相关的配置信息,载入相关定时器信息

sharedCode源码交流群,欢迎喜欢阅读源码的朋友加群,添加下面的微信, 备注”加群“ 。 

xxx-job 源码解读(一)
xxx-job 源码解读(一)

继续阅读