天天看點

J.U.C源碼分析:CountDownLatch

        CountDownLoad是在并發程式設計中使用較多的一個類,可以完成多個線程之間的互相等待和協作,源碼内容不多但功能強大且使用場景複雜多樣。

        源碼中對CountDownLoad功能的定義非常簡單:

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
允許一個或多個線程等待的同步輔助在其他線程中執行的一組操作完成。
           

        簡單的說CountDownLoad實作了一個計數器的功能,使用CountDownLoad的時候需要設定一個值作為初始化使用。這個值也就是CountDownLoad在計數的時候最終所需要達到的值。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
           

        CountDownLoad隻有一個構造函數,就是說沒有無參構造函數,必須要在初始化的時候就指定count值。count值必須要大于0,否則抛出IllegalArgumentException非法參數。count值會被傳給Syuc類。

Syuc是CountDownLoad的靜态内部類,繼承自AbstractQueuedSynchronizer,final修飾不可被繼承。

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
            return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
            return nextc == 0;
        }
    }
}
           

Syuc初始化的時候會調用setState(count)方法,這個方法來自于父類。

protected final void setState(int newState) {
    state = newState;
}
           

其中的state變量,是父類AbstractQueuedSynchronizer的成員變量,volatile修改保證可見性,讓多線程的情況下也能擷取到最新的值。

private volatile int state;

        getCount()擷取最新的state值,判斷與預先設定的值還差多少。

        tryAcquireShared(int acquires)方法,判斷state是否歸零,也就是CountDownLoad設定的預期值是否已經達到。

        tryReleaseShared(int releases)方法,嘗試釋放掉線程(這裡的并真的進行釋放,僅僅意味這個線程可以被釋放了),如果無需釋放傳回false,如果還有需要釋放的線程傳回false,如果釋放最後一個需要釋放的線程則傳回true。如果釋放線程失敗,将會一直循環并嘗試釋放線程,直到釋放掉一個線程。

        tryReleaseShared采用了compareAndSetState(int expect, int update)方法,将狀态變為關閉,采用CAS原理,增加其性能。

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
           

CountDownLoad中的await()方法,阻塞目前線程,直到count值為0

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
           

await()方法會通過Syuc的父類的acquireSharedInterruptibly(int arg)來嘗試占用這個線程,造成堵塞(通過tryAcquireShared(arg)方法實作)

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
           

         doAcquireSharedInterruptibly(arg)方法則會對狀态作出判斷,如果目前計數為0,則了解傳回。如果目前計數大于零,線程被禁止排程,并且一直睡眠,直到count值歸0或者目前線程被其他線程中斷。

private void doAcquireSharedInterruptibly(int arg)
     throws InterruptedException {
     final Node node = addWaiter(Node.SHARED); //将目前的節點設定為共享節點
     boolean failed = true;
     try {
         for (;;) {
             final Node p = node.predecessor();//擷取目前節點的前一個節點
             if (p == head) {//如果前一個節點為head節點,按照FIFO的原則,可以直接嘗試擷取鎖。
                 int r = tryAcquireShared(arg);
                 if (r >= 0) {
                     setHeadAndPropagate(node, r); //擷取這個節點并且将它放到AQS的隊列列頭處,AQS列頭處的節點表示正在擷取鎖的節點
                     p.next = null; // help GC
                     failed = false;
                     return;
                 }
             }
             if (shouldParkAfterFailedAcquire(p, node) && //檢查下是否需要将目前節點挂起
                 parkAndCheckInterrupt())
                 throw new InterruptedException();
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
 }
           

        這裡需要補充一下AQS隊列是一個雙向隊列,節點中存儲在next和pre變量分别指向前一個節點和後一個節點,每個節點中都包含一個線程和一個表示節點類型的變量:這個變量可以表示是獨占節點還是共享節點。節點頭中的線程表示占有鎖的線程,其他節點中線程則等待擷取鎖。

        await(long timeout, TimeUnit unit)類似于await(),可以設定等待時間,當等待時間過期之後,線程變繼續運作。

        countDown()方法,會調用releaseShared(int arg)方法,會先嘗試擷取一個線程并且釋放他,tryReleaseShared()之前有說過隻有當所有線程都被釋放的瞬間才會為true。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
           

doReleaseShared()會放出解除阻塞線程的信号(這時候才會将被标記為可以釋放的線程釋放掉)。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}
           

總結: 多個線程調用await()方法被阻塞在一個連結清單裡面,然後這些線程會逐一調用countDown()方法,每調用一次count值便減1,直接count為0這些線程将會被釋放

繼續閱讀