天天看點

SpringBoot多線程使用(@Async)

作者:北風濁酒
SpringBoot多線程使用(@Async)

1.建立自己的線程池

其中 taskExecutor1、taskExecutor2、taskExecutor3 使用 ThreadPoolTaskExecutor 定義可供異步任務使用的線程池;ThreadPoolTaskExecutor 是對 ThreadPoolExecutor 的封裝。也可以直接使用 JUC 包下的 ThreadPoolExecutor 定義線程池(如 taskExecutor4、taskExecutor5),傳回 Executor 或 ThreadPoolExecutor 即可。之後在異步任務中指定對應的線程池 Bean 名稱,便可使用該線程池。建議為每個線程池設定各自的線程名稱前綴,有利於排查問題。

package com.cxf.cxfasynctask.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

/**
 * Declares the thread-pool beans that {@code @Async} methods and other callers
 * select by bean name.
 *
 * <p>{@code taskExecutor1} to {@code taskExecutor3} are Spring
 * {@link ThreadPoolTaskExecutor} instances; {@code taskExecutor4} and
 * {@code taskExecutor5} are plain JDK {@link ThreadPoolExecutor} instances.
 * Each pool uses its own thread-name prefix so that log lines and thread dumps
 * can be traced back to the pool that produced them.
 */
@Configuration
public class ThreadPoolConfig {

    /** Keep-alive, in seconds, shared by every pool defined here. */
    private static final int KEEP_ALIVE_SECONDS = 60;

    /** Upper bound, in seconds, on how long shutdown waits for running tasks. */
    private static final int AWAIT_TERMINATION_SECONDS = 60;

    /**
     * General-purpose pool: 10 core / 50 max threads, queue of 200.
     * When saturated, the submitting thread runs the task itself.
     */
    @Bean("taskExecutor1")
    public Executor taskExecutor1() {
        ThreadPoolTaskExecutor taskExecutor = baseTaskExecutor("taskExecutor1-", 10, 50, 200);
        // Saturation policy: execute the task on the caller's thread instead of rejecting it.
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    /**
     * Same sizing and saturation policy as {@code taskExecutor1}, kept as a
     * separate pool with its own thread-name prefix.
     */
    @Bean("taskExecutor2")
    public Executor taskExecutor2() {
        ThreadPoolTaskExecutor taskExecutor = baseTaskExecutor("taskExecutor2-", 10, 50, 200);
        // Saturation policy: execute the task on the caller's thread instead of rejecting it.
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    /**
     * Deliberately tiny pool: 2 threads and a queue of 1.
     *
     * <p>No rejection handler is configured, so the executor's default policy
     * stays in effect and submissions beyond capacity are rejected with an
     * exception rather than run on the caller's thread.
     */
    @Bean("taskExecutor3")
    public Executor taskExecutor3() {
        ThreadPoolTaskExecutor taskExecutor = baseTaskExecutor("taskExecutor3-", 2, 2, 1);
        taskExecutor.initialize();
        return taskExecutor;
    }

    /**
     * Plain JDK pool (2 threads, queue of 1) exposed as an {@link Executor}.
     */
    @Bean("taskExecutor4")
    public Executor taskExecutor4() {
        return smallJdkPool("taskExecutor4-");
    }

    /**
     * Plain JDK pool (2 threads, queue of 1) exposed with its concrete
     * {@link ThreadPoolExecutor} type.
     */
    @Bean("taskExecutor5")
    public ThreadPoolExecutor taskExecutor5() {
        // Own prefix: this pool previously reused "taskExecutor4-", which made
        // its threads indistinguishable from taskExecutor4's in logs.
        return smallJdkPool("taskExecutor5-");
    }

    /**
     * Builds a {@link ThreadPoolTaskExecutor} with the settings common to all
     * Spring-managed pools in this class. The executor is returned
     * <em>uninitialized</em> so the caller can still set a rejection handler
     * before calling {@code initialize()}.
     *
     * @param threadNamePrefix prefix for the names of the pool's threads
     * @param corePoolSize     core number of threads
     * @param maxPoolSize      maximum number of threads
     * @param queueCapacity    capacity of the task queue
     * @return the configured, not yet initialized executor
     */
    private static ThreadPoolTaskExecutor baseTaskExecutor(
            String threadNamePrefix, int corePoolSize, int maxPoolSize, int queueCapacity) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(corePoolSize);
        taskExecutor.setMaxPoolSize(maxPoolSize);
        taskExecutor.setQueueCapacity(queueCapacity);
        taskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        taskExecutor.setThreadNamePrefix(threadNamePrefix);
        // On shutdown, let submitted tasks finish before other beans are destroyed,
        // so async work is torn down before resources it may use (e.g. a Redis pool).
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // Bound that wait so the application can always shut down instead of blocking.
        taskExecutor.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
        return taskExecutor;
    }

    /**
     * Builds a JDK {@link ThreadPoolExecutor} with 2 core / 2 max threads, a
     * 60-second keep-alive and a bounded queue holding a single task.
     *
     * @param threadNamePrefix prefix for the names of the pool's threads
     * @return the new pool
     */
    private static ThreadPoolExecutor smallJdkPool(String threadNamePrefix) {
        ThreadFactory springThreadFactory = new CustomizableThreadFactory(threadNamePrefix);
        return new ThreadPoolExecutor(
                2,
                2,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                springThreadFactory
        );
    }

}
           

2.建立異步的方法

建立一個異步的 Service,在該 Service 的方法上使用 @Async("taskExecutor1") 即可指定自己定義的線程池,其中 taskExecutor1 為上一步定義的線程池 Bean 名稱。異步方法的傳回值類型有兩種:void 和 Future<T>;方法內的異步處理邏輯相當於一個 Runnable 或 Callable。

package com.cxf.cxfasynctask.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Executor;
import java.util.concurrent.Future;

/**
 * Demo service whose methods each log a start line, sleep for five seconds to
 * simulate work, and log an end line.
 *
 * <p>Methods annotated with {@code @Async("...")} name the executor bean they
 * should run on; {@link #dealTask5(int)} carries no {@code @Async} annotation
 * and runs on whichever thread calls it.
 */
@Service
public class AsyncService {

