概要
前面對”獨占鎖”和”共享鎖”有了個大緻的了解;本章,我們對
CountDownLatch
進行學習。和ReadWriteLock.ReadLock一樣,
CountDownLatch
的本質也是一個”共享鎖”。本章的内容包括:
- CountDownLatch簡介
- CountDownLatch資料結構
- CountDownLatch源碼分析(基于JDK1.7.0_40)
- CountDownLatch示例
CountDownLatch簡介
CountDownLatch
是一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
CountDownLatch和CyclicBarrier的差別
- CountDownLatch的作用是允許1或N個線程等待其他線程完成執行;而CyclicBarrier則是允許N個線程互相等待。
- CountDownLatch的計數器無法被重置;CyclicBarrier的計數器可以被重置後使用,是以它被稱為是循環的barrier。
關于CyclicBarrier的原理,後面一章再來學習。
CountDownLatch函數清單
CountDownLatch(int count)
構造一個用給定計數初始化的 CountDownLatch。
// 使目前線程在鎖計數器倒計數至零之前一直等待,除非線程被中斷。
void await()
// 使目前線程在鎖計數器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。
boolean await(long timeout, TimeUnit unit)
// 遞減鎖計數器的計數,如果計數到達零,則釋放所有等待的線程。
void countDown()
// 傳回目前計數。
long getCount()
// 傳回辨別此鎖計數器及其狀态的字元串。
String toString()
CountDownLatch資料結構
CountDownLatch的UML類圖如下:

CountDownLatch
的資料結構很簡單,它是通過”共享鎖”實作的。它包含了sync對象,sync是Sync類型。Sync是執行個體類,它繼承于AQS。
CountDownLatch源碼分析(基于JDK1.7.0_40)
CountDownLatch完整源碼(基于JDK1.7.0_40)
CountDownLatch
是通過“共享鎖”實作的。下面,我們分析
CountDownLatch
中3個核心函數: CountDownLatch(int count), await(), countDown()。
CountDownLatch(int count)
public CountDownLatch(int count) {
if (count < ) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
說明:該函數是建立一個Sync對象,而Sync是繼承于AQS類。Sync構造函數如下:
Sync(int count) {
setState(count);
}
setState()在AQS中實作,源碼如下:
protected final void setState(long newState) {
state = newState;
}
說明:在AQS中,
state
是一個
private volatile long
類型的對象。對于CountDownLatch而言,state表示”鎖計數器“。CountDownLatch中的getCount()最終是調用AQS中的getState(),傳回的state對象,即”鎖計數器“。
await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly();
}
說明:該函數實際上是調用的AQS的
acquireSharedInterruptibly(1)
;
AQS中的acquireSharedInterruptibly()的源碼如下:
public final void acquireSharedInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < )
doAcquireSharedInterruptibly(arg);
}
說明:
acquireSharedInterruptibly()
的作用是擷取共享鎖。
如果目前線程是中斷狀态,則抛出異常InterruptedException。否則,調用
tryAcquireShared(arg)
嘗試擷取共享鎖;嘗試成功則傳回,否則就調用
doAcquireSharedInterruptibly()
。doAcquireSharedInterruptibly()會使目前線程一直等待,直到目前線程擷取到共享鎖(或被中斷)才傳回。
tryAcquireShared()
在CountDownLatch.java中被重寫,它的源碼如下:
protected int tryAcquireShared(int acquires) {
return (getState() == ) ? : -;
}
說明:
tryAcquireShared()
的作用是嘗試擷取共享鎖。如果”鎖計數器=0”,即鎖是可擷取狀态,則傳回1;否則,鎖是不可擷取狀态,則傳回-1。
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 建立"目前線程"的Node節點,且Node中記錄的鎖是"共享鎖"類型;并将該節點添加到CLH隊列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 擷取上一個節點。
// 如果上一節點是CLH隊列的表頭,則"嘗試擷取共享鎖"。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= ) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// (上一節點不是CLH隊列的表頭) 目前線程一直等待,直到擷取到共享鎖。
// 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
說明:
- addWaiter(Node.SHARED)的作用是,建立”目前線程“的Node節點,且Node中記錄的鎖的類型是”共享鎖“(Node.SHARED);并将該節點添加到CLH隊列末尾。關于Node和CLH在”Java多線程系列–“JUC鎖”03之 公平鎖(一)”已經詳細介紹過,這裡就不再重複說明了;
- node.predecessor()的作用是,擷取上一個節點。如果上一節點是CLH隊列的表頭,則”嘗試擷取共享鎖“;
- shouldParkAfterFailedAcquire()的作用和它的名稱一樣,如果在嘗試擷取鎖失敗之後,線程應該等待,則傳回true;否則,傳回false;
- 當shouldParkAfterFailedAcquire()傳回ture時,則調用parkAndCheckInterrupt(),目前線程會進入等待狀态,直到擷取到共享鎖才繼續運作。
-
中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函數在”Java多線程系列–“JUC鎖”03之 公平鎖(一)”中介紹過,這裡也就不再詳細說明了。doAcquireSharedInterruptibly()
countDown()
public void countDown() {
sync.releaseShared();
}
說明:該函數實際上調用
releaseShared(1)
釋放共享鎖。
releaseShared()
在AQS中實作,源碼如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 擷取“鎖計數器”的狀态
int c = getState();
if (c == )
return false;
// “鎖計數器”-1
int nextc = c-;
// 通過CAS函數進行指派。
if (compareAndSetState(c, nextc))
return nextc == ;
}
}
說明:
tryReleaseShared()
的作用是釋放共享鎖,将“鎖計數器”的值-1。
總結
CountDownLatch是通過“共享鎖”實作的。在建立CountDownLatch中時,會傳遞一個int類型參數count,該參數是“鎖計數器”的初始狀态,表示該“共享鎖”最多能被count個線程同時擷取。當某線程調用該CountDownLatch對象的await()方法時,該線程會等待“共享鎖”可用時,才能擷取“共享鎖”進而繼續運作。而“共享鎖”可用的條件,就是“鎖計數器”的值為0!而“鎖計數器”的初始值為count,每當一個線程調用該CountDownLatch對象的countDown()方法時,才将“鎖計數器”-1;通過這種方式,必須有count個線程調用countDown()之後,“鎖計數器”才為0,而前面提到的等待線程才能繼續運作!
以上,就是CountDownLatch的實作原理。
CountDownLatch的使用示例
下面通過CountDownLatch實作:”主線程”等待”5個子線程”全部都完成”指定的工作(休眠1000ms)”之後,再繼續運作。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest1 {
private static int LATCH_SIZE = ;
private static CountDownLatch doneSignal;
public static void main(String[] args) {
try {
doneSignal = new CountDownLatch(LATCH_SIZE);
// 建立5個任務
for (int i = ; i < LATCH_SIZE; i++)
new InnerThread().start();
System.out.println("main await begin.");
// "主線程"等待線程池中5個任務的完成
doneSignal.await();
System.out.println("main await finished.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class InnerThread extends Thread {
public void run() {
try {
Thread.sleep();
System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
// 将CountDownLatch的數值減1
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
運作結果:
main await begin.
Thread- sleep ms.
Thread- sleep ms.
Thread- sleep ms.
Thread- sleep ms.
Thread- sleep ms.
main await finished.
結果說明:主線程通過doneSignal.await()等待其它線程将doneSignal遞減至0。其它的5個InnerThread線程,每一個都通過doneSignal.countDown()将doneSignal的值減1;當doneSignal為0時,main被喚醒後繼續執行。