天天看點

從零開始SpringCloud Alibaba實戰(85)——SpringBoot中的線程池

前言

最近需要用到多線程, 自己維護線程池很麻煩, 正好看到 springboot 內建線程池的例子, 這裡自己做了個嘗試和總結, 記錄一下, 也分享給需要的朋友;

不考慮事務的情況下, 這個多線程實作比較簡單, 主要有以下幾點:

在啟動類加上 @EnableAsync 注解, 開啟異步執行支援;

編寫線程池配置類, 别忘了 @Configuration , 和 @Bean 注解;

編寫需要異步執行的業務, 放到單獨的類中 (可以定義為 service, 因為需要 spring 管理起來才能用 );

在業務service中調用異步執行的service, 注意這是重點, 不能直接在業務 service 中寫異步執行的代碼, 否則無法異步執行( 這就是單獨放異步代碼的原因);

使用步驟

先建立一個線程池的配置,讓Spring Boot加載,用來定義如何建立一個ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync這兩個注解,表示這是個配置類,并且是線程池的配置類

@Configuration
@EnableAsync
public class ExecutorConfig {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心線程數
        executor.setCorePoolSize(corePoolSize);
        //配置最大線程數
        executor.setMaxPoolSize(maxPoolSize);
        //配置隊列大小
        executor.setQueueCapacity(queueCapacity);
        //配置線程池中的線程的名稱字首
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }
}

           

@Value是我配置在application.properties,可以參考配置,自由定義

# 異步線程配置
# 配置核心線程數
async.executor.thread.core_pool_size = 5
# 配置最大線程數
async.executor.thread.max_pool_size = 5
# 配置隊列大小
async.executor.thread.queue_capacity = 99999
# 配置線程池中的線程的名稱字首
async.executor.thread.name.prefix = async-service-

           

建立一個Service接口,是異步線程的接口

public interface AsyncService {
    /**
     * 執行異步任務
     * 可以根據需求,自己加參數拟定,我這裡就做個測試示範
     */
    void executeAsync();
}

           

實作類

@Service
public class AsyncServiceImpl implements AsyncService {
    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        logger.info("start executeAsync");

        System.out.println("異步線程要做的事情");
        System.out.println("可以在這裡執行批量插入等耗時的事情");

        logger.info("end executeAsync");
    }
}

           

将Service層的服務異步化,在executeAsync()方法上增加注解@Async(“asyncServiceExecutor”),asyncServiceExecutor方法是前面ExecutorConfig.java中的方法名,表明executeAsync方法進入的線程池是asyncServiceExecutor方法建立的。

接下來就是在Controller裡或者是哪裡通過注解@Autowired注入這個Service

@Autowired
private AsyncService asyncService;

@GetMapping("/async")
public void async(){
    asyncService.executeAsync();
}

           

用postmain或者其他工具來多次測試請求一下

通過以上日志可以發現,[async-service-]是有多個線程的,顯然已經在我們配置的線程池中執行了,并且每次請求中,controller的起始和結束日志都是連續列印的,表明每次請求都快速響應了,而耗時的操作都留給線程池中的線程去異步執行;

雖然我們已經用上了線程池,但是還不清楚線程池當時的情況,有多少線程在執行,多少在隊列中等待呢?這裡我建立了一個ThreadPoolTaskExecutor的子類,在每次送出線程的時候都會将目前線程池的運作狀況列印出來

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;


public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }

        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

           

如上所示,showThreadPoolInfo方法中将任務總數、已完成數、活躍線程數,隊列大小都列印出來了,然後Override了父類的execute、submit等方法,在裡面調用showThreadPoolInfo方法,這樣每次有任務被送出到線程池的時候,都會将目前線程池的基本情況列印到日志中;

修改ExecutorConfig.java的asyncServiceExecutor方法,将ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改為ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()

@Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("start asyncServiceExecutor");
        //在這裡修改
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心線程數
        executor.setCorePoolSize(corePoolSize);
        //配置最大線程數
        executor.setMaxPoolSize(maxPoolSize);
        //配置隊列大小
        executor.setQueueCapacity(queueCapacity);
        //配置線程池中的線程的名稱字首
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }

           

再次啟動該工程測試

送出任務到線程池的時候,調用的是submit(Callable task)這個方法,目前已經送出了3個任務,完成了3個,目前有0個線程在處理任務,還剩0個任務在隊列中等待,線程池的基本情況一路了然;

@Async 多線程擷取傳回值

// 異步執行代碼
@Service("asyncExecutorTest")
public class AsyncExecutorTest {
 
    // 異步執行的方法, 注解内為自定義線程池類名
    @Async("asyncServiceExecutor")
    public Future<Integer> test1(Integer i) throws InterruptedException {
        Thread.sleep(100);
        System.out.println("@Async 執行: " + i);
        return new AsyncResult(i);
    }
 
    // 這裡使用其它方式調用,詳見後面的 service3 方法
    public Integer test2(Integer i) throws InterruptedException {
        Thread.sleep(100);
        System.out.println(" excute.run 執行: " + i);
        return i;
    }
}
           
// 業務 service
@Service("asyncExcutorService")
public class AsyncExcutorService {
 
    @Autowired
    AsyncExecutorTest asyncExecutorTest;
 
    @Autowired
    Executor localBootAsyncExecutor;
    
    // 測試 無傳回值異步執行
    public void service1(){
        System.out.println("service1 執行----->");
        for (int i = 0; i < 50; i++) {
            try {
                asyncExecutorTest.test1(i);
            } catch (InterruptedException e) {
                System.out.println("service1執行出錯");
            }
        }
        System.out.println("service1 結束----->");
    }
 
    // 測試 有傳回值異步執行
    public void service2(){
        long l = System.currentTimeMillis();
        System.out.println("service2 執行----->");
        List<Future> result = new ArrayList<>();
        try {
            for (int i = 0; i < 300; i++) {
                Future<Integer> integerFuture = asyncExecutorTest.test1(i);
                result.add(integerFuture);
            }
            for (Future future : result) {
                System.out.println(future.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("service2執行出錯");
        }
        System.out.println("service2 結束----->" + (System.currentTimeMillis() - l));
    }
 
    // 測試 有傳回值異步執行
    public void service3(){
        long l = System.currentTimeMillis();
        List<Integer> result = new ArrayList<>();
        try {
            System.out.println("service3 執行----->");
            int total = 300;
            CountDownLatch latch = new CountDownLatch(total);
            for (int i = 0; i < total; i++) {
                final int y = i;
                localBootAsyncExecutor.execute(() -> {
                    try {
                        result.add(asyncExecutorTest.test2(y));
                    } catch (InterruptedException e) {
                        System.out.println("service3執行出錯");
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            System.out.println("service3執行出錯");
        }
        System.out.println("service3 結束----->" + (System.currentTimeMillis() - l));
    }
}
           

這裡說下 service1 和 service2 的差別:

  1. 兩個都用的是一個線程池執行的
  2. service1 單純執行業務, 不用傳回資料, 主線程也不用等待
  3. service2 需要傳回資料, 主線程需要等待結果( 注意傳回值隻能是 Future, 最後再 .get()去擷取, 否則無法異步執行)
  4. service3 也可以傳回資料, 但是書寫上麻煩一些. 傳回值直接是想要的結果, 不像 service2 還需要提取一次資料.