天天看点

异步调用Async的使用基础案例,同步调用执行修改为异步执行downloadDoc(无返回值)接下来使用异步执行verfication(有返回值)线程池名字指定线程池统一异常捕获组件开发

基础案例,同步调用执行

1. 新建springboot项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sync</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <mainClass>com.sync.demo.DemoApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
           

2. 代码结构

异步调用Async的使用基础案例,同步调用执行修改为异步执行downloadDoc(无返回值)接下来使用异步执行verfication(有返回值)线程池名字指定线程池统一异常捕获组件开发

3. 模仿业务代码

@RestController
public class OnBoradingController {
    @Autowired
    private OnBoardingService service;
    @RequestMapping("/b")
    public boolean onBoarding(){
        System.out.printf("controller线程%s-%s处理\n",Thread.currentThread().getId(),Thread.currentThread().getName());
        return service.onBoard();
    }
}
           
@Service
public class OnBoardingService {
    @Autowired
    private DownService service;

    public boolean onBoard(){
        System.out.printf("开户成功\n");
        System.out.printf("进入下一阶段\n");
        service.downloadDoc();
        System.out.printf("我先完成了\n");
        return true;
    }
}
           
@Service
public class DownService {
    
    void downloadDoc() {
        System.out.printf("doc线程%s-%s处理\n",Thread.currentThread().getId(),Thread.currentThread().getName());
        System.out.printf("开始处理文档%tT%n",new Date());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("处理文档完成%tT%n",new Date());
    }
    
    Future<String> Verification(){
        System.out.printf("doc线程%s-%s处理\n",Thread.currentThread().getId(),Thread.currentThread().getName());
        System.out.printf("开始验证请求%tT%n",new Date());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("验证通过%tT%n",new Date());
        return new AsyncResult<>("636-ghd13-13sa");
    }
}
           

4. 启动类

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
           

修改为异步执行downloadDoc(无返回值)

5. 启动类添加支持异步注解

@SpringBootApplication
@EnableAsync
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
           

6. 方法添加异步注解

@Async
void downloadDoc() {
    ...
}
           
  • 异步方法不能定义为static类型
  • 调用方法和异步方法不能定义在同一个类中

此时总共输出两个不同线程id和名称,是异步调用执行

接下来使用异步执行verfication(有返回值)

7. 方法添加异步注解

@Async
Future<String> Verification(){
    ...
}
           

8. 调用

public boolean onBoard(){
    System.out.printf("开户成功\n");
    Future<String> verification = service.Verification();
    //既然需要结果,那我就要一般阻塞等待结果,和同步差不多都要等待
    while (!verification.isDone()){}
    try {
        System.out.printf("验证成功id--%s\n",verification.get());
    } catch (Exception e) {
    }
    System.out.printf("进入下一阶段\n");
    service.downloadDoc();
    System.out.printf("我先完成了\n");
    return true;
}
           

线程池

9. 添加线程池配置

默认线程池不重用线程,是假的线程池

@Configuration
public class AscyConfig implements AsyncConfigurer {
    /**
     * 配置线程池,减少在调用每个异步方法时创建和销毁线程所需的时间
     */
    @Override
    public Executor getAsyncExecutor() {
        // 初始化Spring框架提供的线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(3);
        // 最大线程数
        executor.setMaxPoolSize(3);
        // 任务等待队列大小
        executor.setQueueCapacity(10);
        // 任务拒绝策略,如果线程池拒绝接受任务,使用调用线程执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 定义线程名称前缀
        executor.setThreadNamePrefix("async-executor-");
        // 调用线程池初始化方法,如果在getAsyncExecutor()加上了@Bean注解,这个方法可以不调用,因为ThreadPoolTaskExecutor实现了InitializingBean接口,Spring在初始化Bean时会调用InitializingBean.afterPropertiesSet()
        executor.initialize();
        return executor;
    }
}
           

多次请求可发现线程池里三个线程轮流处理

名字指定线程池

10. 多个线程池

@Bean("asyncPool")
@Override
public Executor getAsyncExecutor() {
    ...
}

@Bean("asyncPool2")
public Executor asyncPool2() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    return executor;
}

//默认调用该名称异步线程池
@Bean("taskExecutor")
public Executor asyncPool3() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    return executor;
}
           
