天天看点

xxj-job服务端架构流程

作者:IT巅峰技术

前 言

本篇我们主要讲一下《xxl-job的调度流程》,在讲调度流程前,我们先概述一下:客户端接入流程、服务端配置流程和路由策略参数详解。

一、客户端接入流程

1 添加Maven依赖

<dependency>
  <groupId>com.xuxueli</groupId>
  <artifactId>xxl-job-core</artifactId>
  <version>${选择合适的版本}</version>
</dependency>           

2 添加xxl-job的配置

在application.yml中添加xxl-job的配置

基础参数:

xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
xxl.job.accessToken=default_token
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.address=
xxl.job.executor.ip=
xxl.job.executor.port=9999
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30           

拓展参数,非必填

xxl.job.i18n=zh_CN
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100           

3 代码创建执行函数

  1. 任务开发:在Spring Bean实例中,开发Job方法;
  2. 注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
  3. 执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
  4. 任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过:
  5. "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
@XxlJob(“demoJobHandler”)
public void demoJobHandler() throws Exception {
    XxlJobHelper.log(“XXL-JOB, Hello World.”);

    for (int i = 0; i < 5; i++) {
        XxlJobHelper.log(“beat at:” + i);
        TimeUnit.SECONDS.sleep(2);
    }
    // default success
}           

4 客户端配置参数说明:

xxj-job服务端架构流程

二、服务端配置流程

1 执行器管理

xxj-job服务端架构流程

  • AppName:执行组Name,Name相同的执行器视为同一个执行组
  • 名称:执行组中文名。
  • 注册方式:
  1. 自动注册:IP地址由执行器上报,通常这样使用;
  2. 手动注册:手动输入执行器地址IP,不建议使用。

2 任务管理

xxj-job服务端架构流程

基础配置:

  • 执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 "执行器管理" 进行设置;
  • 任务描述:任务的描述信息,便于任务管理;
  • 负责人:任务的负责人;
  • 报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔。

触发配置:

  • CRON:触发任务执行的Cron表达式;
  • 固定速度:固件速度的时间间隔,单位为秒;
  • 固定延迟:固件延迟的时间间隔,单位为秒。

任务配置:

  • 运行模式:
(1)BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;
(2)GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 "groovy" 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务;
(3)GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "shell" 脚本;
(4)GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "python" 脚本;
(5)GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "php" 脚本;
(6)GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "nodejs" 脚本;
(7)GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "PowerShell" 脚本;           
  • JobHandler:运行模式为 "BEAN模式" 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值;
  • 执行参数:任务执行所需的参数;

高级配置

  • 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
(1)FIRST(第一个):固定选择第一个机器;
(2)LAST(最后一个):固定选择最后一个机器;
(3)ROUND(轮询):;
(4)RANDOM(随机):随机选择在线的机器;
(5)CONSISTENT\_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
(6)LEAST\_FREQUENTLY\_USED(最不经常使用):使用频率最低的机器优先被选举;
(7)LEAST\_RECENTLY\_USED(最近最久未使用):最久未使用的机器优先被选举;
(8)FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
(9)BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
(10)SHARDING\_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;           
  • 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度。
  • 调度过期策略:
(1)忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
(2)立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;           
  • 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
(1)单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
(2)丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
(3)覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;           
  • 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
  • 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;

三、路由策略参数详解

xxj-job服务端架构流程

上面我们讲解了客户端接入流程、服务端配置流程和路由策略参数详解,接下来我们讲一下《xxj-job服务端架构流程》

四、xxl-job的调度流程

任务调度器和执行器使用http协议通信,各自有轮询线程处理不同业务。

xxj-job服务端架构流程

五、xxl-job的调度中心详解

1 XXL-JOB的启动和销毁逻辑:

如代码可见,xxl-job调度中心的启动和销毁,核心是处理几个线程池的创建和销毁。对每一个业务线程池,后续有详细讲解。

public class XxlJobScheduler  {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);

    public void init() throws Exception {
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper )
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

    public void destroy() throws Exception {

        // stop-schedule
        JobScheduleHelper.getInstance().toStop();

        // admin log report stop
        JobLogReportHelper.getInstance().toStop();

        // admin lose-monitor stop
        JobCompleteHelper.getInstance().toStop();

        // admin fail-monitor stop
        JobFailMonitorHelper.getInstance().toStop();

        // admin registry stop
        JobRegistryHelper.getInstance().toStop();

        // admin trigger pool stop
        JobTriggerPoolHelper.toStop();

    }
}           

2 任务触发线程池

任务触发线程池:JobTriggerPoolHelper.toStart();

