天天看點

Spring異步任務執行詳解

作者:Spring全家桶實戰案例

環境:Spring5.3.25

Spring架構通過TaskExecutor接口為任務的異步執行提供了抽象。

TaskExecutor抽象

Executors是線程池概念的JDK名稱。之是以命名為“executor”,是因為無法保證底層實作實際上是一個池。執行器可以是單線程的,甚至可以是同步的。Spring的抽象隐藏了Java SE和Java EE環境之間的實作細節。

Spring的TaskExecutor接口與java.util.concurrent.Executor接口相同。事實上,最初,它存在的主要原因是在使用線程池時抽象出對Java5的需求。該接口有一個單獨的方法(execute(Runnable task)),它接受基于線程池的語義和配置執行的任務。

TaskExecutor最初是為了在需要的地方為其他Spring元件提供線程池的抽象。ApplicationEventMulticaster、JMS的AbstractMessageListenerContainer和Quartz內建等元件都使用TaskExecutor抽象來池化線程。然而,如果你的bean需要線程池行為,你也可以使用這種抽象來滿足自己的需求。

TaskExecutor類型

Spring包括許多預建構的TaskExecutor實作。基本上你不需要自定義實作。Spring提供的實作如下:

  • SyncTaskExecutor:此實作不會異步運作調用。相反,每次調用都發生在調用線程中。它主要用于不需要多線程的情況,例如在簡單的測試用例中。
  • SimpleAsyncTaskExecutor:此實作不重用任何線程。相反,它為每次調用啟動一個新線程。然而,它确實支援并發限制,該限制會阻止任何超過限制的調用,直到釋放出一個插槽。
  • ConcurrentTaskExecutor:這個實作是java.util.concurrent.Executor執行個體的擴充卡。還有一個替代方案(ThreadPoolTaskExecutor),它将Executor配置參數公開為bean屬性。很少需要直接使用ConcurrentTaskExecutor。但是,如果ThreadPoolTaskExecutor不夠靈活,可以使用ConcurrentTaskExecutor。
  • ThreadPoolTaskExecutor:這個實作是最常用的。它公開了用于配置java.util.concurrent.ThreadPoolExecutor的bean屬性,并将其包裝在TaskExecutor中。如果你需要适應一種不同的java.util.concurrent.Executor,我們建議你使用ConcurrentTaskExecutor代替。
  • WorkManagerTaskExecutor:這個實作使用CommonJ WorkManager作為它的支援服務提供者,它是在Spring應用程式上下文中在WebLogic或WebSphere上設定基于CommonJ的線程池內建的核心友善類。
  • DefaultManagedTaskExecutor:此實作在JSR-236相容的運作時環境(如Java EE 7+應用伺服器)中使用jndi獲得的ManagedExecutorService,為此替換了CommonJ WorkManager。

使用TaskExecutor

Spring的TaskExecutor實作被用作簡單的JavaBeans。在以下示例中,我們定義了一個bean,該bean使用ThreadPoolTaskExecutor異步列印出一組消息:

public class TaskExecutorExample {

  private class MessagePrinterTask implements Runnable {

    private String message;

    public MessagePrinterTask(String message) {
      this.message = message;
    }

    public void run() {
      System.out.println(Thread.currentThread().getName() + ": " + message);
    }
  }

  private TaskExecutor taskExecutor;

  public TaskExecutorExample(TaskExecutor taskExecutor) {
    this.taskExecutor = taskExecutor;
  }

  public void printMessages() {
    for(int i = 0; i < 25; i++) {
      taskExecutor.execute(new MessagePrinterTask("Message" + i));
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}           

配置

@Configuration
public class TaskConfig {
  @Bean
  public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(5);//核心線程數
    pool.setMaxPoolSize(10);//最大線程數
    pool.setQueueCapacity(25);//線程隊列
    pool.initialize();//線程初始化
    return pool;
  }
  @Bean
  public TaskExecutorExample taskExecutorExample(TaskExecutor taskExecutor) {
    return new TaskExecutorExample(taskExecutor) ;
  }
}           
AnnotationConfigApplicationContext config = new AnnotationConfigApplicationContext(TaskConfig.class) ;
    
TaskExecutorExample task = config.getBean(TaskExecutorExample.class) ;
task.printMessages() ;
    
config.close() ;           

運作結果:

taskExecutor-1: Message0
taskExecutor-2: Message1
taskExecutor-3: Message2
taskExecutor-4: Message3
taskExecutor-5: Message4
taskExecutor-1: Message5
taskExecutor-2: Message6
taskExecutor-3: Message7
taskExecutor-4: Message8
taskExecutor-5: Message9
taskExecutor-1: Message10
taskExecutor-2: Message11
...           

每次任務執行都在不同的線程中執行

TaskExecutor在事件廣播器的應用

預設情況下事件監聽處理程式是在同步線程中執行的,如果需要在異步線程中執行我們則需要自定義ApplicationEventMulticaster。

預設ApplicationEventMulticaster初始化:

public abstract class AbstractApplicationContext {
  public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";
  public void refresh() {
    initApplicationEventMulticaster();
  }
  protected void initApplicationEventMulticaster() {
    ConfigurableListableBeanFactory beanFactory = getBeanFactory();
    // 判斷目前BeanFactory容器中是否有以applicationEventMulticaster為名稱的Bean
    // 是以我們自定義時也必須使用applicationEventMulticaster為名稱
    if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
      this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
    } else {
      // 如果沒有自定義,那麼這就是預設的實作,那内部的taskExecutor就會為null。是以預設就是同步執行
      this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
      beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
    }
  }
}           

SimpleApplicationEventMulticaster

public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

  @Nullable
  private Executor taskExecutor;
  public void setTaskExecutor(@Nullable Executor taskExecutor) {
    this.taskExecutor = taskExecutor;
  }
  @Override
  public void multicastEvent(ApplicationEvent event) {
    multicastEvent(event, resolveDefaultEventType(event));
  }

  @Override
  public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
    Executor executor = getTaskExecutor();
    for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
      // 判斷任務執行器是否配置
      if (executor != null) {
        executor.execute(() -> invokeListener(listener, event));
      } else {
        // 同步調用
        invokeListener(listener, event);
      }
    }
  }
}           

自定義事件廣播器

// 注意這裡的方法名稱必須與上面提到的一樣(預設Bean的名字就是方法名,當然可以通過@Bean屬性設定)
@Bean
public ApplicationEventMulticaster applicationEventMulticaster(BeanFactory beanFactory, TaskExecutor taskExecutor) {
  SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster(beanFactory) ;
  multicaster.setTaskExecutor(taskExecutor) ;
  return multicaster ;
}           

通過上面配置後,咱們的事件監聽器的執行就可以在異步任務執行器中執行。

完畢!!!

SpringBoot對Spring MVC都做了哪些事?(一)

SpringBoot對Spring MVC都做了哪些事?(二)

SpringBoot對Spring MVC都做了哪些事?(三)

SpringBoot對Spring MVC都做了哪些事?(四)

Spring Retry重試架構的應用

spring data jpa 進階應用

Spring是如何解決循環依賴的?

Spring 強大的資料驗證功能

Spring容器這些擴充點你都清楚了嗎?

Spring中的@Configuration注解你真的了解嗎?

Spring事務實作原理源碼分析

Spring異步任務執行詳解
Spring異步任務執行詳解