天天看点

Quartz集群

概述

为什么需要集群:

  1. 防止单点故障,减少对业务的影响。
  2. 通过负载减少节点的压力,例如在 10 点要触发 1000 个任务,如果有 10 个节点,则每个节点只需要执行 100 个任务。

集群需要解决的问题:

  1. 任务重跑,因为节点部署的内容是一样的,到 10 点的时候,每个节点都会执行相同的操作,引起数据混乱。比如跑批,绝对不能执行多次。
  2. 任务漏跑,一个任务分配到某个节点上,但是因为该节点故障了,一直没有得到执行的情况。
  3. 水平集群需要注意服务器时间的同步。
  4. Quartz 使用的是随机的负载均衡算法,不能指定节点执行。

由于上面的问题,所以需要在分布式集群的多个quartz节点上通过某种方式进行进程间的数据共享和通信。

可以使用的方法有Zookeeper、redis、DB等。

在 Quartz 中,提供了一种简单的方式,基于数据库共享任务执行信息。也就是说,一个节点执行任务的时候,会操作数据库,其他的节点查询数据库,便可以感知到了。

数据库使用的表依旧是持久化使用的那11张表。

具体配置方法
  1. 配置quartz.properties配置文件:要配置四个地方,配置集群实例Id,集群开关、数据库持久化、数据源信息。
#配置调度器名称
org.quartz.scheduler.instanceName: MyScheduler

##配置集群id,如果使用集群,instanceId必须唯一,设置成AUTO,会自动生成一个唯一的id
org.quartz.scheduler.instanceId = AUTO

#配置线程池信息
#配置线程池类型
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
#线程数
org.quartz.threadPool.threadCount: 10
#线程优先级
org.quartz.threadPool.threadPriority: 5

#线程是否继承初始化线程的上下文加载器
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

#漏跑阈值,单位毫秒。这里设置为60秒,也就是一个任务本来是17:00:00 跑,然后17:01:00还没跑就认为是漏跑
org.quartz.jobStore.misfireThreshold: 60000

#是否使用集群
org.quartz.jobStore.isClustered = true


#使用的JobStoreTX实现
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
#指定的代理类
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 使用 quartz.properties, 不使用默认配置
org.quartz.jobStore.useProperties:true
#数据库中 quartz 表的表名前缀
org.quartz.jobStore.tablePrefix:QRTZ_
#数据源 ,在下面配置
org.quartz.jobStore.dataSource:myDS
​
#配置数据源
org.quartz.dataSource.myDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL:jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user:root
org.quartz.dataSource.myDS.password:5201314..a
org.quartz.dataSource.myDS.validationQuery=select 0 from dual
           
  1. 测试:清空那11个表,然后设置两个触发时间及频率一样的任务(触发器)。

    验证 1:先后启动 2 个节点,任务是否重跑。就是看两个节点是否都输出一样的任务。

    验证 2:停掉一个节点,任务是否漏跑。

java代码:

public class MyJobDemo1 implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        //JobExecutionContext是上下文,可以获取KV信息、触发器等信息。
        Date date = new Date();
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        System.out.println( " " + sf.format(date) + " 任务1执行了," + dataMap.getString("jobName"));
    }
}

public class MyJobDemo2 implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        //JobExecutionContext是上下文,可以获取KV信息、触发器等信息。
        Date date = new Date();
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        System.out.println( " " + sf.format(date) + " 任务2执行了," + dataMap.getString("jobName"));
    }
}


public class MyScheduler {
    public static void main(String[] args) throws SchedulerException {

        //创建任务
        JobDetail jobDetail = JobBuilder.newJob(MyJobDemo1.class)
                .usingJobData("jobName","MyJobDemo1")
                .withIdentity("MyJobDemo1","jobGroup")
                .build();

        JobDetail jobDetail1 = JobBuilder.newJob(MyJobDemo2.class)
                .usingJobData("jobName","MyJobDemo2")
                .withIdentity("MyJobDemo2","jobGroup")
                .build();

        //创建简单触发器。每分钟执行一次

        Trigger simpleTrigger = TriggerBuilder.newTrigger()
                //定义额外参数
                .usingJobData("tName","simpleTrigger")
                //使用name和group唯一确定这个触发器
                .withIdentity("simpleTrigger","triggerGroup")
                //描述
                .withDescription("简单触发器")
                //每分钟执行一次
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(3))
                //立即启动
                .startNow()
                .build();

        //创建CalendarIntervalTrigger,每两分钟执行一次
        Trigger CalendarIntervalTrigger = TriggerBuilder.newTrigger()
                //定义额外参数
                .usingJobData("tName","CalendarIntervalTrigger")
                //使用name和group唯一确定这个触发器
                .withIdentity("CalendarIntervalTrigger","triggerGroup")
                //描述
                .withDescription("这是基于日历的触发器")
                //每2分钟执行一次
                .withSchedule(CalendarIntervalScheduleBuilder.calendarIntervalSchedule().withIntervalInSeconds(3))
                //立即启动
                .startNow()
                .build();

        //把两个触发器加入到一个集合里面
        Set<Trigger> triggers1 = new HashSet<>();
        Set<Trigger> triggers2 = new HashSet<>();
        triggers1.add(CalendarIntervalTrigger);
        triggers2.add(simpleTrigger);

        //创建一个调度器工厂
        SchedulerFactory schedulerFactory = new StdSchedulerFactory();
        //获取调度器,单例模式的,获取的调度器都一样
        Scheduler scheduler = schedulerFactory.getScheduler();

        //添加任务和触发器  这个就是 1个任务对应2个触发器   第三个参数是是否代替的意思,就是如果存在同名、同组任务或者触发器时,是否用这个覆盖。
        scheduler.scheduleJob(jobDetail,triggers1,true);
        scheduler.scheduleJob(jobDetail1,triggers2,true);
        //启动任务调度
        scheduler.start();
    }
}

           

运行MyScheduler 的main方法两次,模拟两个节点。

Quartz集群
Quartz集群

可以看到图一先启动,所以开始执行两个任务,之后再次启动一个节点,变成了一个节点执行任务1,一个执行任务2,没有任务重跑。这个是不一定的,因为负载是随机的 ,有可能出现一个节点执行两个任务,另一个空闲的情况

先启动1再启动2,然后关闭节点1,过了一会,节点2开始执行任务,并且会把节点1在宕机期间漏跑的任务重跑。

节点1

Quartz集群

节点2:

Quartz集群