Quartz是实现定时任务的利器,Quartz主要有四个组成部分,分别是:
1. Job(任务):包含具体的任务逻辑;
2. JobDetail(任务详情):是对Job的一种详情描述;
3. Trigger(触发器):负责管理触发JobDetail的机制;
4. Scheduler(调度器):负责Job的执行。
有两种方式可以实现Spring Boot与Quartz的整合:
一、使用Spring提供的工厂类
spring-context.jar和spring-context-support.jar类库提供了一些org.quartz包的扩展,使用这些扩展并通过注入bean的方式可以实现与Quartz的整合。
1. pom.xml
添加依赖:
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
Spring Boot 2.0.4指定的quartz.jar的版本是2.3.0
2. Job
首先定义一个执行器:
import java.util.Date;
import devutility.internal.text.format.DateFormatUtils;
public class Executor {
public static void execute(String name, long costMillis) {
Date startDate = new Date();
Date endDate = new Date(startDate.getTime() + costMillis);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(String.format("[%s]: ", DateFormatUtils.format(startDate, "yyyy-MM-dd HH:mm:ss:SSS")));
stringBuilder.append(String.format("%s executing on %s, ", name, Thread.currentThread().getName()));
stringBuilder.append(String.format("will finish at %s.", DateFormatUtils.format(endDate, "yyyy-MM-dd HH:mm:ss:SSS")));
System.out.println(stringBuilder.toString());
try {
Thread.sleep(costMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
定义Job:
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import devutility.test.app.quartz.common.Executor;
@Component
@EnableScheduling
public class SpringJobs {
public void job1() {
Executor.execute("Job1", 5000);
}
public void job2() {
Executor.execute("Job2", 6000);
}
}
@EnableScheduling注解是必须要有的。
3. JobDetail
1 package devutility.test.app.quartz.config;
2
3 import org.quartz.Trigger;
4 import org.springframework.beans.factory.annotation.Qualifier;
5 import org.springframework.context.annotation.Bean;
6 import org.springframework.context.annotation.Configuration;
7 import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
8 import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;
9 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
10
11 import devutility.test.app.quartz.jobs.SpringJobs;
12
13 @Configuration
14 public class SpringJobConfiguration {
15 @Bean
16 public MethodInvokingJobDetailFactoryBean jobDetailFactory1(SpringJobs springJobs) {
17 MethodInvokingJobDetailFactoryBean jobDetailFactory = new MethodInvokingJobDetailFactoryBean();
18 jobDetailFactory.setName("Spring-Job1");
19 jobDetailFactory.setGroup("Spring-Group");
20
21 jobDetailFactory.setTargetObject(springJobs);
22 jobDetailFactory.setTargetMethod("job1");
23
24 jobDetailFactory.setConcurrent(false);
25 return jobDetailFactory;
26 }
27
28 @Bean
29 public MethodInvokingJobDetailFactoryBean jobDetailFactory2(SpringJobs springJobs) {
30 MethodInvokingJobDetailFactoryBean jobDetailFactory = new MethodInvokingJobDetailFactoryBean();
31 jobDetailFactory.setName("Spring-Job2");
32 jobDetailFactory.setGroup("Spring-Group");
33
34 jobDetailFactory.setTargetObject(springJobs);
35 jobDetailFactory.setTargetMethod("job2");
36
37 jobDetailFactory.setConcurrent(true);
38 return jobDetailFactory;
39 }
MethodInvokingJobDetailFactoryBean是来自spring-context-support.jar的一个工厂类,它实现了FactoryBean<JobDetail>接口,完成了对具体job的封装。
21行指定具体的任务是我们在2中定义的SpringJobs,22行指定我们使用SpringJobs中的job1方法作为定时任务的具体业务实现。
注意24和37行,如果一个任务每隔5秒执行一次,但是每次需要执行10秒,那么它有两种执行方式,串行(执行完上一个再开启下一个)和并行(间隔时间到了就开始并行执行下一个),24和37就分别设置成了并行和串行执行。
4. Trigger
Quartz支持多种trigger,其中配置最为灵活的一种当属CronTrigger,它属于表达式类型的配置方式,有关cron表达式的配置请参考
1 package devutility.test.app.quartz.config;
2
3 import org.quartz.Trigger;
4 import org.springframework.beans.factory.annotation.Qualifier;
5 import org.springframework.context.annotation.Bean;
6 import org.springframework.context.annotation.Configuration;
7 import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
8 import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;
9 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
10
11 import devutility.test.app.quartz.jobs.SpringJobs;
12
13 @Configuration
14 public class SpringJobConfiguration {
15 @Bean
16 public CronTriggerFactoryBean cronTriggerFactory1(@Qualifier("jobDetailFactory1") MethodInvokingJobDetailFactoryBean jobDetailFactory1) {
17 CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
18 cronTriggerFactoryBean.setName("cronTriggerFactoryForJobDetailFactory1");
19 cronTriggerFactoryBean.setJobDetail(jobDetailFactory1.getObject());
20 cronTriggerFactoryBean.setCronExpression("0/3 * * * * ?");
21 return cronTriggerFactoryBean;
22 }
23
24 @Bean
25 public CronTriggerFactoryBean cronTriggerFactory2(@Qualifier("jobDetailFactory2") MethodInvokingJobDetailFactoryBean jobDetailFactory2) {
26 CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
27 cronTriggerFactoryBean.setName("cronTriggerFactoryForJobDetailFactory2");
28 cronTriggerFactoryBean.setJobDetail(jobDetailFactory2.getObject());
29 cronTriggerFactoryBean.setCronExpression("0/4 * * * * ?");
30 return cronTriggerFactoryBean;
31 }
本例使用的Cron表达式非常简单,分别是每隔3秒和每隔4秒执行一次。
5. Scheduler
1 @Bean
2 public SchedulerFactoryBean schedulerFactory1(Trigger cronTriggerFactory1, Trigger cronTriggerFactory2) {
3 SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
4 schedulerFactoryBean.setStartupDelay(2);
5 schedulerFactoryBean.setTriggers(cronTriggerFactory1, cronTriggerFactory2);
6 return schedulerFactoryBean;
7 }
行4指明系统启动之后需要延迟2秒执行;
行5用于注册需要执行的触发器;
SchedulerFactoryBean还有一个叫autoStartup的属性,用于指明任务在系统启动之后是否立即执行,默认是true。
6. 测试
由此可见,Job1是按照我们的预期串行执行的。
Job2则是并行执行的。
二、使用org.quartz原生类和方法
1. pom.xml,只需要添加quartz
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
2. 定义一个Scheduler类型的Bean
package devutility.test.app.quartz.config;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QuartzConfiguration {
@Bean
public Scheduler scheduler() throws SchedulerException {
return StdSchedulerFactory.getDefaultScheduler();
}
}
3. Job
package devutility.test.app.quartz.jobs;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import devutility.test.app.quartz.common.Executor;
@DisallowConcurrentExecution
public class ScheduleJob1 implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
Executor.execute("ScheduleJob1", 5000);
JobDetail jobDetail = context.getJobDetail();
System.out.println(String.format("key: %s", jobDetail.getKey()));
}
}
这种方式每一个Job都需要定义一个类实现org.quartz.Job接口,形式上要比第一种方式更条理。
3. 我们定义一个创建任意Job的公共方法,来实现Job类的定时执行:
1 package devutility.test.app.quartz.services;
2
3 import org.quartz.CronScheduleBuilder;
4 import org.quartz.CronTrigger;
5 import org.quartz.Job;
6 import org.quartz.JobBuilder;
7 import org.quartz.JobDetail;
8 import org.quartz.JobKey;
9 import org.quartz.Scheduler;
10 import org.quartz.SchedulerException;
11 import org.quartz.TriggerBuilder;
12 import org.quartz.TriggerKey;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.stereotype.Service;
15
16 import devutility.internal.models.OperationResult;
17
18 @Service
19 public class JobServiceImpl implements JobService {
20 @Autowired
21 private Scheduler schedulerFactory1;
22
23 @Autowired
24 private Scheduler scheduler;
25
26 @Override
27 public void start(String name, String group, String cronExpression, Class<? extends Job> clazz) throws SchedulerException {
28 JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(name, group).build();
29
30 String triggerName = String.format("trigger_%s", name);
31 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
32 CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerName, group).withSchedule(scheduleBuilder).build();
33 scheduler.scheduleJob(jobDetail, trigger);
34
35 if (!scheduler.isStarted()) {
36 scheduler.start();
37 }
38 }
28行创建一个新的JobDetail,并将Job实现类的Class对象赋值给它;
32行创建了一个新的Trigger;
33-37行使用注入进来的scheduler来运行一个定时任务。
三、对Job的CRUD操作
创建和运行Job上面已经讲过了,下面说一下Job的暂停、中断、删除和更新操作。
1. 暂停
scheduler有一个方法public void pauseJob(JobKey jobKey),该方法通过暂停trigger的触发来实现暂停job的功能,调用该方法之后正在运行的job会持续运行到业务逻辑处理完毕,等下一个触发条件满足时不再开启新的job。
public OperationResult pause(String group, String name) {
OperationResult result = new OperationResult();
JobKey jobKey = JobKey.jobKey(name, group);
try {
scheduler.pauseJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
result.setErrorMessage(String.format("Pause Job with name = \"%s\" group = \"%s\" failed, system error!", name, group));
}
return result;
}
2. 中断
针对需要中断的Job,quartz专门为其定义了一个接口org.quartz.InterruptableJob:
public interface InterruptableJob extends Job {
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Interface.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
* <p>
* Called by the <code>{@link Scheduler}</code> when a user
* interrupts the <code>Job</code>.
* </p>
*
* @throws UnableToInterruptJobException
* if there is an exception while interrupting the job.
*/
void interrupt()
throws UnableToInterruptJobException;
}
所有需要支持中断的Job都需要实现这个接口:
package devutility.test.app.quartz.jobs;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.UnableToInterruptJobException;
import devutility.test.app.quartz.common.Executor;
@DisallowConcurrentExecution
public class ScheduleJob2 implements InterruptableJob {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
Executor.execute(context, 5000);
}
@Override
public void interrupt() throws UnableToInterruptJobException {
System.out.println("ScheduleJob2 is interrupting now。。。");
}
}
然后通过调用Scheduler实例的boolean interrupt(JobKey jobKey)方法,查看StdScheduler的源码,我们发现Scheduler的interrupt方法仅仅是调用了InterruptableJob中的interrupt()方法实现,然后设置了一下自己的interrupted属性就完了,并未做任何其他操作。
所以,如果要实现可以中断的Job,我们需要在InterruptableJob实现类中增加中断的逻辑:
package devutility.test.app.quartz.jobs;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.UnableToInterruptJobException;
import devutility.test.app.quartz.common.Executor;
public class ScheduleJob3 implements InterruptableJob {
private boolean interrupted = false;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
Executor.execute("ScheduleJob3 sub-task 1", 1000);
if (interrupted) {
return;
}
Executor.execute("ScheduleJob3 sub-task 2", 1000);
if (interrupted) {
return;
}
Executor.execute("ScheduleJob3 sub-task 3", 1000);
}
@Override
public void interrupt() throws UnableToInterruptJobException {
interrupted = true;
}
}
3. 删除
public OperationResult delete(String jobName, String jobGroup) {
OperationResult result = new OperationResult();
result.append(String.format("Removing Quartz job: %s", jobName));
try {
if (!schedulerFactory1.deleteJob(JobKey.jobKey(jobName, jobGroup))) {
result.setErrorMessage(String.format("Removing job %s failed!", jobName));
}
} catch (SchedulerException e) {
e.printStackTrace();
result.setErrorMessage(String.format("Removing job %s failed with error!", jobName));
}
return result;
}
4. 更新
public OperationResult update(CronTrigger cronTrigger, String cronExpression) {
OperationResult result = new OperationResult();
TriggerKey triggerKey = cronTrigger.getKey();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTrigger newTrigger = cronTrigger.getTriggerBuilder().withSchedule(scheduleBuilder).build();
try {
schedulerFactory1.rescheduleJob(triggerKey, newTrigger);
result.append(String.format("Update cron trigger %s succeeded!", triggerKey.getName()));
} catch (SchedulerException e) {
e.printStackTrace();
result.setErrorMessage(String.format("Update cron trigger %s failed!", triggerKey.getName()));
}
return result;
}
Demo代码