天天看点

Quartz + Redis实现集群定时任务

在服务集群中,定时任务主要需要考虑两个问题:

1.是否每个节点都需要执行定时任务?

2.主节点定时任务如何保障高可用?

如果在定时任务中操作数据,多个节点运行可能会出现数据不一致问题

场景如下:

存在一个定时任务,每分钟从数据库获取数据进行处理,并推送到第三方系统,处理完后删除数据库中的数据。

当A节点查询到数据,并且处理未完成,同时节点B也查询到了数据,开始进行处理。这种情况下第三方系统最终会同时接收到A节点和B节点推送过来的数据。这无疑是不可接受的。

因此对于有数据一致性要求的任务,最好通过单节点执行,通过故障转移实现高可用,这是最简单的。多节点运行既可能出现脏数据,也耗费了更多不必要的资源。

接下来分享下实现:

Quartz是一个调度任务框架,可以很好地与springboot结合使用,只需在项目中添加依赖,并实现Job、Trigger即可,没有网友说的那么复杂。当然Quartz本身提供了集群定时任务解决方案,但官方提供的方案是基于数据库实现的,对于应用方来说可能不希望数据库被业务以外的表浸入,因此我考虑了通过Quartz + Redis的方式实现集群定时任务单节点运行。

Quartz Maven依赖(版本由具体的spring-boot-starter-parent确定):

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
           

新增配置类:

@Configuration
public class SchedulerConfig {

    /**
    * 定义任务
    */
	@Bean
	public JobDetail testJob() {
		JobDataMap data = new JobDataMap(); //通过JobDataMap向任务中传递参数
		return JobBuilder.newJob(TestJob.class).withIdentity("TestJob").usingJobData(data).storeDurably()
				.build();
	}

    /**
    * 定义触发器
    */
	@Bean
	public Trigger syncPwdTrigger(@Qualifier("testJob") JobDetail job) {
		ScheduleBuilder<?> csb = getScheduleBuilderWithMillisecond(5000); // 每5秒运行一次

		return TriggerBuilder.newTrigger().forJob(job).withIdentity("TestTrigger").withSchedule(csb).build();
	}

	private ScheduleBuilder<?> getScheduleBuilderWithMillisecond(long milliSecond) {
		SimpleScheduleBuilder ssb = SimpleScheduleBuilder.simpleSchedule().withIntervalInMilliseconds(milliSecond).repeatForever();
		return ssb.repeatForever();
	}
}
           

新增任务类,处理具体业务逻辑:

public class TestJob extends QuartzJobBean {
	
	@Override
	protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
		System.out.println("test");
	}
}
           

至此已经实现了定时任务,服务启动即会开始运行定时任务。接下来实现如何在集群中,保证仅一个节点在运行定时任务,且可实现故障转移。

思路如下:

1.每个节点均运行调度任务,但仅让其中一个节点(master)执行业务逻辑,其他节点仅用作故障转移。

2.往redis中存一个值,这个值可以标记服务的唯一性,定义为serverId(serverId = ip + port + uuid,ip、port是为了监控程序可通过redis寻找当前的master位置,uuid在启动时动态产生,预防多个内网部署ip port重复)。

3.当触发器触发了任务,任务先通过redis判断是否存在master节点,存在则与serverId比较当前节点是否master,不存在则让当前节点成为master,只有master节点方可执行后续业务逻辑(考虑到原子性,判断是否存在master以及成为master需要在同一事务中,通过redis执行lua脚本实现)。

4.设置在redis中存储的serverId会超时过期,但是master节点可以续期,当master发生故障不再续期,serverId过期后其他节点就会成为master,继续执行定时任务(redis过期时间需要大于调度周期,如定时任务每5秒执行一次,则redis过期时间需要大于5秒,否则每一轮调度都是不同的节点在执行。同时由于过期时间大于调度周期,当master节点发生故障时,redis中的serverId还未过期,会错过一轮调度,对于严格要求调度周期的任务,需要考虑其他辅助方案)。

实现如下:

lua脚本:

local key = KEYS[1]
local val = redis.call("GET", key)
if val == ARGV[1] or not(val)
then
    return redis.call("SETEX", KEYS[1], ARGV[2], ARGV[1])
else
    return "FAIL"
end
           

java中redis执行lua:

public String exeLua(String script, List<String> keys, Object... valus) throws Exception {
	RedisScript<String> redisScript = new DefaultRedisScript<String>(script, String.class);
	return redisTemplate.execute(redisScript, keys, valus);
}
           

TestJob完整的代码:

public class TestJob extends QuartzJobBean {
	
	@Override
	protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
	    JobDetailImpl job = (JobDetailImpl) context.getJobDetail();
	    if (tryPromotionMaster()) {
	        System.out.println("test");
	    }
	}

    private boolean tryPromotionMaster(JobDetailImpl job) {
   		JobDataMap dataMap = job.getJobDataMap();
		RedisTemplate redisTemplate = (RedisUtil) dataMap.get("redisTemplate");
		String runningServerId = dataMap.getString("runningServerId");
		String timeout = 6; // redis6秒过期
		String script = "local key = KEYS[1] local val = redis.call(\"GET\", key) if val == ARGV[1] or not(val) then return redis.call(\"SETEX\", KEYS[1], ARGV[2], ARGV[1]) else return \"FAIL\" end ";
		String key = MessageFormat.format("test_scheduler_task_{0}", job.getFullName());
		
		String result = null;
		try {
		    RedisScript<String> redisScript = new DefaultRedisScript<String>(script, String.class);
			result = redisTemplate.execute(redisScript, Arrays.asList(key), runningServerId, timeout);
		} catch (Exception e) {
			LOG.error("", e); 
		}
		if (result != null && "OK".equalsIgnoreCase(result.toString())) {
			return true;
		}
		
		return false;
	}
}