    private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);

    /** Duration of the simulated work, in milliseconds. */
    private static final long SIMULATED_WORK_MILLIS = 5000L;

    /** Fire-and-forget task bound to {@code taskExecutor1}. */
    @Async("taskExecutor1")
    public void dealTask1() {
        logger.info("dealTask1...  start");
        simulateWork("dealTask1");
        logger.info("dealTask1...  end");
    }

    /**
     * Task bound to {@code taskExecutor2} that hands a result back to the caller.
     *
     * @return a future holding the string {@code "dealTask2"}
     */
    @Async("taskExecutor2")
    public Future<String> dealTask2() {
        logger.info("dealTask2...  start");
        simulateWork("dealTask2");
        logger.info("dealTask2...  end");
        return new AsyncResult<>("dealTask2");
    }

    /**
     * Fire-and-forget task bound to {@code taskExecutor3}.
     *
     * @param i caller-supplied number echoed in the log lines
     */
    @Async("taskExecutor3")
    public void dealTask3(int i) {
        logger.info("dealTask3...  start ...{}", i);
        simulateWork("dealTask3");
        logger.info("dealTask3...  end...{}", i);
    }

    /**
     * Fire-and-forget task bound to {@code taskExecutor4}.
     *
     * @param i caller-supplied number echoed in the log lines
     */
    @Async("taskExecutor4")
    public void dealTask4(int i) {
        logger.info("dealTask4...  start ...{}", i);
        simulateWork("dealTask4");
        logger.info("dealTask4...  end...{}", i);
    }

    /**
     * Plain synchronous method (no {@code @Async}); callers that want it to run
     * in the background must submit it to an executor themselves.
     *
     * @param i caller-supplied number echoed in the log lines
     */
    public void dealTask5(int i) {
        // Log under this method's own name; it previously logged as "dealTask4".
        logger.info("dealTask5...  start ...{}", i);
        simulateWork("dealTask5");
        logger.info("dealTask5...  end...{}", i);
    }

    /**
     * Sleeps for {@link #SIMULATED_WORK_MILLIS}. If the thread is interrupted the
     * interrupt flag is restored and the method returns early, so the owning
     * pool can still observe the interruption.
     *
     * @param taskName name of the calling task, used only in the warning log
     */
    private static void simulateWork(String taskName) {
        try {
            Thread.sleep(SIMULATED_WORK_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("{} interrupted while simulating work", taskName, e);
        }
    }

}
           

3.調用controller

在 controller 中調用上一步定義的異步方法,有幾種方式:一種是直接注入異步 Service,調用已定義好的 @Async 方法;一種是直接使用第一步定義的線程池(如 taskExecutor4),送出一個匿名 Runnable 或 Callable,在其中調用未加 @Async 注解的方法;也可以把第一步定義的線程池封裝成元件後再使用,如第四步。

package com.cxf.cxfasynctask.controller;

import com.cxf.cxfasynctask.component.AsyncComponent;
import com.cxf.cxfasynctask.service.AsyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

/**
 * REST endpoints under {@code /api/async} that trigger the demo tasks in three
 * ways: through {@code @Async} methods on {@link AsyncService}, by handing a
 * lambda straight to the {@code taskExecutor4} pool, and through the
 * {@link AsyncComponent} wrapper.
 */
@RestController
@RequestMapping("/api/async")
public class AsyncController {

    private static final Logger logger = LoggerFactory.getLogger(AsyncController.class);

    @Autowired
    private AsyncService asyncService;

