【Java計時器】CountDownLatch
- 前言
- 如何使用
- 分析
-
- 構造方法
- await
- acquireSharedInterruptibly
- doAcquireSharedInterruptibly
- setHeadAndPropagate
- doReleaseShared
- countDown
- releaseShared
- tryReleaseShared
- 插曲:獨占&共享
- 總結
- 結束語
😃
AQS(一)
AQS(二)
AQS(三)
😮
前言
CountDownLatch是基于AQS進行實作的一個計時器,如果閱讀了上面AQS系列的文章,相信對CountDownLatch裡頭的區區300多行代碼,會有一個更加深刻的了解。
如何使用
CountDownLatch計時器的作用可以讓多個線程先各自完成自己的任務再繼續執行接下來的工作。實作方式如下:
這裡先模拟構造了一個查詢線程,其中睡個2s鐘模拟操作,在這之後,計數器會進行減一的操作。
class InquireThread implements Runnable {
private CountDownLatch countDownLatch;
public InquireThread(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("查詢開始...目前線程号: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown(); // 計數器減一
}
}
在CountDownLatchDemo 這裡,我的計數器countDownLatch 初始化為5,意味着我可以讓5條線程共享鎖。然後,建立線程的時候,把它作為參數進行傳遞。然後調用await進行等待,待計數器置為0,執行compute方法。
/***
*
* @Author:fsn
* @Date: 2020/4/2 14:24
* @Description 倒計時器
* 可以聯系join() 一起了解, CountDownLatch可以使目前線程,
* 不用等到子線程全部執行完才開始繼續執行, 隻要子線程執行的
* 過程中檢測到計時器為0, 就會喚醒目前線程
*/
public class CountDownLatchDemo {
private CountDownLatch countDownLatch = new CountDownLatch(5);
private int count = 6;
void test() throws InterruptedException {
Thread thread = null;
long result;
while ((count--) > 0) {
thread= new Thread(new InquireThread(countDownLatch));
thread.start();
}
countDownLatch.await();
compute();
}
private void compute() {
System.out.println("任務完成");
}
public static void main(String[] args) throws InterruptedException {
CountDownLatchDemo demo = new CountDownLatchDemo();
demo.test();
}
}
分析
構造方法
我們可以從構造方法入手,CountDownLatch提供的構造方法需要一個int參數,該參數用于初始化Syn内部類,該類也是繼承于AQS。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync内部類的構造方法如下所示,它會調用父類AQS的setState,初始化同步狀态。
Sync(int count) {
setState(count);
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
await
在初始化計時器後,我們會調用 countDownLatch.await()進行阻塞。該方法如下,如果阻塞的過程中,某些線程發生中斷,它會抛出中斷異常。另外,如果計時器遲遲沒有歸0,又沒有線程中斷,它會一直在這裡等到天荒地老(沒有設定逾時機制的話)。
可以嘗試将上述例子中,countDownLatch初始化的線程數調大或者建立線程數小于countDownLatch初始化的線程數。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly
接着就是acquireSharedInterruptibly方法,該放方法先進行線程中斷的判斷,然後通過tryAcquireShared方法擷取共享鎖。
tryAcquireShared和我們之前分析的 tryAcquire方法一樣,AQS都有沒有提供具體實作,而是由子類去做具體的操作。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
是以tryAcquireShared的具體操作是在CountDownLatch類中,它會擷取同步狀态,并判斷是否為0。為0代表擷取成功,則傳回1,反之-1。
貌似有個問題。。這個acquires似乎這裡沒有用到啊。。那還傳進來做啥?小夥伴可以先想想,我把自己的想法放在結束語裡邊。看看能否達成共識!
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly
當tryAcquireShared 傳回-1的時候,getState()傳回的同步狀态不為0,意味着還有線程占着茅坑,擷取鎖失敗,是以得進入同步隊列中。即接着就是doAcquireSharedInterruptibly方法。是不是覺得代碼和前面分析的acquireQueued頗有相似之處。
不同的是,這裡tryAcquireShared嘗試擷取的是共享鎖,當同步狀态不為0,則會嘗試挂起。另外,setHeadAndPropagate也是不一樣的地方,這裡不僅會設定頭部還會向後進行傳播。
拓展:doAcquireSharedInterruptibly方法和doAcquireShared類似,隻不過,該方法才遇到中斷後會抛出中斷異常。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 擷取節點的上一個節點
final Node p = node.predecessor();
if (p == head) {
// 嘗試擷取共享鎖
int r = tryAcquireShared(arg);
// r = (getState() == 0) ? 1 : -1;
// 如果同步狀态等于0,傳回1,反之傳回-1
if (r >= 0) {
// 設定頭部和傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 判斷擷取鎖失敗之後是否可以進入等待喚醒狀态
// 該方法保證目前線程的前驅節點的waitStatus屬性值為SIGNAL,
// 進而保證了自己挂起後,前驅節點會負責在合适的時候喚醒自己。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
當進入該方法時,意味着擷取共享鎖成功。此時同步狀态為0了,此方法在共享模式下,如果propagate > 0或 傳播(propagate )狀态已設定,會檢查後繼者是否正在等待。
關于傳播狀态,我們之前已經說過,這裡回憶一下吧:
- CANCELLED :1 表明一個等待的線程被取消了
- SIGNAL : -1 表明一個等待線程的下一個線程需要被喚醒
- CONDITION: -2 目前線程正在等待中
- PROPAGATE :-3 下一次申請共享鎖方法應該被無條件的傳播
- 0:初始值
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
setHead(node)之前也做了分析了,它将擷取鎖的節點設定為頭部節點,将線程和prev 節點置為null。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
然後是 Node s = node.next,該行代碼會尋找目前節點的下一個節點,如果下一個節點為null或者是共享節點(通過如下isShared方法進行判斷)則進入doReleaseShared方法。
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
doReleaseShared
該方法做了個啥事情呢?通過注釋,我們可以知道它的工作是釋放共享鎖,并確定能夠喚醒繼任者。
首先,擷取頭結點的狀态,并判斷是否為SIGNALif (ws == Node.SIGNAL),如果是的話CAS操作修改為0,如果成功喚醒繼任者,不成功繼續自旋。如果狀态已經是0了,則嘗試設定為PROPAGATE狀态(下一次申請共享鎖方法應該被無條件的傳播 )
最後,會判斷頭部節點是否被修改了,若被修改了說明有其他擷取了鎖,為了保證正确的傳播,繼續循環。反之,跳出doReleaseShared方法。最後在doAcquireSharedInterruptibly方法處return。
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
// 要喚醒繼任者(下一個)節點,是以h不能為null,也不能等于尾部節點
if (h != null && h != tail) {
// 如果節點為SIGNAL,會通過CAS将狀态置為0,然後喚醒繼任者
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果節點的狀态為0,則CAS嘗試設定為PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 多線程環境下, 頭結點可能被修改, 此時說明其他節點擷取到了鎖
if (h == head) // loop if head changed
break;
}
}
countDown
CountDownLatch裡頭的await方法分析完了還有一件事,就是我們的計時器進行同步狀态的減操作,不然,同步狀态一直不會為0,await不設定逾時則會一直卡殼。是以接下來分析的就是CountDownLatch操作。
如下所示,每次使用countDown,都會進行減一,而實際操作在于releaseShared
public void countDown() {
sync.releaseShared(1);
}
releaseShared
其中,tryReleaseShared方法可以和上文的tryAcquireShared方法對比了解。如果減一成功,進行釋放共享鎖的操作。
而關于這個doReleaseShared方法,我們已經在面分析過了,這也是為什麼該方法設定state的時候需要CAS的原因之一,因為該方法有兩個入口,會産生線程不安全。
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
關于tryReleaseShared方法,也比較簡單,擷取同步狀态,判斷是否為0,進行減一操作,然後進行CAS交換。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
插曲:獨占&共享
在前面的文章分析中,我們對 addWaiter做過分析。我們從acquire方法進行引入,然後分析的是獨占鎖的申請方式。而這裡addWaiter(Node.SHARED),表示的建立一個申請共享鎖的節點,并進入同步隊列。
public final void acquire(int arg) {
// 如果申請鎖不成功, 則放入阻塞隊列,這裡還是會調用tryAcquire()方法
// 該方法也是我們自定義實作的
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
關于共享和獨占的差別之一在于,Node.EXCLUSIVE是一個null節點,而Node.SHARED則是一個執行個體化過的節點。
它們進入addWaiter之後,會重新構造一個節點,其中mode對應的就是share或者exclusive。從構造方法這裡,我們可以看到對于獨占鎖來說,nextWaiter 總是null的。而共享鎖則是一個執行個體化過的節點。
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
總結
本文分析了CountDownLatch的原理,并通過await方法和countDown方法進行切入點,分析了AQS的共享鎖的擷取及傳播操作。
結束語
文中提到了acquires參數在tryAcquireShared都沒有用到為什麼還要傳進來呢?其實我覺的是 Doug Lea的設計習慣,因為這個參數對于CountDownLatch雖然沒有用,但不代表對重入鎖沒有用,而AQS作為它們兩個的父類(說的不準确,應該是它們内部類Syn的父類),也可以對tryAcquireShared方法進行重載,但專門為CountDownLatch類重載一個方法又沒太多必要,是以幹脆就這麼用了吧。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}