前言
最近需要用到多線程, 自己維護線程池很麻煩, 正好看到 springboot 內建線程池的例子, 這裡自己做了個嘗試和總結, 記錄一下, 也分享給需要的朋友;
不考慮事務的情況下, 這個多線程實作比較簡單, 主要有以下幾點:
在啟動類加上 @EnableAsync 注解, 開啟異步執行支援;
編寫線程池配置類, 别忘了 @Configuration , 和 @Bean 注解;
編寫需要異步執行的業務, 放到單獨的類中 (可以定義為 service, 因為需要 spring 管理起來才能用 );
在業務service中調用異步執行的service, 注意這是重點, 不能直接在業務 service 中寫異步執行的代碼, 否則無法異步執行( 這就是單獨放異步代碼的原因);
使用步驟
先建立一個線程池的配置,讓Spring Boot加載,用來定義如何建立一個ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync這兩個注解,表示這是個配置類,并且是線程池的配置類
@Configuration
@EnableAsync
public class ExecutorConfig {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線程數
executor.setCorePoolSize(corePoolSize);
//配置最大線程數
executor.setMaxPoolSize(maxPoolSize);
//配置隊列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱字首
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當pool已經達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執行初始化
executor.initialize();
return executor;
}
}
@Value是我配置在application.properties,可以參考配置,自由定義
# 異步線程配置
# 配置核心線程數
async.executor.thread.core_pool_size = 5
# 配置最大線程數
async.executor.thread.max_pool_size = 5
# 配置隊列大小
async.executor.thread.queue_capacity = 99999
# 配置線程池中的線程的名稱字首
async.executor.thread.name.prefix = async-service-
建立一個Service接口,是異步線程的接口
public interface AsyncService {
/**
* 執行異步任務
* 可以根據需求,自己加參數拟定,我這裡就做個測試示範
*/
void executeAsync();
}
實作類
@Service
public class AsyncServiceImpl implements AsyncService {
private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
@Override
@Async("asyncServiceExecutor")
public void executeAsync() {
logger.info("start executeAsync");
System.out.println("異步線程要做的事情");
System.out.println("可以在這裡執行批量插入等耗時的事情");
logger.info("end executeAsync");
}
}
将Service層的服務異步化,在executeAsync()方法上增加注解@Async(“asyncServiceExecutor”),asyncServiceExecutor方法是前面ExecutorConfig.java中的方法名,表明executeAsync方法進入的線程池是asyncServiceExecutor方法建立的。
接下來就是在Controller裡或者是哪裡通過注解@Autowired注入這個Service
@Autowired
private AsyncService asyncService;
@GetMapping("/async")
public void async(){
asyncService.executeAsync();
}
用postmain或者其他工具來多次測試請求一下
通過以上日志可以發現,[async-service-]是有多個線程的,顯然已經在我們配置的線程池中執行了,并且每次請求中,controller的起始和結束日志都是連續列印的,表明每次請求都快速響應了,而耗時的操作都留給線程池中的線程去異步執行;
雖然我們已經用上了線程池,但是還不清楚線程池當時的情況,有多少線程在執行,多少在隊列中等待呢?這裡我建立了一個ThreadPoolTaskExecutor的子類,在每次送出線程的時候都會将目前線程池的運作狀況列印出來
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
如上所示,showThreadPoolInfo方法中将任務總數、已完成數、活躍線程數,隊列大小都列印出來了,然後Override了父類的execute、submit等方法,在裡面調用showThreadPoolInfo方法,這樣每次有任務被送出到線程池的時候,都會将目前線程池的基本情況列印到日志中;
修改ExecutorConfig.java的asyncServiceExecutor方法,将ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改為ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
logger.info("start asyncServiceExecutor");
//在這裡修改
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數
executor.setCorePoolSize(corePoolSize);
//配置最大線程數
executor.setMaxPoolSize(maxPoolSize);
//配置隊列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱字首
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當pool已經達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執行初始化
executor.initialize();
return executor;
}
再次啟動該工程測試
送出任務到線程池的時候,調用的是submit(Callable task)這個方法,目前已經送出了3個任務,完成了3個,目前有0個線程在處理任務,還剩0個任務在隊列中等待,線程池的基本情況一路了然;
@Async 多線程擷取傳回值
// 異步執行代碼
@Service("asyncExecutorTest")
public class AsyncExecutorTest {
// 異步執行的方法, 注解内為自定義線程池類名
@Async("asyncServiceExecutor")
public Future<Integer> test1(Integer i) throws InterruptedException {
Thread.sleep(100);
System.out.println("@Async 執行: " + i);
return new AsyncResult(i);
}
// 這裡使用其它方式調用,詳見後面的 service3 方法
public Integer test2(Integer i) throws InterruptedException {
Thread.sleep(100);
System.out.println(" excute.run 執行: " + i);
return i;
}
}
// 業務 service
@Service("asyncExcutorService")
public class AsyncExcutorService {
@Autowired
AsyncExecutorTest asyncExecutorTest;
@Autowired
Executor localBootAsyncExecutor;
// 測試 無傳回值異步執行
public void service1(){
System.out.println("service1 執行----->");
for (int i = 0; i < 50; i++) {
try {
asyncExecutorTest.test1(i);
} catch (InterruptedException e) {
System.out.println("service1執行出錯");
}
}
System.out.println("service1 結束----->");
}
// 測試 有傳回值異步執行
public void service2(){
long l = System.currentTimeMillis();
System.out.println("service2 執行----->");
List<Future> result = new ArrayList<>();
try {
for (int i = 0; i < 300; i++) {
Future<Integer> integerFuture = asyncExecutorTest.test1(i);
result.add(integerFuture);
}
for (Future future : result) {
System.out.println(future.get());
}
} catch (InterruptedException | ExecutionException e) {
System.out.println("service2執行出錯");
}
System.out.println("service2 結束----->" + (System.currentTimeMillis() - l));
}
// 測試 有傳回值異步執行
public void service3(){
long l = System.currentTimeMillis();
List<Integer> result = new ArrayList<>();
try {
System.out.println("service3 執行----->");
int total = 300;
CountDownLatch latch = new CountDownLatch(total);
for (int i = 0; i < total; i++) {
final int y = i;
localBootAsyncExecutor.execute(() -> {
try {
result.add(asyncExecutorTest.test2(y));
} catch (InterruptedException e) {
System.out.println("service3執行出錯");
} finally {
latch.countDown();
}
});
}
latch.await();
} catch (InterruptedException e) {
System.out.println("service3執行出錯");
}
System.out.println("service3 結束----->" + (System.currentTimeMillis() - l));
}
}
這裡說下 service1 和 service2 的差別:
- 兩個都用的是一個線程池執行的
- service1 單純執行業務, 不用傳回資料, 主線程也不用等待
- service2 需要傳回資料, 主線程需要等待結果( 注意傳回值隻能是 Future, 最後再 .get()去擷取, 否則無法異步執行)
- service3 也可以傳回資料, 但是書寫上麻煩一些. 傳回值直接是想要的結果, 不像 service2 還需要提取一次資料.