天天看點

Spring任務排程&異步任務&Web異步請求三者如何配置線程池?

作者:Spring全家桶實戰案例

環境:Spring5.3.23

3.1 任務排程

注解類:@Scheduled

核心處理類:ScheduledAnnotationBeanPostProcessor

使用的線程池:從容器中查詢TaskScheduler

  • 首先在容器中通過類型查找TaskScheduler Bean,如果沒有則抛出NoSuchBeanDefinitionException異常。
  • 在這一步中,如果找到多個,那麼會在通過beanName=taskScheduler在容器中查找
  • 在上一步中抛出異常後會繼續查找java.util.concurrent.ScheduledExecutorService 類型的Bean。
  • 在這一步中,如果找到多個,那麼會在通過beanName=taskScheduler在容器中查找
  • 在上一步中還是沒有則結束(程式并不會報錯)

如果上面流程都沒有找到,則會通過如下方式建立一個

this.localExecutor = Executors.newSingleThreadScheduledExecutor();
 this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);           

在Springboot中有個自動配置類會配置一個TaskSchedulingAutoConfiguration

public class TaskSchedulingAutoConfiguration {
   @Bean
   @ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
   @ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })
   public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
     return builder.build();
   }
   
   @Bean
   @ConditionalOnMissingBean
   public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties,
       ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) {
     TaskSchedulerBuilder builder = new TaskSchedulerBuilder();
     builder = builder.poolSize(properties.getPool().getSize());
     Shutdown shutdown = properties.getShutdown();
     builder = builder.awaitTermination(shutdown.isAwaitTermination());
     builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
     builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
     builder = builder.customizers(taskSchedulerCustomizers);
     return builder;
   }
 }           

3.2 異步任務

注解類:Async

核心處理類:AsyncAnnotationBeanPostProcessor;

通過ProxyAsyncConfiguration配置,該類繼承AbstractAsyncConfiguration

在父類中會初始化,下面兩個成員變量:

@Configuration(proxyBeanMethods = false)
 public abstract class AbstractAsyncConfiguration implements ImportAware {
   @Nullable
   protected Supplier<Executor> executor;
   @Nullable
   protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
   // 在容器中查找AsyncConfigurer Bean 且隻能有一個
   @Autowired
   void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
     Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
       List<AsyncConfigurer> candidates = configurers.stream().collect(Collectors.toList());
       if (CollectionUtils.isEmpty(candidates)) {
         return null;
       }
       if (candidates.size() > 1) {
         throw new IllegalStateException("Only one AsyncConfigurer may exist");
       }
       return candidates.get(0);
     });
     this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
     this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
   }
 
   private <T> Supplier<T> adapt(Supplier<AsyncConfigurer> supplier, Function<AsyncConfigurer, T> provider) {
     return () -> {
       AsyncConfigurer configurer = supplier.get();
       return (configurer != null ? provider.apply(configurer) : null);
     };
   }
 }           

使用的線程池:

  • 首先在容器中通過類型查找AsyncConfigurer Bean
  • 如果沒有則設定預設的AsyncConfigurer::getAsyncExecutor 該方法是接口中預設方法,傳回的是null。
  • 在上一步中如果容器中沒有AsyncConfigurer,那麼設定到AsyncAnnotationBeanPostProcessor中也将就是null。
  • 初始化AsyncAnnotationBeanPostProcessor
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
   @Nullable
   private Supplier<Executor> executor;
   @Nullable
   private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
   @Override
   public void setBeanFactory(BeanFactory beanFactory) {
     super.setBeanFactory(beanFactory);
     // 建構切面Advisor
     AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
     if (this.asyncAnnotationType != null) {
       advisor.setAsyncAnnotationType(this.asyncAnnotationType);
     }
     advisor.setBeanFactory(beanFactory);
     this.advisor = advisor;
   }
 }
 public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
   public AsyncAnnotationAdvisor(
       @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
 
     Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
     asyncAnnotationTypes.add(Async.class);
     try {
       asyncAnnotationTypes.add((Class<? extends Annotation>)
           ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
     }
     // 建構通知類
     this.advice = buildAdvice(executor, exceptionHandler);
     this.pointcut = buildPointcut(asyncAnnotationTypes);
   }
   protected Advice buildAdvice(
       @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
 
     AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
     // 調用父類方法
     interceptor.configure(executor, exceptionHandler);
     return interceptor;
   }
 }
 public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware { 
   public void configure(@Nullable Supplier<Executor> defaultExecutor,
       @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
 
     // 如果defaultExecutor則調用getDefaultExecutor方法,該方法在子類重寫了
     this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
     this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
   }
   protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
     if (beanFactory != null) {
       try {
         // 容器中查找TaskExecutor類型的Bean
         return beanFactory.getBean(TaskExecutor.class);
       } catch (NoUniqueBeanDefinitionException ex) {
         try {
           return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
         } catch (NoSuchBeanDefinitionException ex2) {
         }
       } catch (NoSuchBeanDefinitionException ex) {
         try {
           return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
         } catch (NoSuchBeanDefinitionException ex2) {
         }
         // Giving up -> either using local default executor or none at all...
       }
     }
     return null;
   }
 }
 public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
   protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
     // 先通過父類擷取
     Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
     // 如果父類擷取不到,則建立預設的SimpleAsyncTaskExecutor
     return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
   }
 }           

