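1. Create your own thread pools
The beans taskExecutor1, taskExecutor2 and taskExecutor3 use ThreadPoolTaskExecutor to define thread pools for asynchronous work. ThreadPoolTaskExecutor is Spring's wrapper around ThreadPoolExecutor. You can also define a pool directly with ThreadPoolExecutor from the JUC package (java.util.concurrent), as taskExecutor4 and taskExecutor5 do; the bean method only needs to return an Executor or a ThreadPoolExecutor. An asynchronous task can then select a pool by specifying the name of the corresponding bean. It is advisable to give the threads of every pool a name prefix, which makes troubleshooting easier.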
package com.cxf.cxfasynctask.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

@Configuration
public class ThreadPoolConfig {

    @Bean("taskExecutor1")
    public Executor taskExecutor1() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Configure the thread pool parameters
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("taskExecutor1-");
        // On shutdown, wait for submitted tasks to finish before other beans are destroyed,
        // so these async tasks are torn down before resources such as the Redis pool
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // Upper bound on that wait; after this many seconds shutdown continues anyway,
        // so the application can always exit instead of blocking
        taskExecutor.setAwaitTerminationSeconds(60);
        // Change the rejection policy to run the rejected task on the calling thread
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the thread pool
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean("taskExecutor2")
    public Executor taskExecutor2() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Configure the thread pool parameters
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("taskExecutor2-");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        // Change the rejection policy to run the rejected task on the calling thread
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the thread pool
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean("taskExecutor3")
    public Executor taskExecutor3() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Deliberately tiny pool (2 threads, queue of 1) so that saturation is easy to trigger
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setMaxPoolSize(2);
        taskExecutor.setQueueCapacity(1);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("taskExecutor3-");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        // Rejection policy left at the default (AbortPolicy); uncomment to run rejected tasks on the calling thread
        //taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the thread pool
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean("taskExecutor4")
    public Executor taskExecutor4() {
        ThreadFactory springThreadFactory =
                new CustomizableThreadFactory("taskExecutor4-");
        // Plain JUC pool: core 2, max 2, keep-alive 60s, bounded queue of 1, default AbortPolicy
        ThreadPoolExecutor taskExecutor = new ThreadPoolExecutor(
                2,
                2,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                springThreadFactory
        );
        return taskExecutor;
    }

    @Bean("taskExecutor5")
    public ThreadPoolExecutor taskExecutor5() {
        // Prefix matches the bean name so log lines identify the right pool
        ThreadFactory springThreadFactory =
                new CustomizableThreadFactory("taskExecutor5-");
        ThreadPoolExecutor taskExecutor = new ThreadPoolExecutor(
                2,
                2,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                springThreadFactory
        );
        return taskExecutor;
    }
}
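
To see why the thread name prefix helps when troubleshooting, here is a minimal sketch in plain JDK code, without Spring. The class NamedPoolDemo and the helper prefixed are hypothetical names introduced only for this illustration; the lambda-based factory is a stand-in for a prefixing thread factory, not Spring's own implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original project.
class NamedPoolDemo {

    // Plain-JDK stand-in for a prefixing thread factory: names are prefix + running counter.
    static ThreadFactory prefixed(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> new Thread(runnable, prefix + counter.getAndIncrement());
    }

    // Runs one task on a freshly created named pool and returns the worker thread's name.
    static String runAndGetThreadName(String prefix) {
        ExecutorService pool = Executors.newFixedThreadPool(2, prefixed(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The first worker created by the factory gets counter value 1.
        System.out.println(runAndGetThreadName("taskExecutor4-")); // prints taskExecutor4-1
    }
}
```

Because the name travels with the thread, any log line or thread dump produced by the task shows which pool it ran on.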
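2. Create the asynchronous methods
Create a Service and annotate its methods with @Async("taskExecutor1") to run them on a pool you defined; taskExecutor1 is the bean name of a pool from the previous step. An async method can return either void or Future<T>, so its body plays the role of a Runnable or a Callable respectively.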
package com.cxf.cxfasynctask.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;

@Service
public class AsyncService {

    private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);

    // Fire-and-forget task on taskExecutor1
    @Async("taskExecutor1")
    public void dealTask1() {
        logger.info("dealTask1... start");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
        logger.info("dealTask1... end");
    }

    // Task with a result on taskExecutor2; the caller receives a Future
    @Async("taskExecutor2")
    public Future<String> dealTask2() {
        logger.info("dealTask2... start");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        logger.info("dealTask2... end");
        return new AsyncResult<>("dealTask2");
    }

    @Async("taskExecutor3")
    public void dealTask3(int i) {
        logger.info("dealTask3... start ..." + i);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        logger.info("dealTask3... end..." + i);
    }

    @Async("taskExecutor4")
    public void dealTask4(int i) {
        logger.info("dealTask4... start ..." + i);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        logger.info("dealTask4... end..." + i);
    }

    // Not annotated with @Async: runs on whichever thread calls it
    public void dealTask5(int i) {
        logger.info("dealTask5... start ..." + i);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        logger.info("dealTask5... end..." + i);
    }
}
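
The correspondence between the two return types and Runnable/Callable can be sketched with plain JDK executors. This is only an analogy under stated assumptions, not how Spring implements @Async internally; the class RunnableVsCallableDemo and its method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the original project.
class RunnableVsCallableDemo {

    // Analogue of a void async method: a Runnable that is executed with no result handed back.
    static boolean fireAndForget() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean ran = new AtomicBoolean(false);
        Runnable task = () -> ran.set(true);
        pool.execute(task);
        pool.shutdown(); // already submitted tasks still run after shutdown()
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    // Analogue of an async method returning Future<T>: a Callable whose result is read from the Future.
    static String withResult() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> "dealTask2";
            Future<String> future = pool.submit(task);
            return future.get(5, TimeUnit.SECONDS); // blocks until the task has finished
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fireAndForget()); // prints true
        System.out.println(withResult());    // prints dealTask2
    }
}
```

Note that calling get() on the Future blocks the caller until the task completes, so the caller only gains concurrency if it does other work before asking for the result.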
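3. Call the methods from a controller
There are two ways to run the asynchronous logic from the controller. One is to inject the async service and call the @Async methods defined in the previous step. The other is to inject a thread pool from step 1 directly and submit an anonymous Runnable or Callable that calls a method not annotated with @Async (here dealTask5, run on taskExecutor4). You can also wrap a pool from step 1 in a component of your own and use that, as shown in step 4.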
package com.cxf.cxfasynctask.controller;

import com.cxf.cxfasynctask.component.AsyncComponent;
import com.cxf.cxfasynctask.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

@RestController
@RequestMapping("/api/async")
public class AsyncController {

    @Autowired
    private AsyncService asyncService;

    // Several Executor beans exist; this one is resolved by the field name, which matches the bean name
    @Autowired
    private Executor taskExecutor4;

    @Autowired
    private AsyncComponent asyncComponent;

    // Fire-and-forget: returns immediately while dealTask1 runs on taskExecutor1
    @GetMapping("/sendAsync1")
    public String sendAsync1() {
        asyncService.dealTask1();
        return "succ";
    }

    // Waits for the async result, so the request blocks until dealTask2 has finished
    @GetMapping("/sendAsync2")
    public String sendAsync2() {
        Future<String> stringFuture = asyncService.dealTask2();
        String res = "";
        try {
            res = stringFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "succ" + res;
    }

    // taskExecutor3 keeps the default rejection policy, so a saturated pool throws here
    @GetMapping("/sendAsync3")
    public String sendAsync3() {
        int num = new Random().nextInt(1000);
        try {
            asyncService.dealTask3(num);
        } catch (Exception e) {
            return "submit failed..." + num;
        }
        return "succ..." + num;
    }

    @GetMapping("/sendAsync4")
    public String sendAsync4() {
        int num = new Random().nextInt(1000);
        try {
            asyncService.dealTask4(num);
        } catch (Exception e) {
            return "submit failed..." + num;
        }
        return "succ..." + num;
    }

    // Uses the injected pool directly to run a method that is not annotated with @Async
    @GetMapping("/sendAsync5")
    public String sendAsync5() {
        int num = new Random().nextInt(1000);
        try {
            taskExecutor4.execute(() -> {
                asyncService.dealTask5(num);
            });
        } catch (Exception e) {
            return "submit failed..." + num;
        }
        return "succ..." + num;
    }

    // Submits through the wrapper component from step 4
    @GetMapping("/sendAsync6")
    public String sendAsync6() {
        int num = new Random().nextInt(1000);
        try {
            asyncComponent.submit(() -> {
                asyncService.dealTask5(num);
            });
        } catch (Exception e) {
            return "submit failed..." + num;
        }
        return "succ..." + num;
    }
}
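
The try/catch blocks in sendAsync3 to sendAsync6 only matter when the pool is saturated. The following sketch, in plain JDK code, reproduces that situation with the same sizing as taskExecutor4 (2 threads, queue capacity 1, default AbortPolicy). The class SaturationDemo and the method countRejected are hypothetical names for illustration. With a Spring ThreadPoolTaskExecutor the exception type seen by the caller may differ, so treat this as a model of the behavior rather than the exact Spring code path.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original project.
class SaturationDemo {

    // Submits the given number of blocking tasks and returns how many were rejected.
    static int countRejected(int submissions) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < submissions; i++) {
            try {
                // Each task blocks until the latch opens, keeping its worker thread busy.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                // Default AbortPolicy: both threads busy and the queue full.
                rejected++;
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(3)); // prints 0: two running, one queued
        System.out.println(countRejected(4)); // prints 1: the fourth task has nowhere to go
    }
}
```

With 2 threads and a queue of 1, the pool holds at most three unfinished tasks; every further submission is rejected until a worker becomes free. A CallerRunsPolicy, as configured on taskExecutor1 and taskExecutor2, would instead run the extra task on the submitting thread.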
4. Customize the submission logic
Wrap a thread pool from step 1 in a component of your own, where you can add custom logic around task submission.
package com.cxf.cxfasynctask.component;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadPoolExecutor;

@Component
public class AsyncComponent {

    // Resolved by the field name, which matches the bean name taskExecutor5
    @Autowired
    private ThreadPoolExecutor taskExecutor5;

    // Single entry point for submissions; custom logic can be added around this call
    public void submit(Runnable runnable) {
        taskExecutor5.submit(runnable);
    }
}
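
As one possible example of such custom logic, the sketch below wraps a pool and counts accepted and rejected submissions instead of letting the rejection propagate. It is plain JDK code; CountingSubmitter, trySubmit and demo are hypothetical names, and the counting behavior is an assumption made for illustration, not something the original AsyncComponent does.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical wrapper; not part of the original project.
class CountingSubmitter {

    private final ThreadPoolExecutor executor;
    private final AtomicLong accepted = new AtomicLong();
    private final AtomicLong rejected = new AtomicLong();

    CountingSubmitter(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    // Returns true if the pool accepted the task, false if it was rejected.
    boolean trySubmit(Runnable task) {
        try {
            executor.execute(task);
            accepted.incrementAndGet();
            return true;
        } catch (RejectedExecutionException e) {
            rejected.incrementAndGet();
            return false;
        }
    }

    long acceptedCount() { return accepted.get(); }

    long rejectedCount() { return rejected.get(); }

    // Small scenario: 1 thread, queue of 1, three blocking tasks. Returns {accepted, rejected}.
    static long[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountingSubmitter submitter = new CountingSubmitter(pool);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 3; i++) {
            submitter.trySubmit(blocking);
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] { submitter.acceptedCount(), submitter.rejectedCount() };
    }

    public static void main(String[] args) {
        long[] counts = demo();
        System.out.println(counts[0] + " accepted, " + counts[1] + " rejected"); // prints 2 accepted, 1 rejected
    }
}
```

Keeping submission behind one method means that concerns such as metrics, logging or a fallback on rejection live in a single place rather than in every caller.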
5. Add @EnableAsync to the startup class
Add the @EnableAsync annotation to the application's startup class to enable asynchronous task execution.
package com.cxf.cxfasynctask;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class CxfAsyncTaskApplication {

    public static void main(String[] args) {
        SpringApplication.run(CxfAsyncTaskApplication.class, args);
    }
}