//指定线程池名称
@Async("asyncPool2")
Future<String> Verification(){...}
           

当项目中存在多个@Async且仅部分指定了异步线程池时,对于没指定线程池的异步方法使用规则优先级测试如下

  1. 重写AsyncConfigurer的getAsyncExecutor返回的Excutor
  2. 查找默认使用名称@Bean(“taskExecutor”)
  3. 使用原始默认SimpleAsyncTaskExecutor(SimpleAsyncTaskExecutor不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程)

统一异常捕获

  • 对于方法返回值是Futrue的异步方法:

    a) 一种是在调用future的get时捕获异常;

    b) 在异常方法中直接捕获异常

  • 对于返回值是void的异步方法:通过AsyncUncaughtExceptionHandler处理异常

11. 添加返回值void的异常捕获

@Configuration
public class AscyConfig implements AsyncConfigurer {
    /**
     * void返回值异步方法异常捕获处理
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }

    /**
     * 异常捕获处理类
     */
    public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
        @Override
        public void handleUncaughtException(Throwable ex, Method method, Object... params) {
            System.out.println(ex);
        }
    }
}
           

组件开发

打包成组件给其他项目依赖

12. 添加属性类,让线程池参数能从yml读取

@ConfigurationProperties(prefix = "spring-async.async-thread-pool")
public class AsyncThreadPoolProperties {
    /**
     * 是否启动异步线程池,默认 false
     */
    private boolean enable;
    /**
     * 核心线程数,默认:Java虚拟机可用线程数
     */
    private Integer corePoolSize;
    /**
     * 线程池最大线程数,默认:40000
     */
    private Integer maxPoolSize;
    /**
     * 线程队列最大线程数,默认:80000
     */
    private Integer queueCapacity;
    /**
     * 自定义线程名前缀,默认:Async-ThreadPool-
     */
    private String threadNamePrefix;
    /**
     * 线程池中线程最大空闲时间,默认:60,单位:秒
     */
    private Integer keepAliveSeconds = 60;
    /**
     * 核心线程是否允许超时,默认false
     */
    private boolean allowCoreThreadTimeOut;
    /**
     * IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds)
     */
    private boolean waitForTasksToCompleteOnShutdown;
    /**
     * 阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown)
     */
    private int awaitTerminationSeconds = 10;
    
    ...get/set.....
}
           

13. 对应的yml

# 应用名称
spring:
  application:
    name: asyncDemo
    
# 应用服务 WEB 访问端口
server:
  port: 8080

#异步线程池
#异步线程池组件开关,默认false
springAsync:
  asyncThreadPool:
    enable: true
  #核心线程数,默认:Java虚拟机可用线程数
    core-pool-size: 4
  #线程池最大线程数,默认:40000
    max-pool-size: 40000
  #线程队列最大线程数,默认:80000
    queue-capacity: 80000
  #自定义线程名前缀,默认:Async-ThreadPool-
    thread-name-prefix: Async-ThreadPool-acviti-
  #线程池中线程最大空闲时间,默认:60,单位:秒
    keep-alive-seconds: 60
  #核心线程是否允许超时,默认false
    allow-core-thread-time-out: false
  #IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds)
    wait-for-tasks-to-complete-on-shutdown : false
  #阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown)
    await-termination-seconds: 10
           

14. 修改配置类,读取属性文件和bean加载条件

@Configuration
@EnableConfigurationProperties(AsyncThreadPoolProperties.class)
@ConditionalOnProperty(prefix = "spring-async.async-thread-pool", name = "enable", havingValue = "true", matchIfMissing = false)
public class AscyConfig implements AsyncConfigurer {

