springboot線程池結合CountDownLatch使用
使用場景
等待多個子線程并行執行完畢,通知主線程繼續執行任務
1 配置線程池
配置spring boot 線程池
2 多線程調用
2.1 調用代碼
使用 CountDownLatch 統計線程數,實作等待多個子線程執行完畢
@RestController
public class TestController {
private static Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
private TestService testService;
@Autowired
private ThreadPoolTaskExecutor ThreadPoolA;
@RequestMapping("/test")
public void test()throws Exception{
logger.info("main->start");
//初始化子線程個數 2
final CountDownLatch latch =new CountDownLatch(2);
ThreadPoolA.execute(() -> {
try {
logger.info("thread1->doWork");
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
logger.info("thread1->end");
//線程數-1
latch.countDown();
}
});
ThreadPoolA.execute(()->{
try {
logger.info("thread2->doWork");
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
logger.info("thread2->end");
//線程數-1
latch.countDown();
}
});
logger.info("main->doWork");
//等待線程執行完畢
latch.await();
logger.info("main->end");
}
}
需擷取線程傳回值時 ThreadPoolA.execute 改為 ThreadPoolA.submit,例如:
//省略
Future<Long> future = ThreadPoolA.submit(() -> {
//省略
return 1L;
});
//省略
latch.await();
//調用future.get()将會在該線程執行完畢後才會往下執行
logger.info(future.get());
為避免線程數統計不正确導緻死鎖,建議使用latch.await(long timeout, TimeUnit unit)
//十分鐘内線程計數未歸零,将自動釋放
latch.await(10, TimeUnit.MINUTES);
2.2 執行結果

2 CountDownLatch 源碼展示
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
/**
* 初始化線程個數
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 目前線程等待直到線程數為0
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 目前線程等待直到線程數為0或超過等待時間
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 統計線程個數,如果計數達到零,則釋放所有等待線程
*/
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}