環境:Spring5.3.25
Spring架構通過TaskExecutor接口為任務的異步執行提供了抽象。
TaskExecutor抽象
Executors是線程池概念的JDK名稱。之是以命名為“executor”,是因為無法保證底層實作實際上是一個池。執行器可以是單線程的,甚至可以是同步的。Spring的抽象隐藏了Java SE和Java EE環境之間的實作細節。
Spring的TaskExecutor接口與java.util.concurrent.Executor接口相同。事實上,最初,它存在的主要原因是在使用線程池時抽象出對Java5的需求。該接口有一個單獨的方法(execute(Runnable task)),它接受基于線程池的語義和配置執行的任務。
TaskExecutor最初是為了在需要的地方為其他Spring元件提供線程池的抽象。ApplicationEventMulticaster、JMS的AbstractMessageListenerContainer和Quartz內建等元件都使用TaskExecutor抽象來池化線程。然而,如果你的bean需要線程池行為,你也可以使用這種抽象來滿足自己的需求。
TaskExecutor類型
Spring包括許多預建構的TaskExecutor實作。基本上你不需要自定義實作。Spring提供的實作如下:
- SyncTaskExecutor:此實作不會異步運作調用。相反,每次調用都發生在調用線程中。它主要用于不需要多線程的情況,例如在簡單的測試用例中。
- SimpleAsyncTaskExecutor:此實作不重用任何線程。相反,它為每次調用啟動一個新線程。然而,它确實支援并發限制,該限制會阻止任何超過限制的調用,直到釋放出一個插槽。
- ConcurrentTaskExecutor:這個實作是java.util.concurrent.Executor執行個體的擴充卡。還有一個替代方案(ThreadPoolTaskExecutor),它将Executor配置參數公開為bean屬性。很少需要直接使用ConcurrentTaskExecutor。但是,如果ThreadPoolTaskExecutor不夠靈活,可以使用ConcurrentTaskExecutor。
- ThreadPoolTaskExecutor:這個實作是最常用的。它公開了用于配置java.util.concurrent.ThreadPoolExecutor的bean屬性,并将其包裝在TaskExecutor中。如果你需要适應一種不同的java.util.concurrent.Executor,我們建議你使用ConcurrentTaskExecutor代替。
- WorkManagerTaskExecutor:這個實作使用CommonJ WorkManager作為它的支援服務提供者,它是在Spring應用程式上下文中在WebLogic或WebSphere上設定基于CommonJ的線程池內建的核心友善類。
- DefaultManagedTaskExecutor:此實作在JSR-236相容的運作時環境(如Java EE 7+應用伺服器)中使用jndi獲得的ManagedExecutorService,為此替換了CommonJ WorkManager。
使用TaskExecutor
Spring的TaskExecutor實作被用作簡單的JavaBeans。在以下示例中,我們定義了一個bean,該bean使用ThreadPoolTaskExecutor異步列印出一組消息:
public class TaskExecutorExample {
private class MessagePrinterTask implements Runnable {
private String message;
public MessagePrinterTask(String message) {
this.message = message;
}
public void run() {
System.out.println(Thread.currentThread().getName() + ": " + message);
}
}
private TaskExecutor taskExecutor;
public TaskExecutorExample(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
public void printMessages() {
for(int i = 0; i < 25; i++) {
taskExecutor.execute(new MessagePrinterTask("Message" + i));
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
配置
@Configuration
public class TaskConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);//核心線程數
pool.setMaxPoolSize(10);//最大線程數
pool.setQueueCapacity(25);//線程隊列
pool.initialize();//線程初始化
return pool;
}
@Bean
public TaskExecutorExample taskExecutorExample(TaskExecutor taskExecutor) {
return new TaskExecutorExample(taskExecutor) ;
}
}
AnnotationConfigApplicationContext config = new AnnotationConfigApplicationContext(TaskConfig.class) ;
TaskExecutorExample task = config.getBean(TaskExecutorExample.class) ;
task.printMessages() ;
config.close() ;
運作結果:
taskExecutor-1: Message0
taskExecutor-2: Message1
taskExecutor-3: Message2
taskExecutor-4: Message3
taskExecutor-5: Message4
taskExecutor-1: Message5
taskExecutor-2: Message6
taskExecutor-3: Message7
taskExecutor-4: Message8
taskExecutor-5: Message9
taskExecutor-1: Message10
taskExecutor-2: Message11
...
每次任務執行都在不同的線程中執行
TaskExecutor在事件廣播器的應用
預設情況下事件監聽處理程式是在同步線程中執行的,如果需要在異步線程中執行我們則需要自定義ApplicationEventMulticaster。
預設ApplicationEventMulticaster初始化:
public abstract class AbstractApplicationContext {
public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";
public void refresh() {
initApplicationEventMulticaster();
}
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
// 判斷目前BeanFactory容器中是否有以applicationEventMulticaster為名稱的Bean
// 是以我們自定義時也必須使用applicationEventMulticaster為名稱
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
} else {
// 如果沒有自定義,那麼這就是預設的實作,那内部的taskExecutor就會為null。是以預設就是同步執行
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
}
}
}
SimpleApplicationEventMulticaster
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
@Nullable
private Executor taskExecutor;
public void setTaskExecutor(@Nullable Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@Override
public void multicastEvent(ApplicationEvent event) {
multicastEvent(event, resolveDefaultEventType(event));
}
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 判斷任務執行器是否配置
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
} else {
// 同步調用
invokeListener(listener, event);
}
}
}
}
自定義事件廣播器
// 注意這裡的方法名稱必須與上面提到的一樣(預設Bean的名字就是方法名,當然可以通過@Bean屬性設定)
@Bean
public ApplicationEventMulticaster applicationEventMulticaster(BeanFactory beanFactory, TaskExecutor taskExecutor) {
SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster(beanFactory) ;
multicaster.setTaskExecutor(taskExecutor) ;
return multicaster ;
}
通過上面配置後,咱們的事件監聽器的執行就可以在異步任務執行器中執行。
完畢!!!
SpringBoot對Spring MVC都做了哪些事?(一)
SpringBoot對Spring MVC都做了哪些事?(二)
SpringBoot對Spring MVC都做了哪些事?(三)
SpringBoot對Spring MVC都做了哪些事?(四)
Spring Retry重試架構的應用
spring data jpa 進階應用
Spring是如何解決循環依賴的?
Spring 強大的資料驗證功能
Spring容器這些擴充點你都清楚了嗎?
Spring中的@Configuration注解你真的了解嗎?
Spring事務實作原理源碼分析