總結:

  1. 從容器中查找TaskExecutor Bean
  2. 上一步沒有找到,則查找beanName=taskExecutor,類型為java.util.concurrent.Executor的Bean
  3. 如果上一步還是沒有找到,那麼最終建立預設的SimpleAsyncTaskExecutor 這是個沒有上限的線程池,來一個任務建立新線程。

如果執行的異步任務很多且線程池,線程有限則多的任務會等待。

3.3 Web異步接口

RequestMappingHandlerAdapter

public class RequestMappingHandlerAdapter {
   // 預設是一個沒有上限的線程池
   private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("MvcAsync");
   protected ModelAndView invokeHandlerMethod(
     HttpServletRequest request,
     HttpServletResponse response, 
     HandlerMethod handlerMethod) throws Exception {
     // ...
     WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
     asyncManager.setTaskExecutor(this.taskExecutor);
     // ...
   }
 }           

建立RequestMappingHandlerAdapter Bean對象

繼承關系

public static class EnableWebMvcConfiguration extends DelegatingWebMvcConfiguration{}
 public class DelegatingWebMvcConfiguration extends WebMvcConfigurationSupport {
   private final WebMvcConfigurerComposite configurers = new WebMvcConfigurerComposite();
   // WebMvcConfigurer預設實作
   @Autowired(required = false)
   public void setConfigurers(List<WebMvcConfigurer> configurers) {
     if (!CollectionUtils.isEmpty(configurers)) {
       this.configurers.addWebMvcConfigurers(configurers);
     }
   }
   @Override
   protected void configureAsyncSupport(AsyncSupportConfigurer configurer) {
     this.configurers.configureAsyncSupport(configurer);
   }
 }
 public class WebMvcConfigurationSupport implements ApplicationContextAware, ServletContextAware {
   private AsyncSupportConfigurer asyncSupportConfigurer;
   @Bean
   public RequestMappingHandlerAdapter requestMappingHandlerAdapter(...) {
     // ...
     AsyncSupportConfigurer configurer = getAsyncSupportConfigurer();
     if (configurer.getTaskExecutor() != null) {
       adapter.setTaskExecutor(configurer.getTaskExecutor());
     }
     // ...
   }
   protected AsyncSupportConfigurer getAsyncSupportConfigurer() {
     if (this.asyncSupportConfigurer == null) {
       this.asyncSupportConfigurer = new AsyncSupportConfigurer();
       configureAsyncSupport(this.asyncSupportConfigurer);
     }
     return this.asyncSupportConfigurer;
   }
 }
 public class WebMvcAutoConfiguration {
   public static class WebMvcAutoConfigurationAdapter implements WebMvcConfigurer, ServletContextAware {
     public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
       // 判斷容器中是否有beanName = applicationTaskExecutor 的Bean
       if (this.beanFactory.containsBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)) {
         // 通過beanName = applicationTaskExecutor擷取bean對象
         Object taskExecutor = this.beanFactory.getBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME);
         // 判斷是否AsyncTaskExecutor對象
         if (taskExecutor instanceof AsyncTaskExecutor) {
           configurer.setTaskExecutor(((AsyncTaskExecutor) taskExecutor));
         }
       }
       Duration timeout = this.mvcProperties.getAsync().getRequestTimeout();
       if (timeout != null) {
         configurer.setDefaultTimeout(timeout.toMillis());
       }
     }
   }
 }           

總結:

  1. 預設使用SimpleAsyncTaskExecutor
  2. 如果容器中存在以beanName = applicationTaskExecutor 且 類型是 AsyncTaskExecutor, 則使用該bean。

到這你應該知道了這三者線上程池方面該如何正确配置及使用了。

完畢!!!

關注+轉發

SpringBoot對Spring MVC都做了哪些事?(一)

SpringBoot對Spring MVC都做了哪些事?(二)

SpringBoot對Spring MVC都做了哪些事?(三)

SpringBoot對Spring MVC都做了哪些事?(四)

Spring Retry重試架構的應用

spring data jpa 進階應用

Spring任務排程&amp;異步任務&amp;Web異步請求三者如何配置線程池?
Spring任務排程&amp;異步任務&amp;Web異步請求三者如何配置線程池?

繼續閱讀