启动两个执行任务的线程池,通常任务在fastTriggerPool,统计一分钟内超时10次的任务,对超时任务再执行放进slowTriggerPool。

// job-timeout 10 times in 1 min

public class JobTriggerPoolHelper {
    private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
    // ---------------------- trigger pool ---------------------
    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;

    public void start(){
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });

        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }
}           

3 执行器管理线程

执行器管理线程:

JobRegistryHelper.getInstance().start();

保证任务执行的时候拿到的执行器列表都是运行状态的

  1. 启动一个守护线程;
  2. 每隔三十秒,查询一次数据库 注册表 中自动注册的执行器;
  3. 删除超过90秒未再次注册(心跳)的执行器;
  4. 将执行器注册信息加载到内存Map中;
  5. 更新注册上了的执行器地址信息到 任务执行表 中。
public void start(){
        // for monitor
        registryMonitorThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {

                    // auto registry group
                    List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                    if (groupList!=null && !groupList.isEmpty()) {

                        // remove dead address (admin/executor)
                        List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                        if (ids!=null && ids.size()>0) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                        }

                        // fresh online address (admin/executor)
                        HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                        List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                        if (list != null) {
                            for (XxlJobRegistry item: list) {
                                if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                    String appname = item.getRegistryKey();
                                    List<String> registryList = appAddressMap.get(appname);
                                    if (!registryList.contains(item.getRegistryValue())) {
                                        registryList.add(item.getRegistryValue());
                                    }
                                    appAddressMap.put(appname, registryList);
                                }
                            }
                        }

                        // fresh group address
                        for (XxlJobGroup group: groupList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
            }
        });
        registryMonitorThread.setDaemon(true);
        registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
        registryMonitorThread.start();
    }
}           

4 失败任务管理线程

失败任务管理线程:JobFailMonitorHelper.getInstance().start();

管理执行失败的任务,重试或者发送告警

  1. 每隔10s查询执行失败的任务;
  2. 如果设置重试次数,就进行重试操作;
  3. 如未设置重试次数,或已经重试超过重试次数,就发送告警信息(邮件或短信等)。

5 完成任务管理线程

完成任务管理线程:JobCompleteHelper.getInstance().start();

管理超时任务,或者执行器宕机的任务,做轮询补偿。

  1. 每隔1min查询查询状态未结束的任务;
  2. 如果距任务开始时间10min 并且 注册执行器不在线,那么就标记任务执行结束。

6 日志管理线程

日志管理线程:JobLogReportHelper.getInstance().start();

统计任务执行成功率,删除过期日志。

  1. 每隔 1min 执行一次;
  2. 按天统计总任务数,成功和失败的个数,可通过 xxl.job.logretentiondays 配置天数 默认30天。

7 任务执行调度线程

任务执行调度线程:

JobScheduleHelper.getInstance().start();

scheduleThread:任务查询并计算执行时间线程

  1. 每一秒 查询数据库中执行时间在 当前时间 至 (当前时间 + 5s)区间的任务;
  2. 根据CronHelp类计算出下次执行时间;
  3. 将任务的下次执行时间写入数据库;
  4. 加载此次执行任务Id到缓存中。
使用ConcurrentHashMap缓存,Key是分钟内的秒数(0-59),Value是任务Id组成的数组
{
    "1":[
        251,
        172
    ],
    "2":[
        643,
        172
    ],
    "39":[
        273
    ],
    "59":[
        188,
        175
    ]
}           

ringThread: 任务执行线程

  1. 每一秒轮询一次,查找当前秒的任务Id ;
  2. 根据任务Id,查出任务详情,并投递到发送线程池;
  3. 发送线程池查询到执行器地址列表,根据配置的发送策略,通过http请求发送到执行器。

发送策略:(对应页面的路由策略)

xxj-job服务端架构流程

六、附录

一致性哈希算法详解

private static int VIRTUAL_NODE_NUM = 100;

public String hashJob(int jobId, List<String> addressList) {
    // ------A1------A2-------A3------
    // -----------J1------------------
    // Address的hashCode为Key,address本身为Value;
    TreeMap<Long, String> addressRing = new TreeMap<Long, String>();

    for (String address: addressList) {
        // 对Address进行 100 次取模,每次对Key+1,
        for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
            long addressHash = hash("SHARD-" + address + "-NODE-" + i);
            addressRing.put(addressHash, address);
        }
    }

    // 对任务Id取模,以Hash树最近的Address作为选定的
    long jobHash = hash(String.valueOf(jobId));
    SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
    return addressRing.firstEntry().getValue();
}           
xxj-job服务端架构流程
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。
作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。

继续阅读