CountDownLatch 和 CyclicBarrier 比較相似,都屬于并發的實用工具類。
CyclicBarrier 的源碼分析和使用可以參考:Java并發Concurrent包的鎖(五)——CyclicBarrier源碼分析及使用
JDK 文檔中的定義:
用給定的計數初始化 CountDownLatch。由于調用了 countDown() 方法,是以在目前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的線程,await 的所有後續調用都将立即傳回。這種現象隻出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
兩者的差別:
- CountDownLatch 的作用是允許 1 個(或N個)線程等待其他線程完成執行;而 CyclicBarrier 是允許 N 個線程互相等待;
- CountDownLatch 的計數器無法被重置;CyclicBarrier 的計數器可以被重置後使用,是以它是循環的。
下面通過源碼來分析 CountDownLatch 。
Sync類
類中引用了 Sync 類的對象, Sync 類是擴充了 AbstractQueuedSynchronizer 而來:
使用了 AQS 類的 state 值來儲存 count 數量。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = L;
// 構造函數,帶count值參數
Sync(int count) {
setState(count);
}
// 擷取count值
int getCount() {
return getState();
}
// 重寫的方法,如果count為0,傳回1,即所有線程都到位
protected int tryAcquireShared(int acquires) {
return (getState() == ) ? : -;
}
// 重寫的方法,實際是如果有線程到位了,讓count減1
protected boolean tryReleaseShared(int releases) {
// 一直循環
for (;;) {
int c = getState();
if (c == )
return false;
int nextc = c-;
// 嘗試設定state為減1後的值,CAS方法如果為c,才設定新值
if (compareAndSetState(c, nextc))
return nextc == ;
}
}
}
屬性
// 引用了sync類的對象
private final Sync sync;
構造函數
public CountDownLatch(int count) {
if (count < ) throw new IllegalArgumentException("count < 0");
// 線程數量count,調用Sync類的構造函數
this.sync = new Sync(count);
}
重要方法
await
使目前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly();
}
這裡我們看調用的 AQS 類中的對應方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < )
doAcquireSharedInterruptibly(arg);
}
這裡的 tryAcquireShared 方法在 sync 類中進行了重寫,如果所有線程都到位了,傳回 1 。如果 count 不為 0 ,說明還有線程沒有到位,需要繼續等待。
countDown
countDown 方法調用 sync 重寫的 releaseShared 方法,有線程到位了就讓未到位的線程數量減 1:
public void countDown() {
sync.releaseShared();
}
使用情景
比如還拿 CyclicBarrier 中的檔案讀取的例子,使用 CountDownLatch 來再重新實作:
屏障類中的那個例子隻有 10 個子線程到達屏障後互相等待,現在要加入一個新的統計線程來等待這10個線程。
public class TestCDLatch {
private static CountDownLatch countDownLatch;
public static class SumThread extends Thread {
@Override
public void run() {
System.out.println("需要等待 " + countDownLatch.getCount() + " 個子線程運作完!");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("其它線程都運作完畢,可以開始統計了");
}
}
public static class CountThread extends Thread {
@Override
public void run() {
System.out.println("子線程" + Thread.currentThread().getName() + "讀完了!");
countDownLatch.countDown();
}
}
public static void main(String[] args) {
countDownLatch = new CountDownLatch();
new SumThread().start();
for(int i=; i<; i++){
new CountThread().start();
}
}
}
運作結果:
需要等待 個子線程運作完!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
其它線程都運作完畢,可以開始統計了