Spring 通過任務執行器(TaskExecutor)來實作多線程和并發程式設計。使用ThreadPoolTaskExecutor可以實作一個基于線程池的TaskExecutor。而實際上開發中的任務一般是非阻塞的,即異步的,是以我們要在配置類中通過@EnableAsync注解來開啟對異步任務的支援,并通過在實際執行的Bean的方法上使用@Async注解來聲明一個異步任務。
配置類:
package com.chenfeng.xiaolyuh.thread.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration // 聲明目前類是一個配置類,相當于Spring配置的XML檔案
@ComponentScan(basePackages={"com.chenfeng.xiaolyuh.thread"})
@EnableAsync// 利用@EnableAsync注解開啟異步任務的支援
// 配置類實作AsyncConfigurer接口并重寫getAsyncExecutor方法,并傳回ThreadPoolTaskExecutor,這樣我們就獲得了一個基于線程池TaskExecutor
public class ThreadConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 核心線程數
taskExecutor.setCorePoolSize(5);
// 最大線程數
taskExecutor.setMaxPoolSize(50);
// 隊列最大長度
taskExecutor.setQueueCapacity(1000);
// 線程池維護線程所允許的空閑時間(機關秒)
taskExecutor.setKeepAliveSeconds(120);
// 線程池對拒絕任務(無線程可用)的處理政策 ThreadPoolExecutor.CallerRunsPolicy政策 ,調用者的線程會執行該任務,如果執行器已關閉,則丢棄.
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
taskExecutor.initialize();
return taskExecutor;
}
@Bean
public Executor getThreadPool() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(1000);
taskExecutor.setQueueCapacity(1000);
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
任務執行類:
package com.chenfeng.xiaolyuh.thread.service;
import java.util.concurrent.Executor;
import javax.annotation.Resource;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 多線程的測試類
* @ClassName DemoELService
* @author yuhao.wang
* @Date 2017年3月10日 下午3:41:18
* @version 1.0.0
*/
@Service
public class DemoThreadService {
@Resource(name = "getThreadPool")
private Executor executor;
@Async
// 通過@Async注解方法表名這個方法是一個異步方法,如果注解在類級别,則表名該類的所有方法都是異步的,
// 而這裡的方法自動被注入使用ThreadPoolTaskExecutor作為TaskExecutor
public void executeAsyncTask(Integer i) {
System.out.println("執行異步任務:" + i);
}
public void executeAsyncTaskPlus(Integer i) {
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("執行異步任務+1:" + i);
}
});
}
}
package com.chenfeng.xiaolyuh.test;
import org.junit.After;
import org.junit.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import com.chenfeng.xiaolyuh.thread.config.ThreadConfig;
import com.chenfeng.xiaolyuh.thread.service.DemoThreadService;
/**
* Created by yuhao.wang on 2017/3/9.
*/
public class SpringThreadTest {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ThreadConfig.class);
@Test
public void contextTest() {
DemoThreadService demoThreadService = context.getBean(DemoThreadService.class);
for (int i = 0; i < 1000; i++) {
demoThreadService.executeAsyncTaskPlus(i);
demoThreadService.executeAsyncTask(i);
}
}
@After
public void closeContext() {
context.close();
}
}