    // Several Executor beans exist; name the intended one explicitly instead of
    // relying on the field name happening to match the bean name.
    @Autowired
    @Qualifier("taskExecutor4")
    private Executor taskExecutor4;

    @Autowired
    private AsyncComponent asyncComponent;

    /**
     * Triggers {@code dealTask1} and returns without waiting for it.
     *
     * @return {@code "succ"}
     */
    @GetMapping("/sendAsync1")
    public String sendAsync1() {
        asyncService.dealTask1();
        return "succ";
    }

    /**
     * Triggers {@code dealTask2} and blocks until its result is available.
     *
     * @return {@code "succ"} followed by the task's result, or {@code "fail"} if
     *         the wait was interrupted or the task threw (previously such
     *         failures were swallowed and {@code "succ"} was still returned)
     */
    @GetMapping("/sendAsync2")
    public String sendAsync2() {
        Future<String> stringFuture = asyncService.dealTask2();
        try {
            return "succ" + stringFuture.get();
        } catch (InterruptedException e) {
            // Restore the flag so the request thread's owner can see the interruption.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for dealTask2", e);
        } catch (ExecutionException e) {
            logger.error("dealTask2 failed", e);
        }
        return "fail";
    }

    /**
     * Triggers {@code dealTask3} with a random number.
     *
     * @return a success or submission-failure message ending in that number
     */
    @GetMapping("/sendAsync3")
    public String sendAsync3() {
        int num = new Random().nextInt(1000);
        return submitAndReport(num, () -> asyncService.dealTask3(num));
    }

    /**
     * Triggers {@code dealTask4} with a random number.
     *
     * @return a success or submission-failure message ending in that number
     */
    @GetMapping("/sendAsync4")
    public String sendAsync4() {
        int num = new Random().nextInt(1000);
        return submitAndReport(num, () -> asyncService.dealTask4(num));
    }

    /**
     * Runs the non-{@code @Async} {@code dealTask5} by handing it directly to
     * the {@code taskExecutor4} pool.
     *
     * @return a success or submission-failure message ending in the random number
     */
    @GetMapping("/sendAsync5")
    public String sendAsync5() {
        int num = new Random().nextInt(1000);
        return submitAndReport(num, () -> taskExecutor4.execute(() -> asyncService.dealTask5(num)));
    }

    /**
     * Runs the non-{@code @Async} {@code dealTask5} through the
     * {@link AsyncComponent} wrapper.
     *
     * @return a success or submission-failure message ending in the random number
     */
    @GetMapping("/sendAsync6")
    public String sendAsync6() {
        int num = new Random().nextInt(1000);
        return submitAndReport(num, () -> asyncComponent.submit(() -> asyncService.dealTask5(num)));
    }

    /**
     * Runs the given submission action and maps its outcome to the response
     * text shared by the submit-style endpoints.
     *
     * @param num        number identifying the request, appended to the response
     * @param submission action that hands the task over for execution
     * @return {@code "succ..." + num} if the action returned normally, otherwise
     *         {@code "送出失敗..." + num}
     */
    private static String submitAndReport(int num, Runnable submission) {
        try {
            submission.run();
        } catch (Exception e) {
            // Keep the cause visible in the logs; the response carries only the number.
            logger.warn("Submission failed for task {}", num, e);
            return "送出失敗..." + num;
        }
        return "succ..." + num;
    }

}
           

4.可以自定義自己的線程池送出邏輯

對第一步定義的線程池進行進一步的封裝,可以添加自己的邏輯,實作自己的線程池邏輯。

package com.cxf.cxfasynctask.component;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadPoolExecutor;


/**
 * Thin wrapper around the injected {@code taskExecutor5} pool; a place to add
 * custom submission logic in front of the pool.
 */
@Component
public class AsyncComponent {

    @Autowired
    private ThreadPoolExecutor taskExecutor5;

    /**
     * Hands the task to the pool for execution.
     *
     * <p>Uses {@code execute} rather than {@code submit}: {@code submit} wraps
     * the task in a {@code Future}, and since that future was discarded here any
     * exception thrown by the task was silently lost. With {@code execute} an
     * uncaught exception reaches the worker thread's uncaught-exception handler.
     *
     * @param runnable the task to run
     */
    public void submit(Runnable runnable) {
        taskExecutor5.execute(runnable);
    }

}
           

5.啟動類加 @EnableAsync

在啟動類上添加啟動異步任務的注解@EnableAsync

package com.cxf.cxfasynctask;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * Application entry point. {@code @EnableAsync} switches on processing of the
 * {@code @Async} annotation.
 */
@EnableAsync
@SpringBootApplication
public class CxfAsyncTaskApplication {

    /**
     * Boots the Spring application with this class as its primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(CxfAsyncTaskApplication.class);
        application.run(args);
    }

}

6.項目目錄結構

SpringBoot多線程使用(@Async)