天天看點

Spring定時任務并行(異步)處理

最近項目中遇到一個問題 , 在SpringBoot中設定了定時任務之後 , 在某個點總是沒有執行 . 經過搜尋研究發現 , spring 定時器任務scheduled-tasks預設配置是單線程串行執行的 . 即在目前時間點之内 . 如果同時有兩個定時任務需要執行的時候 , 排在第二個的任務就必須等待第一個任務執行完畢執行才能正常運作.如果第一個任務耗時較久的話 , 就會造成第二個任務不能及時執行 . 這樣就可能由于時效性造成其他問題 . 而在實際項目中 , 我們也往往需要這些定時任務是"各幹各的" , 而不是排隊執行. 

以下為預設串行的定時任務代碼

package com.xbz.timerTask.task;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @title 測試spring定時任務執行
 * @createDate 2017年8月18日
 * @version 1.0
 */
@Component
@Configuration
@EnableScheduling
public class MyTestTask {
    private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Scheduled(fixedDelay = 1000)
    public void executeUpdateYqTask() {
        System.out.println(Thread.currentThread().getName() + " >>> task one " + format.format(new Date()));
    }

    @Scheduled(fixedDelay = 1000)
    public void executeRepaymentTask() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + " >>> task two " + format.format(new Date()));
        Thread.sleep(5000);
    }
}
           

啟動項目之後 , 發現控制台輸出如下 : 

Spring定時任務并行(異步)處理

可以發現 , 一直是pool-5-thread-1一個線程在執行定時任務 , 這顯然不符合我們的業務需求.

如何把定時任務改造成異步呢 , 在spring中網上文檔較多 , 不再叙述 . 但在SpringBoot找到的相關資料也是建立xml檔案的方式配置 , 實際上這就違背了SpringBoot減少配置檔案的初衷 . 

在SpringBoot可以自定義以下線程池配置 : 

package com.xbz.config;

@Configuration
@EnableScheduling
public class ScheduleConfig implements SchedulingConfigurer, AsyncConfigurer{

	/** 異步處理 */
	public void configureTasks(ScheduledTaskRegistrar taskRegistrar){
		TaskScheduler taskScheduler = taskScheduler();
		taskRegistrar.setTaskScheduler(taskScheduler);
	}

	/** 定時任務多線程處理 */
	@Bean(destroyMethod = "shutdown")
	public ThreadPoolTaskScheduler taskScheduler(){
		ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
		scheduler.setPoolSize(20);
		scheduler.setThreadNamePrefix("task-");
		scheduler.setAwaitTerminationSeconds(60);
		scheduler.setWaitForTasksToCompleteOnShutdown(true);
		return scheduler;
	}

	/** 異步處理 */
	public Executor getAsyncExecutor(){
		Executor executor = taskScheduler();
		return executor;
	}

	/** 異步處理 異常 */
	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
		return new SimpleAsyncUncaughtExceptionHandler();
	}
}
           

此時再啟動定時任務 , 就發現已經是異步處理的了 . 

Spring定時任務并行(異步)處理

如果項目中同時配置了異步任務的線程池和定時任務的異步線程處理 , 配置類如下 : 

package com.xbz.config;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

/**
 * @title 使用自定義的線程池執行異步任務 , 并設定定時任務的異步處理 
 * @version 1.0
 */
@Configuration
@EnableAsync
@EnableScheduling
public class ExecutorConfig implements SchedulingConfigurer, AsyncConfigurer {

    private static final Logger LOG = LogManager.getLogger(ExecutorConfig.class.getName());

    @Autowired
    private TaskThreadPoolConfig config;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(config.getCorePoolSize());
        executor.setMaxPoolSize(config.getMaxPoolSize());
        executor.setQueueCapacity(config.getQueueCapacity());
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        executor.setThreadNamePrefix("taskExecutor-");

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新線程中執行任務,而是由調用者所在的線程來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /**
     * @title 異步任務中異常處理 
     * @description 
     * @author Xingbz
     * @createDate 2017年9月11日
     * @return
     * @see org.springframework.scheduling.annotation.AsyncConfigurer#getAsyncUncaughtExceptionHandler()
     * @version 1.0
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                LOG.error("==========================" + ex.getMessage() + "=======================", ex);
                LOG.error("exception method:" + method.getName());
            }
        };
    }

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        TaskScheduler taskScheduler = taskScheduler();
        taskRegistrar.setTaskScheduler(taskScheduler);
    }
    
    /**
     * 并行任務使用政策:多線程處理
     * 
     * @return ThreadPoolTaskScheduler 線程池
     */
     @Bean(destroyMethod = "shutdown")
     public ThreadPoolTaskScheduler taskScheduler() {
         ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
         scheduler.setPoolSize(config.getCorePoolSize());
         scheduler.setThreadNamePrefix("task-");
         scheduler.setAwaitTerminationSeconds(60);
         scheduler.setWaitForTasksToCompleteOnShutdown(true);
         return scheduler;
     }
}
           

需要注意 , 這兩個配置類隻能同時配置一個 , 如果配置了第二個 , 則第一個就無需再用.