前言
最近需要用到多线程, 自己维护线程池很麻烦, 正好看到 springboot 集成线程池的例子, 这里自己做了个尝试和总结, 记录一下, 也分享给需要的朋友;
不考虑事务的情况下, 这个多线程实现比较简单, 主要有以下几点:
在启动类加上 @EnableAsync 注解, 开启异步执行支持;
编写线程池配置类, 别忘了 @Configuration , 和 @Bean 注解;
编写需要异步执行的业务, 放到单独的类中 (可以定义为 service, 因为需要 spring 管理起来才能用 );
在业务service中调用异步执行的service, 注意这是重点, 不能直接在业务 service 中写异步执行的代码, 否则无法异步执行( 这就是单独放异步代码的原因);
使用步骤
先创建一个线程池的配置,让Spring Boot加载,用来定义如何创建一个ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类
@Configuration
@EnableAsync
public class ExecutorConfig {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
@Value是我配置在application.properties,可以参考配置,自由定义
# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 99999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-service-
创建一个Service接口,是异步线程的接口
public interface AsyncService {
/**
* 执行异步任务
* 可以根据需求,自己加参数拟定,我这里就做个测试演示
*/
void executeAsync();
}
实现类
@Service
public class AsyncServiceImpl implements AsyncService {
private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
@Override
@Async("asyncServiceExecutor")
public void executeAsync() {
logger.info("start executeAsync");
System.out.println("异步线程要做的事情");
System.out.println("可以在这里执行批量插入等耗时的事情");
logger.info("end executeAsync");
}
}
将Service层的服务异步化,在executeAsync()方法上增加注解@Async(“asyncServiceExecutor”),asyncServiceExecutor方法是前面ExecutorConfig.java中的方法名,表明executeAsync方法进入的线程池是asyncServiceExecutor方法创建的。
接下来就是在Controller里或者是哪里通过注解@Autowired注入这个Service
@Autowired
private AsyncService asyncService;
@GetMapping("/async")
public void async(){
asyncService.executeAsync();
}
用postmain或者其他工具来多次测试请求一下
通过以上日志可以发现,[async-service-]是有多个线程的,显然已经在我们配置的线程池中执行了,并且每次请求中,controller的起始和结束日志都是连续打印的,表明每次请求都快速响应了,而耗时的操作都留给线程池中的线程去异步执行;
虽然我们已经用上了线程池,但是还不清楚线程池当时的情况,有多少线程在执行,多少在队列中等待呢?这里我创建了一个ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
如上所示,showThreadPoolInfo方法中将任务总数、已完成数、活跃线程数,队列大小都打印出来了,然后Override了父类的execute、submit等方法,在里面调用showThreadPoolInfo方法,这样每次有任务被提交到线程池的时候,都会将当前线程池的基本情况打印到日志中;
修改ExecutorConfig.java的asyncServiceExecutor方法,将ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改为ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
logger.info("start asyncServiceExecutor");
//在这里修改
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
再次启动该工程测试
提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了3个任务,完成了3个,当前有0个线程在处理任务,还剩0个任务在队列中等待,线程池的基本情况一路了然;
@Async 多线程获取返回值
// 异步执行代码
@Service("asyncExecutorTest")
public class AsyncExecutorTest {
// 异步执行的方法, 注解内为自定义线程池类名
@Async("asyncServiceExecutor")
public Future<Integer> test1(Integer i) throws InterruptedException {
Thread.sleep(100);
System.out.println("@Async 执行: " + i);
return new AsyncResult(i);
}
// 这里使用其它方式调用,详见后面的 service3 方法
public Integer test2(Integer i) throws InterruptedException {
Thread.sleep(100);
System.out.println(" excute.run 执行: " + i);
return i;
}
}
// 业务 service
@Service("asyncExcutorService")
public class AsyncExcutorService {
@Autowired
AsyncExecutorTest asyncExecutorTest;
@Autowired
Executor localBootAsyncExecutor;
// 测试 无返回值异步执行
public void service1(){
System.out.println("service1 执行----->");
for (int i = 0; i < 50; i++) {
try {
asyncExecutorTest.test1(i);
} catch (InterruptedException e) {
System.out.println("service1执行出错");
}
}
System.out.println("service1 结束----->");
}
// 测试 有返回值异步执行
public void service2(){
long l = System.currentTimeMillis();
System.out.println("service2 执行----->");
List<Future> result = new ArrayList<>();
try {
for (int i = 0; i < 300; i++) {
Future<Integer> integerFuture = asyncExecutorTest.test1(i);
result.add(integerFuture);
}
for (Future future : result) {
System.out.println(future.get());
}
} catch (InterruptedException | ExecutionException e) {
System.out.println("service2执行出错");
}
System.out.println("service2 结束----->" + (System.currentTimeMillis() - l));
}
// 测试 有返回值异步执行
public void service3(){
long l = System.currentTimeMillis();
List<Integer> result = new ArrayList<>();
try {
System.out.println("service3 执行----->");
int total = 300;
CountDownLatch latch = new CountDownLatch(total);
for (int i = 0; i < total; i++) {
final int y = i;
localBootAsyncExecutor.execute(() -> {
try {
result.add(asyncExecutorTest.test2(y));
} catch (InterruptedException e) {
System.out.println("service3执行出错");
} finally {
latch.countDown();
}
});
}
latch.await();
} catch (InterruptedException e) {
System.out.println("service3执行出错");
}
System.out.println("service3 结束----->" + (System.currentTimeMillis() - l));
}
}
这里说下 service1 和 service2 的区别:
- 两个都用的是一个线程池执行的
- service1 单纯执行业务, 不用返回数据, 主线程也不用等待
- service2 需要返回数据, 主线程需要等待结果( 注意返回值只能是 Future, 最后再 .get()去获取, 否则无法异步执行)
- service3 也可以返回数据, 但是书写上麻烦一些. 返回值直接是想要的结果, 不像 service2 还需要提取一次数据.