天天看點

SpringBoot 異步程式設計指南

通過本文你可以了解到下面這些知識點:

  1. Future 模式介紹以及核心思想
  2. 核心線程數、最大線程數的差別,隊列容量代表什麼;
  3. ThreadPoolTaskExecutor

     飽和政策;
  4. SpringBoot 異步程式設計實戰,搞懂代碼的執行邏輯。

Future 模式 

異步程式設計在處理耗時操作以及多任務處理的場景下非常有用,我們可以更好的讓我們的系統利用好機器的 CPU 和 記憶體,提高它們的使用率。多線程設計模式有很多種,Future模式是多線程開發中非常常見的一種設計模式,本文也是基于這種模式來說明 SpringBoot 對于異步程式設計的知識。

實戰之前我先簡單介紹一下 Future 模式的核心思想 吧!。

Future 模式的核心思想是 異步調用 。當我們執行一個方法時,假如這個方法中有多個耗時的任務需要同時去做,而且又不着急等待這個結果時可以讓用戶端立即傳回然後,背景慢慢去計算任務。當然你也可以選擇等這些任務都執行完了,再傳回給用戶端。這個在 Java 中都有很好的支援,我在後面的示例程式中會詳細對比這兩種方式的差別。

SpringBoot 異步程式設計實戰

如果我們需要在 SpringBoot 實作異步程式設計的話,通過 Spring 提供的兩個注解會讓這件事情變的非常簡單。

  1. @EnableAsync

    :通過在配置類或者Main類上加@EnableAsync開啟對異步方法的支援。
  2. @Async

     可以作用在類上或者方法上,作用在類上代表這個類的所有方法都是異步方法。

1. 自定義 TaskExecutor

很多人對于 TaskExecutor 不是太了解,是以我們花一點篇幅先介紹一下這個東西。從名字就能看出它是任務的執行者,它上司執行着線程來處理任務,就像司令官一樣,而我們的線程就好比一隻隻軍隊一樣,這些軍隊可以異步對敵人進行打擊?。

Spring 提供了

TaskExecutor

接口作為任務執行者的抽象,它和

java.util.concurrent

包下的

Executor

接口很像。稍微不同的 

TaskExecutor

接口用到了 Java 8 的文法

@FunctionalInterface

聲明這個接口口是一個函數式接口。

org.springframework.core.task.TaskExecutor

@FunctionalInterface
public interface TaskExecutor extends Executor {
    void execute(Runnable var1);
}
           
SpringBoot 異步程式設計指南

如果沒有自定義Executor, Spring 将建立一個 

SimpleAsyncTaskExecutor

 并使用它。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

  private static final int CORE_POOL_SIZE = 6;
  private static final int MAX_POOL_SIZE = 10;
  private static final int QUEUE_CAPACITY = 100;

  @Bean
  public Executor taskExecutor() {
    // Spring 預設配置是核心線程數大小為1,最大線程容量大小不受限制,隊列容量也不受限制。
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 核心線程數
    executor.setCorePoolSize(CORE_POOL_SIZE);
    // 最大線程數
    executor.setMaxPoolSize(MAX_POOL_SIZE);
    // 隊列大小
    executor.setQueueCapacity(QUEUE_CAPACITY);
    // 當最大池已滿時,此政策保證不會丢失任務請求,但是可能會影響應用程式整體性能。
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
    executor.initialize();
    return executor;
  }
}
           

ThreadPoolTaskExecutor

 常見概念:

  • Core Pool Size : 核心線程數線程數定義了最小可以同時運作的線程數量。
  • Queue Capacity : 當新任務來的時候會先判斷目前運作的線程數量是否達到核心線程數,如果達到的話,信任就會被存放在隊列中。
  • Maximum Pool Size : 當隊列中存放的任務達到隊列容量的時候,目前可以同時運作的線程數量變為最大線程數。

一般情況下不會将隊列大小設為:

Integer.MAX_VALUE

,也不會将核心線程數和最大線程數設為同樣的大小,這樣的話最大線程數的設定都沒什麼意義了,你也無法确定目前 CPU 和記憶體使用率具體情況如何。

如果隊列已滿并且目前同時運作的線程數達到最大線程數的時候,如果再有新任務過來會發生什麼呢?

Spring 預設使用的是 

ThreadPoolExecutor.AbortPolicy

。在Spring的預設情況下,

ThreadPoolExecutor

  将抛出 

RejectedExecutionException

 來拒絕新來的任務 ,這代表你将丢失對這個任務的處理。對于可伸縮的應用程式,建議使用 

ThreadPoolExecutor.CallerRunsPolicy

。當最大池被填滿時,此政策為我們提供可伸縮隊列。

ThreadPoolTaskExecutor

 飽和政策定義:

如果目前同時運作的線程數量達到最大線程數量時,

