通過之前三篇關于Spring Boot異步任務實作的博文,我們分别學會了 用@Async建立異步任務 、 為異步任務配置線程池 使用多個線程池隔離不同的異步任務
。今天這篇,我們繼續對上面的知識進行完善和優化!
如果你已經看過上面幾篇内容并已經掌握之後,一起來思考下面這個問題:
假設,線程池配置為核心線程數2、最大線程數2、緩沖隊列長度2。此時,有5個異步任務同時開始,會發生什麼?
https://blog.didispace.com/spring-boot-learning-2-7-8/#%E5%9C%BA%E6%99%AF%E9%87%8D%E7%8E%B0 場景重制
我們先來把上面的假設用代碼實作一下:
第一步:建立Spring Boot應用,根據上面的假設寫好線程池配置。
@EnableAsync
@SpringBootApplication
public class Chapter78Application {
public static void main(String[] args) {
SpringApplication.run(Chapter78Application.class, args);
}
@EnableAsync
@Configuration
class TaskPoolConfig {
@Bean
public Executor taskExecutor1() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(2);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("executor-1-");
return executor;
}
}
}
第二步:用
@Async
注解實作一個部分任務
@Slf4j
@Component
public class AsyncTasks {
public static Random random = new Random();
@Async("taskExecutor1")
public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
log.info("開始任務:{}", taskNo);
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任務:{},耗時:{} 毫秒", taskNo, end - start);
return CompletableFuture.completedFuture("任務完成");
}
}
第三步:編寫測試用例
@Slf4j
@SpringBootTest
public class Chapter78ApplicationTests {
@Autowired
private AsyncTasks asyncTasks;
@Test
public void test2() throws Exception {
// 線程池配置:core-2,max-2,queue=2,同時有5個任務,出現下面異常:
// org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
// active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9
long start = System.currentTimeMillis();
// 線程池1
CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");
// 一起執行
CompletableFuture.allOf(task1, task2, task3, task4, task5).join();
long end = System.currentTimeMillis();
log.info("任務全部完成,總耗時:" + (end - start) + "毫秒");
}
}
執行一下,可以類似下面這樣的日志資訊:
2021-09-22 17:33:08.159 INFO 21119 --- [ executor-1-2] com.didispace.chapter78.AsyncTasks : 開始任務:2
2021-09-22 17:33:08.159 INFO 21119 --- [ executor-1-1] com.didispace.chapter78.AsyncTasks : 開始任務:1
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
... 74 more
從異常資訊
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task:
中,可以很明确的知道,第5個任務因為超過了執行線程+緩沖隊列長度,而被拒絕了。
所有,預設情況下,線程池的拒絕政策是:當線程池隊列滿了,會丢棄這個任務,并抛出異常。
https://blog.didispace.com/spring-boot-learning-2-7-8/#%E9%85%8D%E7%BD%AE%E6%8B%92%E7%BB%9D%E7%AD%96%E7%95%A5 配置拒絕政策
雖然線程池有預設的拒絕政策,但實際開發過程中,有些業務場景,直接拒絕的政策往往并不适用,有時候我們可能會選擇舍棄最早開始執行而未完成的任務、也可能會選擇舍棄剛開始執行而未完成的任務等更貼近業務需要的政策。是以,為線程池配置其他拒絕政策或自定義拒絕政策是很常見的需求,那麼這個要怎麼實作呢?
下面就來具體說說今天的正題,如何為線程池配置拒絕政策、如何自定義拒絕政策。
看下面這段代碼的最後一行,
setRejectedExecutionHandler
方法就是為線程池設定拒絕政策的方法:
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//...其他線程池配置
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
在ThreadPoolExecutor中提供了4種線程的政策可以供開發者直接使用,你隻需要像下面這樣設定即可:
// AbortPolicy政策
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// DiscardPolicy政策
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// DiscardOldestPolicy政策
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// CallerRunsPolicy政策
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
這四個政策對應的含義分别是:
- AbortPolicy政策:預設政策,如果線程池隊列滿了丢掉這個任務并且抛出RejectedExecutionException異常。
- DiscardPolicy政策:如果線程池隊列滿了,會直接丢掉這個任務并且不會有任何異常。
- DiscardOldestPolicy政策:如果隊列滿了,會将最早進入隊列的任務删掉騰出空間,再嘗試加入隊列。
- CallerRunsPolicy政策:如果添加到線程池失敗,那麼主線程會自己去執行該任務,不會等待線程池中的線程去執行。
而如果你要自定義一個拒絕政策,那麼可以這樣寫:
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 拒絕政策的邏輯
}
});
當然如果你喜歡用Lamba表達式,也可以這樣寫:
executor.setRejectedExecutionHandler((r, executor1) -> {
// 拒絕政策的邏輯
});
好了,今天的學習就到這裡!
如果您學習過程中如遇困難?可以加入我們超高品質的
Spring技術交流群,參與交流與讨論,更好的學習與進步!更多
Spring Boot教程可以點選直達!,歡迎收藏與轉發支援!
https://blog.didispace.com/spring-boot-learning-2-7-8/#%E4%BB%A3%E7%A0%81%E7%A4%BA%E4%BE%8B 代碼示例
本文的完整工程可以檢視下面倉庫中
2.x
目錄下的
chapter7-8
工程:
- Github: https://github.com/dyc87112/SpringBoot-Learning/
- Gitee: https://gitee.com/didispace/SpringBoot-Learning/
如果您覺得本文不錯,歡迎
Star
支援,您的關注是我堅持的動力!