    @Autowired
    private AsyncThreadPoolProperties asyncThreadPoolProperties;
    /**
     * 配置线程池,减少在调用每个异步方法时创建和销毁线程所需的时间
     */
    @Bean("asyncPool")
    @Override
    public Executor getAsyncExecutor() {
        //Java虚拟机可用的处理器数
        int processors = Runtime.getRuntime().availableProcessors();
        // 初始化Spring框架提供的线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(Objects.nonNull(asyncThreadPoolProperties.getCorePoolSize()) ? asyncThreadPoolProperties.getCorePoolSize() : processors);
        // 最大线程数
        executor.setMaxPoolSize(Objects.nonNull(asyncThreadPoolProperties.getCorePoolSize()) ? asyncThreadPoolProperties.getCorePoolSize() : processors);
        // 任务等待队列大小
        executor.setQueueCapacity(10);
        // 任务拒绝策略,如果线程池拒绝接受任务,使用调用线程执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 定义线程名称前缀
        executor.setThreadNamePrefix(StringUtils.isEmpty(asyncThreadPoolProperties.getThreadNamePrefix()) ? "Async-ThreadPool-" : asyncThreadPoolProperties.getThreadNamePrefix());
        // 调用线程池初始化方法,如果在getAsyncExecutor()加上了@Bean注解,这个方法可以不调用,因为ThreadPoolTaskExecutor实现了InitializingBean接口,Spring在初始化Bean时会调用InitializingBean.afterPropertiesSet()
        // executor.initialize();
        return executor;
    }
    ......
}
           

15. 新建META-INF/spring.factories文件,目的是告诉springboot装载这个bean,这样打包后被其他项目依赖时其它项目会加载这个bean

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.sync.demo.config.AscyConfig
           

16. 将@EnableAsync移动到配置类,这样其它项目就可以不用添加该注解

@EnableAsync
@Configuration
@EnableConfigurationProperties(AsyncThreadPoolProperties.class)
@ConditionalOnProperty(prefix = "spring-async.async-thread-pool", name = "enable", havingValue = "true", matchIfMissing = false)
public class AscyConfig implements AsyncConfigurer {...}
           

17. 修改打包插件classifier或skip**

  • 未修改参数时,springboot打包出的jar是包含依赖jar可执行的jar包
  • class会被复制到新建的BOOT-INF/classes下且在MANIFEST.MF指定该位置
Manifest-Version: 1.0
Spring-Boot-Classpath-Index: BOOT-INF/classpath.idx
Archiver-Version: Plexus Archiver
Built-By: Admin
Start-Class: com.sync.demo.DemoApplication
Spring-Boot-Classes: BOOT-INF/classes/
Spring-Boot-Lib: BOOT-INF/lib/
Spring-Boot-Version: 2.3.7.RELEASE
Created-By: Apache Maven 3.6.3
Build-Jdk: 1.8.0_40
Main-Class: org.springframework.boot.loader.JarLauncher
           
  • 如果依赖该jar包会导致找不到class
  • 能被依赖的jar包是被重命名的demo-0.0.1-SNAPSHOT.jar.original
    异步调用Async的使用基础案例,同步调用执行修改为异步执行downloadDoc(无返回值)接下来使用异步执行verfication(有返回值)线程池名字指定线程池统一异常捕获组件开发
<!-- 打包后的jar就是原始的可依赖jar,不再重命名为后缀.jar.original-->
<skip>true</skip>
           
<!-- 打包后的jar就是原始的可依赖jar,可执行jar被重命名,名字拼接上exec-->
<classifier>exec</classifier>
           
<plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <mainClass>com.sync.demo.DemoApplication</mainClass>
<!--                    <classifier><skip>留一个就行                    -->
<!--                    <classifier>exec</classifier>                  -->
                        <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
           

18. install到maven库后,另起一个springboot项目

<!-- 添加打包时的依赖坐标 -->
<dependency>
    <groupId>com.sync</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
           

19. 配置enable: true,即可在项目直接使用@Async异步线程池

springAsync:
  asyncThreadPool:
    enable: true
    #自定义线程名前缀,默认:Async-ThreadPool-
    thread-name-prefix: Async-ThreadPool-acviti-wohaha
           

20. junit测试时, 有时需要关闭异步, 便于测试, 但是yml里的开关是全局的, 此时可以使用@Profile和@ActiveProfiles

@Profile("!non-async")
@EnableAsync
@Configuration
@EnableConfigurationProperties(AsyncThreadPoolProperties.class)
@ConditionalOnProperty(prefix = "spring-async.async-thread-pool", name = "enable", havingValue = "true", matchIfMissing = false)
public class AscyConfig implements AsyncConfigurer {}
           
@SpringBootTest
@ActiveProfiles({"test", "non-async"})
public class test{}