ThreadPoolTaskExecutor

 定義一些政策:

  • ThreadPoolExecutor.AbortPolicy:抛出 

    RejectedExecutionException

    來拒絕新任務的處理。
  • ThreadPoolExecutor.CallerRunsPolicy:調用執行自己的線程運作任務。您不會任務請求。但是這種政策會降低對于新任務送出速度,影響程式的整體性能。另外,這個政策喜歡增加隊列容量。如果您的應用程式可以承受此延遲并且你不能任務丢棄任何一個任務請求的話,你可以選擇這個政策。
  • ThreadPoolExecutor.DiscardPolicy: 不處理新任務,直接丢棄掉。
  • ThreadPoolExecutor.DiscardOldestPolicy:此政策将丢棄最早的未處理的任務請求。

2. 編寫一個異步的方法

下面模拟一個查找對應字元開頭電影的方法,我們給這個方法加上了

@Async

注解來告訴 Spring 它是一個異步的方法。另外,這個方法的傳回值 

CompletableFuture.completedFuture(results)

這代表我們需要傳回結果,也就是說程式必須把任務執行完成之後再傳回給使用者。

請留意

completableFutureTask

方法中的第一行列印日志這句代碼,後面分析程式中會用到,很重要!

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@Service
public class AsyncService {

  private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);

  private List<String> movies =
      new ArrayList<>(
          Arrays.asList(
              "Forrest Gump",
              "Titanic",
              "Spirited Away",
              "The Shawshank Redemption",
              "Zootopia",
              "Farewell ",
              "Joker",
              "Crawl"));

  /** 示範使用:找到特定字元/字元串開頭的電影 */
  @Async
  public CompletableFuture<List<String>> completableFutureTask(String start) {
    // 列印日志
    logger.warn(Thread.currentThread().getName() + "start this task!");
    // 找到特定字元/字元串開頭的電影
    List<String> results =
        movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList());
    // 模拟這是一個耗時的任務
    try {
      Thread.sleep(1000L);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    //傳回一個已經用給定值完成的新的CompletableFuture。
    return CompletableFuture.completedFuture(results);
  }
}
           

3. 測試編寫的異步方法

@RestController
@RequestMapping("/async")
public class AsyncController {
  @Autowired 
  AsyncService asyncService;

  @GetMapping("/movies")
  public String completableFutureTask() throws ExecutionException, InterruptedException {
    //開始時間
    long start = System.currentTimeMillis();
    // 開始執行大量的異步任務
    List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
    List<CompletableFuture<List<String>>> completableFutureList =
        words.stream()
            .map(word -> asyncService.completableFutureTask(word))
            .collect(Collectors.toList());
    // CompletableFuture.join()方法可以擷取他們的結果并将結果連接配接起來
    List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
    // 列印結果以及運作程式運作花費時間
    System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
    return results.toString();
  }
}
           

請求這個接口,控制台列印出下面的内容:

2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-5start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-2start this task!
Elapsed time: 1010
           

首先我們可以看到處理所有任務花費的時間大概是 1 s。這與我們自定義的 

ThreadPoolTaskExecutor

 有關,我們配置的核心線程數是 6 ,然後通過通過下面的代碼模拟配置設定了 6 個任務給系統執行。這樣每個線程都會被配置設定到一個任務,每個任務執行花費時間是 1 s ,是以處理 6 個任務的總花費時間是 1 s。

List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<List<String>>> completableFutureList =
        words.stream()
            .map(word -> asyncService.completableFutureTask(word))
            .collect(Collectors.toList());
           

你可以自己驗證一下,試着去把核心線程數的數量改為 3 ,再次請求這個接口你會發現處理所有任務花費的時間大概是 2 s。

另外,從上面的運作結果可以看出,當所有任務執行完成之後才傳回結果。這種情況對應于我們需要傳回結果給用戶端請求的情況下,假如我們不需要傳回任務執行結果給用戶端的話呢? 就比如我們上傳一個大檔案到系統,上傳之後隻要大檔案格式符合要求我們就上傳成功。普通情況下我們需要等待檔案上傳完畢再傳回給使用者消息,但是這樣會很慢。采用異步的話,當使用者上傳之後就立馬傳回給使用者消息,然後系統再默默去處理上傳任務。這樣也會增加一點麻煩,因為檔案可能會上傳失敗,是以系統也需要一點機制來補償這個問題,比如當上傳遇到問題的時候,發消息通知使用者。

下面會示範一下用戶端不需要傳回結果的情況:

completableFutureTask

方法變為 void 類型

@Async
public void completableFutureTask(String start) {
  ......
  //這裡可能是系統對任務執行結果的處理,比如存入到資料庫等等......
  //doSomeThingWithResults(results);
}
           

Controller 代碼修改如下:

@GetMapping("/movies")
  public String completableFutureTask() throws ExecutionException, InterruptedException {
    // Start the clock
    long start = System.currentTimeMillis();
    // Kick of multiple, asynchronous lookups
    List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
        words.stream()
            .forEach(word -> asyncService.completableFutureTask(word));
    // Wait until they are all done
    // Print results, including elapsed time
    System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
    return "Done";
  }
           

請求這個接口,控制台列印出下面的内容:

Elapsed time: 0
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-2start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-5start this task!
           

可以看到系統會直接傳回給使用者結果,然後系統才真正開始執行任務。

參考

  • https://spring.io/guides/gs/async-method/
  • https://medium.com/trendyol-tech/spring-boot-async-executor-management-with-threadpooltaskexecutor-f493903617d

繼續閱讀