springboot线程池结合CountDownLatch使用
使用场景
等待多个子线程并行执行完毕,通知主线程继续执行任务
1 配置线程池
配置spring boot 线程池
2 多线程调用
2.1 调用代码
使用 CountDownLatch 统计线程数,实现等待多个子线程执行完毕
@RestController
public class TestController {
private static Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
private TestService testService;
@Autowired
private ThreadPoolTaskExecutor ThreadPoolA;
@RequestMapping("/test")
public void test()throws Exception{
logger.info("main->start");
//初始化子线程个数 2
final CountDownLatch latch =new CountDownLatch(2);
ThreadPoolA.execute(() -> {
try {
logger.info("thread1->doWork");
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
logger.info("thread1->end");
//线程数-1
latch.countDown();
}
});
ThreadPoolA.execute(()->{
try {
logger.info("thread2->doWork");
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
logger.info("thread2->end");
//线程数-1
latch.countDown();
}
});
logger.info("main->doWork");
//等待线程执行完毕
latch.await();
logger.info("main->end");
}
}
需获取线程返回值时 ThreadPoolA.execute 改为 ThreadPoolA.submit,例如:
//省略
Future<Long> future = ThreadPoolA.submit(() -> {
//省略
return 1L;
});
//省略
latch.await();
//调用future.get()将会在该线程执行完毕后才会往下执行
logger.info(future.get());
为避免线程数统计不正确导致死锁,建议使用latch.await(long timeout, TimeUnit unit)
//十分钟内线程计数未归零,将自动释放
latch.await(10, TimeUnit.MINUTES);
2.2 执行结果

2 CountDownLatch 源码展示
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
/**
* 初始化线程个数
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 当前线程等待直到线程数为0
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 当前线程等待直到线程数为0或超过等待时间
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 统计线程个数,如果计数达到零,则释放所有等待线程
*/
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}