文章目录
- 前言
- 一.SpringBoot集成Quartz
-
- 1.依赖
- 2.实现步骤
-
- 2.1.编写Quartz 的配置类
- 2.2.开启SpringBoot任务调度
- 2.3.Job 类中注入对象
- 二.Quartz内置数据库执行任务调度
-
- 1.JDBCStore 概念
- 2.建表 sql
- 3.实现步骤
-
- 3.1.新增配置文件quartz.properties
- 3.2.编写代码
- 三.Quartz基于数据库进行集群
-
- 1.集群概念
- 2.使用步骤
- 3.测试方法
- 四.Springboot整合Quartz集群
-
- 1.引入依赖
- 2.application.yml配置
- 3.quartz.properties配置文件[参考](#jbSql)
- 4.模拟任务调度service层的业务逻辑
- 5.任务
- 6.任务配置
- 7.[解决在Job中注入SpringBean时会产生空指针异常方法](#zrnd)
- 8.调度器配置
- 五.对外拓展任务CRUD
前言
【JavaWeb】Quartz—任务调度(一)从入门到了解Quartz的所有概念
一.SpringBoot集成Quartz
1.依赖
如果SpringBoot版本是
2.x
以后的,则在spring-boot-starter中已经包含了Quart的依赖,则可以直接使用
spring-boot-starter-quartz
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
如果是1.5.x则要使用以下添加依赖:
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
我使用SpringBoot版本是2.x的
2.实现步骤
2.1.编写Quartz 的配置类
import org.quartz.SchedulerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SimpleTriggerFactoryBean;
@Configuration
@Component
public class QuartzConfig {
/**
* 1.创建Job对象
*/
@Bean
public JobDetailFactoryBean jobDetailFactoryBean() {
JobDetailFactoryBean factory = new JobDetailFactoryBean();
factory.setName("JobName");
factory.setGroup("JobGroup");
//关联我们自己的Job类
factory.setJobClass(QuartzJob.class);
return factory;
}
/**
* 2.创建Trigger对象
* 简单的Trigger
*/
/*@Bean
public SimpleTriggerFactoryBean simpleTriggerFactoryBean(JobDetailFactoryBean jobDetailFactoryBean){
SimpleTriggerFactoryBean factory = new SimpleTriggerFactoryBean();
//关联JobDetail对象
factory.setJobDetail(jobDetailFactoryBean.getObject());
//该参数表示一个执行的毫秒数
factory.setRepeatInterval(2000);
//重复次数
factory.setRepeatCount(5);
return factory;
}*/
/**
* Cron Trigger
*/
@Bean
public CronTriggerFactoryBean cronTriggerFactoryBean(@Qualifier("jobDetailFactoryBean") JobDetailFactoryBean jobDetailFactoryBean) {
CronTriggerFactoryBean factory = new CronTriggerFactoryBean();
factory.setName("TriggerName");
factory.setGroup("TriggerGroup");
factory.setJobDetail(jobDetailFactoryBean.getObject()); //设置触发时间
factory.setCronExpression("0/2 * * * * ?");//每2秒触发一次Job执行
return factory;
}
/**
* 3.创建Scheduler对象
*/
@Bean
public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("cronTriggerFactoryBean") CronTriggerFactoryBean cronTriggerFactoryBean) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
//关联trigger
//factory.setTriggers(simpleTriggerFactoryBean.getObject());
factory.setTriggers(cronTriggerFactoryBean.getObject());
//设置延时启动,保证job中的属性的注入
factory.setStartupDelay(5);
return factory;
}
}
2.2.开启SpringBoot任务调度
启动类或者Quartz配置类上使用
@EnableScheduling
注解开启任务调度
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
2.3.Job 类中注入对象
初始化UsersService类到Spring容器
import org.springframework.stereotype.Service;
@Service
public class UsersService {
public void addUsers() {
System.out.println("Add Users....");
}
}
实现Job接口,在该类中注入Spring容器中的对象
import java.util.Date;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
public class QuartzJob implements Job {
@Autowired
private UsersService usersService;
/**
* 任务被触发时所执行的方法
*/
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("Execute...."+new Date());
this.usersService.addUsers();
}
}
难点
: 在Job中注入SpringBean时会产生空指针异常
- 因为UsersService 在
中没有被注入到Spring容器中QuartzJob
-
中的AdaptableJobFactory
是通过createJobInstance
的反射完成实例化
-
解决办法: 通过
继承SpringBeanJobFactory或者AdaptableJobFactory
,在生成Job实例对象的的时候通过AutowireCapbaleBeanFactory将Job实例注入到Spring容器中去。
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;
@Component("autowiredSpringBeanJobFactory")
public class AutowiredSpringBeanJobFactory extends AdaptableJobFactory {
//AutowireCapableBeanFactory 可以将一个对象添加到SpringIOC容器中,并且完成该对象注入
@Autowired
private AutowireCapableBeanFactory autowireCapableBeanFactory;
/**
* 该方法需要将实例化的任务对象手动的添加到springIOC容器中并且完成对象的注入
*/
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//调用父类的方法
Object obj = super.createJobInstance(bundle);
//将obj对象添加Spring IOC容器中,并完成注入
this.autowireCapableBeanFactory.autowireBean(obj);
return obj;
}
}
修改配置类QuartzConfig
- 初始化SchedulerFactoryBean时指定JobFactory ,以及配置延迟加载.
/**
* 3.创建Scheduler对象
*/
@Bean
public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("cronTriggerFactoryBean") CronTriggerFactoryBean cronTriggerFactoryBean, @Qualifier("autowiredSpringBeanJobFactory") AutowiredSpringBeanJobFactory autowiredSpringBeanJobFactory ) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
//关联trigger
//factory.setTriggers(simpleTriggerFactoryBean.getObject());
factory.setTriggers(cronTriggerFactoryBean.getObject());
//将自定义的MyJobFactory注入配置类,并添加如下配置,
//配置使用spring的autowired的对象,在job中进行对象的注入
factory.setJobFactory(autowiredSpringBeanJobFactory);
//设置延时启动,保证job中的属性的注入
factory.setStartupDelay(5);
return factory;
}
执行结果
二.Quartz内置数据库执行任务调度
1.JDBCStore 概念
- 默认Quartz的触发器,调度器,任务等信息都是放在
中的,叫做内存
。 好处是快速,坏处是一旦系统重启,那么信息就丢失了,就得全部从头来过。RAMJobStore
- 所以Quartz还提供了另一个方式,可以把这些信息存放在
做,叫做数据库
。 好处是就算系统重启了,目前运行到第几次了这些信息都是存放在数据库中的,那么就可以继续原来的步伐把计划任务无缝地继续做下去。 坏处就是性能上比内存慢一些,毕竟数据库读取总是要慢一些的。JobStoreTX
- 所以Quartz还提供了另一个方式,可以把这些信息存放在
2.建表 sql
为了能够把相关信息存放进 mysql 数据库里,必须手动建立数据库和表,使用如下 脚本就行了。
注: 这里使用的数据库名称是 quartz
- 具体的建表sql从文章【JavaWeb】Quartz—任务调度(三)Quartz2.x内置数据表结构说明 复制粘贴就可以了
3.实现步骤
3.1.新增配置文件quartz.properties
1.新增配置文件
在
src/main/resources
下新建
quartz.properties
配置文件,里面指定使用
JobStoreTX
方式管理任务。 并且指定联系数据库的驱动、用户名、密码、url等
#调度标识名 集群中每一个实例都必须使用相同的名称
org.quartz.scheduler.instanceName=MyScheduler
#调度器实例编号自动生成,每个实例不能不能相同
org.quartz.scheduler.instanceId=AUTO
#开启分布式部署,集群
org.quartz.jobStore.isClustered=true
#分布式节点有效性检查时间间隔,单位:毫秒,默认值是15000
org.quartz.jobStore.clusterCheckinInterval=2000
#远程管理相关的配置,全部关闭
org.quartz.scheduler.rmi.export=false
org.quartz.scheduler.rmi.proxy=false
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
#实例化ThreadPool时,使用的线程类为SimpleThreadPool(一般使用SimpleThreadPool即可满足几乎所有用户的需求)
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
##并发个数,指定线程数,至少为1(无默认值)(一般设置为1-100之间的的整数合适)
org.quartz.threadPool.threadCount=10
##设置线程的优先级(最大为java.lang.Thread.MAX_PRIORITY 10,最小为Thread.MIN_PRIORITY 1,默认为5)
org.quartz.threadPool.threadPriority=5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
#容许的最大作业延长时间,最大能忍受的触发超时时间,如果超过则认为“失误”,不敢再内存中还是数据中都要配置
org.quartz.jobStore.misfireThreshold=6000
#持久化方式配置
# 默认存储在内存中,保存job和Trigger的状态信息到内存中的类
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore
#数据库方式
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
#持久化方式配置数据驱动,MySQL数据库(根据选择的数据库类型做不同配置选定JDBC代理类)
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#quartz相关数据表前缀名
org.quartz.jobStore.tablePrefix=QRTZ_
#数据库别名 随便取
org.quartz.jobStore.dataSource=mysqlDatabase
org.quartz.dataSource.mysqlDatabase.driver=com.mysql.jdbc.Driver
org.quartz.dataSource.mysqlDatabase.URL=jdbc:mysql://localhost:3306/quartz?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
org.quartz.dataSource.mysqlDatabase.user=root
org.quartz.dataSource.mysqlDatabase.password=root
org.quartz.dataSource.mysqlDatabase.maxConnection=5
2.引入依赖
在boot2.x中spring-boot-starter-quartz依赖默认是不依赖c3p0数据源的,如果要使用需要自己单独引用c3p0数据源,否则会抛出异常
这里我们使用druid做为数据源
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
需要自定义Druid数据库连接池,需要实现
org.quartz.utils.ConnectionProvider
接口
public class DruidConnectionProvider implements ConnectionProvider {
/*
* 常量配置,与quartz.properties文件的key保持一致(去掉前缀),同时提供set方法,Quartz框架自动注入值。
*/
public static final int DEFAULT_DB_MAX_CONNECTIONS = 10;
public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120;
//JDBC驱动
public String driver;
//JDBC连接串
public String URL;
//数据库用户名
public String user;
//数据库用户密码
public String password;
//数据库最大连接数
public int maxConnection;
//数据库SQL查询每次连接返回执行到连接池,以确保它仍然是有效的。
public String validationQuery;
public String maxCachedStatementsPerConnection;
private boolean validateOnCheckout;
private int idleConnectionValidationSeconds;
private String discardIdleConnectionsSeconds;
//Druid连接池
private DruidDataSource datasource;
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* 接口实现
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
@Override
public Connection getConnection() throws SQLException {
return datasource.getConnection();
}
@Override
public void shutdown() throws SQLException {
datasource.close();
}
@Override
public void initialize() throws SQLException {
if (this.URL == null) {
throw new SQLException("DBPool could not be created: DB URL cannot be null");
}
if (this.driver == null) {
throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!");
}
if (this.maxConnection < 0) {
throw new SQLException("DBPool maxConnectins could not be created: Max connections must be greater than zero!");
}
datasource = new DruidDataSource();
try {
datasource.setDriverClassName(this.driver);
} catch (Exception e) {
try {
throw new SchedulerException("Problem setting driver class name on datasource: " + e.getMessage(), e);
} catch (SchedulerException e1) {
}
}
datasource.setUrl(this.URL);
datasource.setUsername(this.user);
datasource.setPassword(this.password);
datasource.setMaxActive(this.maxConnection);
datasource.setMinIdle(1);
datasource.setMaxWait(0);
datasource.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CONNECTIONS);
if (this.validationQuery != null) {
datasource.setValidationQuery(this.validationQuery);
if (!this.validateOnCheckout) {
datasource.setTestOnReturn(true);
}
else {
datasource.setTestOnBorrow(true);
}
datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds);
}
}
/*
* 提供get set方法
*
*/
public String getDriver() {
return driver;
}
public void setDriver(String driver) {
this.driver = driver;
}
public String getURL() {
return URL;
}
public void setURL(String URL) {
this.URL = URL;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getMaxConnection() {
return maxConnection;
}
public void setMaxConnection(int maxConnection) {
this.maxConnection = maxConnection;
}
public String getValidationQuery() {
return validationQuery;
}
public void setValidationQuery(String validationQuery) {
this.validationQuery = validationQuery;
}
public boolean isValidateOnCheckout() {
return validateOnCheckout;
}
public void setValidateOnCheckout(boolean validateOnCheckout) {
this.validateOnCheckout = validateOnCheckout;
}
public int getIdleConnectionValidationSeconds() {
return idleConnectionValidationSeconds;
}
public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) {
this.idleConnectionValidationSeconds = idleConnectionValidationSeconds;
}
public DruidDataSource getDatasource() {
return datasource;
}
public void setDatasource(DruidDataSource datasource) {
this.datasource = datasource;
}
public String getDiscardIdleConnectionsSeconds() {
return discardIdleConnectionsSeconds;
}
public void setDiscardIdleConnectionsSeconds(String discardIdleConnectionsSeconds) {
this.discardIdleConnectionsSeconds = discardIdleConnectionsSeconds;
}
}
quartz.properties新增一行配置
#使用的数据库连接池,默认是c3p0,我这里使用的是alibaba的druid数据源,需引入依赖
org.quartz.dataSource.mysqlDatabase.connectionProvider.class=com.oyjp.testdb.DruidConnectionProvider
3.2.编写代码
MailJob
@DisallowConcurrentExecution//串行执行
public class MailJob implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDetail detail = context.getJobDetail();
String email = detail.getJobDataMap().getString("email");
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
String now = sdf.format(new Date());
System.out.printf("给邮件地址 %s 发出了一封定时邮件, 当前时间是: %s (%s)%n" ,email, now,context.isRecovering());
}
}
测试方法
public class TestQuartz {
public static void main(String[] args) throws Exception {
try {
assginNewJob();
} catch (ObjectAlreadyExistsException e) {
System.err.println("发现任务已经在数据库存在了,直接从数据库里运行:"+ e.getMessage());
resumeJobFromDatabase();
}
}
//当assginNewJob()进行任务调度报异常的时候,调用当前方法从数据库中恢复任务调度
// 任务的触发也调整成了15秒一次,总共11次。
private static void resumeJobFromDatabase() throws Exception {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.start();
// 等待200秒,让前面的任务都执行完了之后,再关闭调度器
Thread.sleep(200000);
scheduler.shutdown(true);
}
//创建一个任务调度
private static void assginNewJob() throws SchedulerException, InterruptedException {
// 创建调度器
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 定义一个触发器
Trigger trigger = newTrigger().withIdentity("trigger1", "group1") // 定义名称和所属的租
.startNow()
.withSchedule(simpleSchedule().withIntervalInSeconds(15) // 每隔15秒执行一次
.withRepeatCount(10)) // 总共执行11次(第一次执行不基数)
.build();
// 定义一个JobDetail
JobDetail job = newJob(MailJob.class) // 指定干活的类MailJob
.withIdentity("mailjob1", "mailgroup") // 定义任务名称和分组
.usingJobData("email", "[email protected]") // 定义属性
.build();
// 调度加入这个job
scheduler.scheduleJob(job, trigger);
// 启动
scheduler.start();
// 等待20秒,让前面的任务都执行完了之后,再关闭调度器
Thread.sleep(20000);
scheduler.shutdown(true);
}
}
测试方法
- 先运行一次,
,因为此时数据库里还没有任务,会顺利执行是不会看到红色的
- 经过20秒,自动结束。此时任务还未运行完毕(任务设计是总共10次,间隔15秒一次)。
- 不用修改代码,再次执行,就会看到红色的字样
,表示试图向数据库添加任务,但是发现任务已经在数据库里已经存在了,那么这个时候调度就会自动运行数据库里的任务,从而达到系统重启后自动衔接的效果
注意(如上图红色字体所示)
- 如果你使用JDBC保存任务调度数据时,当你运行代码一个job然后退出,当再次希望运行job时,系统将抛出JobDetail重名的异常
:ObjectAlreadyExistsException
- 因为每次调用
时,Quartz都会将Scheduler#scheduleJob()
的信息保存到数据库中,如果数据表中已经同名的JobDetail或Trigger,异常就产生了。JobDetail和Trigger
三.Quartz基于数据库进行集群
1.集群概念
- Quart分布式调度任务是通过
“数据库实现的
”,一个任务只能在一个Quartz节点上执行,他的集群也仅仅是解决了抢占式调度
,实现了单点故障(任务级别)
,多个任务在集群中高可用
调度,并没有解决负载均衡
的问题,不能实现任务分片
,如果执行水平扩展
,各个节点大量的短任务
,节点越多这种情况越严重,频繁的竞争数据库锁
。性能会很低下
注: 文中描述的 Quartz 应用 在一些语境下,又叫做
,都是同一个概念。Quartz 服务器节点
quartz 的集群思想如下:通过在数据库中配置定时器信息, 以数据库
悲观锁
的方式达到同一个任务始终只有一个节点在运行,
tips:
.
,称之为
在同一台机器上运行所有Quartz节点
,还是在
垂直集群
,称之为
不同的机器上运行所有节点
。
水平集群
- 对于垂直集群,存在着
的问题。这对高可用性的应用来说是个坏消息,因为一旦机器宕掉了,所有的节点也就被有效的终止了。
单点故障
- 而当你运行
时,一个严格的要求就是我们的
水平集群
必须要同步,以免出现离奇且不可预知的行为。假如时钟没能够同步,S
时钟时间
,造成难以估计得麻烦。
cheduler 实例将对其他节点的状态产生混乱
优点:
- 保证节点高可用 (HA), 如果某一个几点挂了, 其他节点可以顶上
缺点:
-
,同一个任务只能有一个节点运行
,性能低,资源浪费其他节点将不执行任务
- 当碰到
时,各个节点大量短任务
,节点越多这种情况越严重。频繁的竞争数据库锁
性能会很低下
- quartz 的分布式
,并没有解决仅解决了集群高可用的问题
的问题,不能实现任务分片
水平扩展
2.使用步骤
1.创建quartz数据库
跳转
2.quartz.properties调整
- 开启集群
- 要进行集群,多个应用调度名称 instanceName 应该是一样的
org.quartz.scheduler.instanceName = quartzScheduler
- 要进行集群,多个应用调度id instanceId 必须不一样,这里使用AUTO,就会自动分配不同的ID。 目测是本机机器名称加上时间戳
org.quartz.scheduler.instanceId = AUTO
- 每个一秒钟去数据库检查一下,以在其他应用挂掉之后及时补上
完整的quartz.properties文件
#调度标识名 集群中每一个实例都必须使用相同的名称
org.quartz.scheduler.instanceName=MyScheduler
#调度器实例编号自动生成,每个实例不能不能相同
org.quartz.scheduler.instanceId=AUTO
#开启分布式部署,集群
org.quartz.jobStore.isClustered=true
#分布式节点有效性检查时间间隔,单位:毫秒,默认值是15000
org.quartz.jobStore.clusterCheckinInterval=2000
#远程管理相关的配置,全部关闭
org.quartz.scheduler.rmi.export=false
org.quartz.scheduler.rmi.proxy=false
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
#实例化ThreadPool时,使用的线程类为SimpleThreadPool(一般使用SimpleThreadPool即可满足几乎所有用户的需求)
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
##并发个数,指定线程数,至少为1(无默认值)(一般设置为1-100之间的的整数合适)
org.quartz.threadPool.threadCount=10
##设置线程的优先级(最大为java.lang.Thread.MAX_PRIORITY 10,最小为Thread.MIN_PRIORITY 1,默认为5)
org.quartz.threadPool.threadPriority=5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
#容许的最大作业延长时间,最大能忍受的触发超时时间,如果超过则认为“失误”,不敢再内存中还是数据中都要配置
org.quartz.jobStore.misfireThreshold=6000
#持久化方式配置
# 默认存储在内存中,保存job和Trigger的状态信息到内存中的类
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore
#数据库方式
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
#持久化方式配置数据驱动,MySQL数据库(根据选择的数据库类型做不同配置选定JDBC代理类)
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#quartz相关数据表前缀名
org.quartz.jobStore.tablePrefix=QRTZ_
#数据库别名 随便取
org.quartz.jobStore.dataSource=mysqlDatabase
org.quartz.dataSource.mysqlDatabase.driver=com.mysql.jdbc.Driver
org.quartz.dataSource.mysqlDatabase.URL=jdbc:mysql://localhost:3306/quartz?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
org.quartz.dataSource.mysqlDatabase.user=root
org.quartz.dataSource.mysqlDatabase.password=root
org.quartz.dataSource.mysqlDatabase.maxConnection=5
3.测试方法
运行有较多步骤,请严格按部就班地来:
- 启动一次 TestQuartz,叫做 a 应用
-
紧接着(在几秒钟内)再次启动 TestQuartz,叫做 b 应用
这样就启动了两个Quartz应用了
- 使用
,在两个不同的控制台观察现象多控制台显示方式
将会观察到如下现象:
- a 应用 只会运行20秒,就自动结束了。 这20秒,仅仅够运行两次任务的,所以观察到两次输出。
- b 应用在 a 应用结束之前,是不会运行的
- b 应用在 a 应用结束之后 ,自动接过了革命的火炬,不停地接着把后续的任务都运行完毕
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
import org.quartz.JobDetail;
import org.quartz.ObjectAlreadyExistsException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;
public class TestQuartz {
public static void main(String[] args) throws Exception {
try {
assginNewJob();
} catch (ObjectAlreadyExistsException e) {
System.err.println("发现任务已经在数据库存在了,直接从数据库里运行:"+ e.getMessage());
resumeJobFromDatabase();
}
}
private static void resumeJobFromDatabase() throws Exception {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
System.out.println("当前调度器的id是:"+scheduler.getSchedulerInstanceId());
scheduler.start();
// 等待200秒,让前面的任务都执行完了之后,再关闭调度器
Thread.sleep(200000);
scheduler.shutdown(true);
}
private static void assginNewJob() throws SchedulerException, InterruptedException {
// 创建调度器
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 定义一个触发器
Trigger trigger = newTrigger().withIdentity("trigger1", "group1") // 定义名称和所属的租
.startNow()
.withSchedule(simpleSchedule()
.withIntervalInSeconds(15) // 每隔15秒执行一次
.withRepeatCount(10)) // 总共执行11次(第一次执行不基数)
.build();
// 定义一个JobDetail
JobDetail job = newJob(MailJob.class) // 指定干活的类MailJob
.withIdentity("mailjob1", "mailgroup") // 定义任务名称和分组
.usingJobData("email", "[email protected]") // 定义属性
.build();
// 调度加入这个job
scheduler.scheduleJob(job, trigger);
System.out.println("当前调度器的id是:"+scheduler.getSchedulerInstanceId());
// 启动
scheduler.start();
// 等待20秒,让前面的任务都执行完了之后,再关闭调度器
Thread.sleep(20000);
scheduler.shutdown(true);
}
}
四.Springboot整合Quartz集群
1.引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.43</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.9</version>
</dependency>
2.application.yml配置
application.yml文件配置(这里是单数据源的配置方式,如果是多数据源配置方式可以参考我的文章【SpringBoot】实现JdbcTemplate、Druid、Dynamic-Datasource的多数据源动态切换)
server:
port: 8080
spring:
application:
name: quartz
datasource:
url: jdbc:mysql://localhost:3306/quartzdb?characterEncoding=utf-8&useUnicode=true&useSSL=false
driver-class-name: com.mysql.jdbc.Driver # mysql8.0以前使用com.mysql.jdbc.Driver
username: root
password: root
platform: mysql
#通过这句配置将druid连接池引入到我们的配置中,spring会尽可能判断类型是什么,然后根据情况去匹配驱动类。
type: com.alibaba.druid.pool.DruidDataSource
druid:
initial-size: 5 # 初始化大小
min-idle: 5 # 最小
max-active: 100 # 最大
max-wait: 60000 # 配置获取连接等待超时的时间
time-between-eviction-runs-millis: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
min-evictable-idle-time-millis: 300000 # 指定一个空闲连接最少空闲多久后可被清除,单位是毫秒
validationQuery: select 'x'
test-while-idle: true # 当连接空闲时,是否执行连接测试
test-on-borrow: false # 当从连接池借用连接时,是否测试该连接
test-on-return: false # 在连接归还到连接池时是否测试该连接
filters: config,wall,stat # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
poolPreparedStatements: true # 打开PSCache,并且指定每个连接上PSCache的大小
maxPoolPreparedStatementPerConnectionSize: 20
maxOpenPreparedStatements: 20
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.slowSqlMillis=200;druid.stat.logSlowSql=true;config.decrypt=false
# 合并多个DruidDataSource的监控数据
#use-global-data-source-stat: true
#WebStatFilter配置,说明请参考Druid Wiki,配置_配置WebStatFilter
web-stat-filter:
enabled: true #是否启用StatFilter默认值true
url-pattern: /*
exclusions: /druid/*,*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico
session-stat-enable: true
session-stat-max-count: 10
#StatViewServlet配置,说明请参考Druid Wiki,配置_StatViewServlet配置
stat-view-servlet:
enabled: true #是否启用StatViewServlet默认值true
url-pattern: /druid/*
reset-enable: true
login-username: admin
login-password: admin
3.quartz.properties配置文件参考
4.模拟任务调度service层的业务逻辑
@Service
public class UserService {
public void getUserInfo() {
System.err.println("调度getUserInof成功");
}
public void getUserAddr() {
System.err.println("调度getUserAddr成功");
}
}
5.任务
@Component
@DisallowConcurrentExecution //保证上一次任务执行完毕再执行下一任务
//@PersistJobDataAfterExecution //上一个任务完成前写入需要被下一个任务获取的变量以及对应的属性值,类似求和累加
public class JobOne extends QuartzJobBean {
private UserService userService;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
userService.getUserInfo();
}
public void setUserService(UserService userService) {
this.userService = userService;
}
}
@Component
@DisallowConcurrentExecution
public class JobTwo extends QuartzJobBean {
//不能使用注入的方式,只能使用DateMap方式传入参数
private UserService userService;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
userService.getUserAddr();
}
public void setUserService(UserService userService) {
this.userService = userService;
}
}
QuartzJobBean 是Spring实现的Job接口的一个抽象类,多出了些Spring专有处理而已,如图所示
6.任务配置
@Configuration
public class JobConfig {
@Bean("jobOneDetail")
public JobDetailFactoryBean jobOneDetailFactoryBean(JobOne jobOne) {
JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
jobDetailFactoryBean.setJobClass(jobOne.getClass());
//没有绑定触发器仍然保留在Quartz的JobStore中
jobDetailFactoryBean.setDurability(true);
jobDetailFactoryBean.setName("jobOneDetailName");
jobDetailFactoryBean.setGroup("jobOneDetailGroup");
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("test1", "1111");
jobDetailFactoryBean.setJobDataMap(jobDataMap);
return jobDetailFactoryBean;
}
@Bean("jobOneTrigger")
public CronTriggerFactoryBean cronTriggerOneFactoryBean(@Qualifier("jobOneDetail") JobDetailFactoryBean jobDetailFactoryBean) {
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setJobDetail(jobDetailFactoryBean.getObject());
cronTriggerFactoryBean.setCronExpression("*/1 * * * * ?");
cronTriggerFactoryBean.setName("jobOneTriggerName");
cronTriggerFactoryBean.setGroup("jobOneTriggerGroup");
return cronTriggerFactoryBean;
}
@Bean("jobTwoDetail")
public JobDetailFactoryBean jobTwoDetailFactoryBean(JobTwo jobTwo) {
JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
jobDetailFactoryBean.setJobClass(jobTwo.getClass());
jobDetailFactoryBean.setDurability(true);
jobDetailFactoryBean.setName("jobTwoDetailName");
jobDetailFactoryBean.setGroup("jobTwoDetailGroup");
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("test2", "2222");
jobDetailFactoryBean.setJobDataMap(jobDataMap);
return jobDetailFactoryBean;
}
@Bean("jobTwoTrigger")
public CronTriggerFactoryBean cronTriggerTwoFactoryBean(@Qualifier("jobTwoDetail") JobDetailFactoryBean jobDetailFactoryBean) {
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setJobDetail(jobDetailFactoryBean.getObject());
cronTriggerFactoryBean.setCronExpression("*/1 * * * * ?");
cronTriggerFactoryBean.setName("jobTwoTriggerName");
cronTriggerFactoryBean.setGroup("jobTwoTriggerGroup");
return cronTriggerFactoryBean;
}
}
7.解决在Job中注入SpringBean时会产生空指针异常方法
8.调度器配置
@Configuration
public class SchedulerConfig {
// 配置文件路径
private static final String QUARTZ_CONFIG = "/quartz.properties";
@Autowired
@Qualifier("autowiredSpringBeanJobFactory")
private AutowiredSpringBeanJobFactory autowiredSpringBeanJobFactory;
//配置数据源
//这里可以不使用@Bean交给spring管理,否则可能出现默认数据源的问题.
@Bean("quartzDataSource")
public DataSource quartzDataSource() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/quartz?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false");
dataSource.setUsername("root");
dataSource.setPassword("root");
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
return dataSource;
}
/**
* 从quartz.properties文件中读取Quartz配置属性
* @return
* @throws IOException
*/
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource(QUARTZ_CONFIG));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
//执行任务。有了触发器,我们就可以执行任务了。注册一个SchedulerFactroyBean,然后将触发器一list的方式传入
@Bean
public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("jobOneTrigger") Trigger jobOneTrigger, @Qualifier("jobTwoTrigger") Trigger jobTwoTrigger) throws IOException {
SchedulerFactoryBean schedulerFactoryBean=new SchedulerFactoryBean();
//调度器名称
schedulerFactoryBean.setSchedulerName("TestScheduler");
//数据源
schedulerFactoryBean.setDataSource(quartzDataSource());
//覆盖已存在的任务,用于Quartz集群,QuartzScheduler启动会更新已存在的Job
schedulerFactoryBean.setOverwriteExistingJobs(true);
//延时1s启动定时任务,避免系统未完全启动却开始执行定时任务的情况
schedulerFactoryBean.setStartupDelay(1);
//设置加载的quartz.properties配置文件
schedulerFactoryBean.setQuartzProperties(quartzProperties());
//自动启动
schedulerFactoryBean.setAutoStartup(true);
//注册触发器
schedulerFactoryBean.setTriggers(jobOneTrigger,jobTwoTrigger);
//将自定义的MyJobFactory注入配置类,并添加如下配置,
//配置使用spring的autowired的对象,在job中进行对象的注入
schedulerFactoryBean.setJobFactory(autowiredSpringBeanJobFactory);
return schedulerFactoryBean;
}
}
测试结果
- 当启动1个服务的时候,会交叉调度两个任务
- 当启动a,b 2个服务的时候,a和b服务会分别选择一个任务一直执行
- 当启动两个服务a,b,执行到一定时间,关闭服务b,a服务检测到b服务异常后,会获取b服务的调度任务执行
五.对外拓展任务CRUD
基于上面四.Springboot整合Quartz集群,我们可以通过接口的方式来进行操作任务,从而实现任务的可视化
任务
public class AsyncJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("========================立即执行的任务,只执行一次===============================");
System.out.println("jobName=====:"+jobExecutionContext.getJobDetail().getKey().getName());
System.out.println("jobGroup=====:"+jobExecutionContext.getJobDetail().getKey().getGroup());
System.out.println("taskData=====:"+jobExecutionContext.getJobDetail().getJobDataMap().get("asyncData"));
}
}
public class CronJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("=========================定时任务每5秒执行一次===============================");
System.out.println("jobName=====:"+jobExecutionContext.getJobDetail().getKey().getName());
System.out.println("jobGroup=====:"+jobExecutionContext.getJobDetail().getKey().getGroup());
System.out.println("taskData=====:"+jobExecutionContext.getJobDetail().getJobDataMap().get("taskData"));
}
}
任务操作的业务接口
public interface JobService {
/**
* 添加一个定时任务
* @param jobName
* @param jobGroup
*/
void addCronJob(String jobName, String jobGroup);
/**
* 添加异步任务
* @param jobName
* @param jobGroup
*/
void addAsyncJob(String jobName, String jobGroup);
/**
* 暂停任务
* @param jobName
* @param jobGroup
*/
void pauseJob(String jobName, String jobGroup);
/**
* 恢复任务
* @param jobName
* @param jobGroup
*/
void resumeJob(String jobName, String jobGroup);
/**
* 删除job
* @param jobName
* @param jobGroup
*/
void deleteJob(String jobName, String jobGroup);
}
任务操作的业务接口实现类
@Service
public class JobServiceImpl implements JobService {
Logger log = LoggerFactory.getLogger(this.getClass());
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
/**
* 创建一个定时任务
*
* @param jobName
* @param jobGroup
*/
@Override
public void addCronJob(String jobName, String jobGroup) {
try {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
if (jobDetail != null) {
log.info("job:" + jobName + " 已存在");
}
else {
//构建job信息
jobDetail = JobBuilder.newJob(CronJob.class).withIdentity(jobName, jobGroup).build();
//用JopDataMap来传递数据
jobDetail.getJobDataMap().put("taskData", "hzb-cron-001");
//表达式调度构建器(即任务执行的时间,每5秒执行一次)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("*/5 * * * * ?");
//按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName + "_trigger", jobGroup + "_trigger")
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void addAsyncJob(String jobName, String jobGroup) {
try {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
if (jobDetail != null) {
log.info("job:" + jobName + " 已存在");
}
else {
//构建job信息,在用JobBuilder创建JobDetail的时候,有一个storeDurably()方法,可以在没有触发器指向任务的时候,将任务保存在队列中了。然后就能手动触发了
jobDetail = JobBuilder.newJob(AsyncJob.class).withIdentity(jobName, jobGroup).storeDurably().build();
jobDetail.getJobDataMap().put("asyncData", "this is a async task");
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName + "_trigger", jobGroup + "_trigger") //定义name/group
.startNow()//一旦加入scheduler,立即生效
.withSchedule(simpleSchedule())//使用SimpleTrigger
.build();
scheduler.scheduleJob(jobDetail, trigger);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void pauseJob(String jobName, String jobGroup) {
try {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobName + "_trigger", jobGroup + "_trigger");
scheduler.pauseTrigger(triggerKey);
log.info("=========================pause job:" + jobName + " success========================");
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* 恢复任务
*
* @param jobName
* @param jobGroup
*/
@Override
public void resumeJob(String jobName, String jobGroup) {
try {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobName + "_trigger", jobGroup + "_trigger");
scheduler.resumeTrigger(triggerKey);
log.info("=========================resume job:" + jobName + " success========================");
} catch (SchedulerException e) {
e.printStackTrace();
}
}
@Override
public void deleteJob(String jobName, String jobGroup) {
try {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
scheduler.deleteJob(jobKey);
log.info("=========================delete job:" + jobName + " success========================");
} catch (SchedulerException e) {
e.printStackTrace();
}
}
}
对外暴露接口
@RestController
@RequestMapping("/quartztest")
public class JobController {
@Autowired
private JobService jobService;
/**
* 创建cron任务
*
* @param jobName
* @param jobGroup
* @return
*/
@RequestMapping(value = "/cron", method = RequestMethod.POST)
public String startCronJob(@RequestParam("jobName") String jobName, @RequestParam("jobGroup") String jobGroup) {
jobService.addCronJob(jobName, jobGroup);
return "create cron task success";
}
/**
* 创建异步任务
*
* @param jobName
* @param jobGroup
* @return
*/
@RequestMapping(value = "/async", method = RequestMethod.POST)
public String startAsyncJob(@RequestParam("jobName") String jobName, @RequestParam("jobGroup") String jobGroup) {
jobService.addAsyncJob(jobName, jobGroup);
return "create async task success";
}
/**
* 暂停任务
*
* @param jobName
* @param jobGroup
* @return
*/
@RequestMapping(value = "/pause", method = RequestMethod.POST)
public String pauseJob(@RequestParam("jobName") String jobName, @RequestParam("jobGroup") String jobGroup) {
jobService.pauseJob(jobName, jobGroup);
return "pause job success";
}
/**
* 恢复任务
*
* @param jobName
* @param jobGroup
* @return
*/
@RequestMapping(value = "/resume", method = RequestMethod.POST)
public String resumeJob(@RequestParam("jobName") String jobName, @RequestParam("jobGroup") String jobGroup) {
jobService.resumeJob(jobName, jobGroup);
return "resume job success";
}
/**
* 删除任务
*
* @param jobName
* @param jobGroup
* @return
*/
@RequestMapping(value = "/delete", method = RequestMethod.PUT)
public String deleteJob(@RequestParam("jobName") String jobName, @RequestParam("jobGroup") String jobGroup) {
jobService.deleteJob(jobName, jobGroup);
return "delete job success";
}
}
-----------------相关好文--------------------
Quartz官方文档
Quartz框架集合-nice
任务调度框架Quartz集合-two-nic
项目重启quartz定时任务执行策略-misfire
6大分布式定时任务对比 就这?? 给你盘的明明白白
Quartz 源码解析(一) —— 基本介绍
Quartz 源码解析(二) —— Scheduler的初始化
Quartz 源码解析(三) —— JobDetail、Trigger和它们的Builder
Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
Quartz 源码解析(五) —— QuartzSchedulerThread
Quartz 源码解析(六) —— 解析Cron表达式