概述
为什么需要集群:
- 防止单点故障,减少对业务的影响。
- 通过负载减少节点的压力,例如在 10 点要触发 1000 个任务,如果有 10 个节点,则每个节点只需要执行 100 个任务。
集群需要解决的问题:
- 任务重跑,因为节点部署的内容是一样的,到 10 点的时候,每个节点都会执行相同的操作,引起数据混乱。比如跑批,绝对不能执行多次。
- 任务漏跑,一个任务分配到某个节点上,但是因为该节点故障了,一直没有得到执行的情况。
- 水平集群需要注意服务器时间的同步。
- Quartz 使用的是随机的负载均衡算法,不能指定节点执行。
由于上面的问题,所以需要在分布式集群的多个quartz节点上通过某种方式进行进程间的数据共享和通信。
可以使用的方法有Zookeeper、redis、DB等。
在 Quartz 中,提供了一种简单的方式,基于数据库共享任务执行信息。也就是说,一个节点执行任务的时候,会操作数据库,其他的节点查询数据库,便可以感知到了。
数据库使用的表依旧是持久化使用的那11张表。
具体配置方法
- 配置quartz.properties配置文件:要配置四个地方,配置集群实例Id,集群开关、数据库持久化、数据源信息。
#配置调度器名称
org.quartz.scheduler.instanceName: MyScheduler
##配置集群id,如果使用集群,instanceId必须唯一,设置成AUTO,会自动生成一个唯一的id
org.quartz.scheduler.instanceId = AUTO
#配置线程池信息
#配置线程池类型
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
#线程数
org.quartz.threadPool.threadCount: 10
#线程优先级
org.quartz.threadPool.threadPriority: 5
#线程是否继承初始化线程的上下文加载器
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#漏跑阈值,单位毫秒。这里设置为60秒,也就是一个任务本来是17:00:00 跑,然后17:01:00还没跑就认为是漏跑
org.quartz.jobStore.misfireThreshold: 60000
#是否使用集群
org.quartz.jobStore.isClustered = true
#使用的JobStoreTX实现
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
#指定的代理类
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 使用 quartz.properties, 不使用默认配置
org.quartz.jobStore.useProperties:true
#数据库中 quartz 表的表名前缀
org.quartz.jobStore.tablePrefix:QRTZ_
#数据源 ,在下面配置
org.quartz.jobStore.dataSource:myDS
#配置数据源
org.quartz.dataSource.myDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL:jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user:root
org.quartz.dataSource.myDS.password:5201314..a
org.quartz.dataSource.myDS.validationQuery=select 0 from dual
-
测试:清空那11个表,然后设置两个触发时间及频率一样的任务(触发器)。
验证 1:先后启动 2 个节点,任务是否重跑。就是看两个节点是否都输出一样的任务。
验证 2:停掉一个节点,任务是否漏跑。
java代码:
public class MyJobDemo1 implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//JobExecutionContext是上下文,可以获取KV信息、触发器等信息。
Date date = new Date();
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
System.out.println( " " + sf.format(date) + " 任务1执行了," + dataMap.getString("jobName"));
}
}
public class MyJobDemo2 implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//JobExecutionContext是上下文,可以获取KV信息、触发器等信息。
Date date = new Date();
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
System.out.println( " " + sf.format(date) + " 任务2执行了," + dataMap.getString("jobName"));
}
}
public class MyScheduler {
public static void main(String[] args) throws SchedulerException {
//创建任务
JobDetail jobDetail = JobBuilder.newJob(MyJobDemo1.class)
.usingJobData("jobName","MyJobDemo1")
.withIdentity("MyJobDemo1","jobGroup")
.build();
JobDetail jobDetail1 = JobBuilder.newJob(MyJobDemo2.class)
.usingJobData("jobName","MyJobDemo2")
.withIdentity("MyJobDemo2","jobGroup")
.build();
//创建简单触发器。每分钟执行一次
Trigger simpleTrigger = TriggerBuilder.newTrigger()
//定义额外参数
.usingJobData("tName","simpleTrigger")
//使用name和group唯一确定这个触发器
.withIdentity("simpleTrigger","triggerGroup")
//描述
.withDescription("简单触发器")
//每分钟执行一次
.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(3))
//立即启动
.startNow()
.build();
//创建CalendarIntervalTrigger,每两分钟执行一次
Trigger CalendarIntervalTrigger = TriggerBuilder.newTrigger()
//定义额外参数
.usingJobData("tName","CalendarIntervalTrigger")
//使用name和group唯一确定这个触发器
.withIdentity("CalendarIntervalTrigger","triggerGroup")
//描述
.withDescription("这是基于日历的触发器")
//每2分钟执行一次
.withSchedule(CalendarIntervalScheduleBuilder.calendarIntervalSchedule().withIntervalInSeconds(3))
//立即启动
.startNow()
.build();
//把两个触发器加入到一个集合里面
Set<Trigger> triggers1 = new HashSet<>();
Set<Trigger> triggers2 = new HashSet<>();
triggers1.add(CalendarIntervalTrigger);
triggers2.add(simpleTrigger);
//创建一个调度器工厂
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
//获取调度器,单例模式的,获取的调度器都一样
Scheduler scheduler = schedulerFactory.getScheduler();
//添加任务和触发器 这个就是 1个任务对应2个触发器 第三个参数是是否代替的意思,就是如果存在同名、同组任务或者触发器时,是否用这个覆盖。
scheduler.scheduleJob(jobDetail,triggers1,true);
scheduler.scheduleJob(jobDetail1,triggers2,true);
//启动任务调度
scheduler.start();
}
}
运行MyScheduler 的main方法两次,模拟两个节点。
可以看到图一先启动,所以开始执行两个任务,之后再次启动一个节点,变成了一个节点执行任务1,一个执行任务2,没有任务重跑。这个是不一定的,因为负载是随机的 ,有可能出现一个节点执行两个任务,另一个空闲的情况
先启动1再启动2,然后关闭节点1,过了一会,节点2开始执行任务,并且会把节点1在宕机期间漏跑的任务重跑。
节点1
节点2: