天天看點

Java多線程系列--“JUC鎖”09之 CountDownLatch原理和示例

概要

前面對”獨占鎖”和”共享鎖”有了個大緻的了解;本章,我們對

CountDownLatch

進行學習。和ReadWriteLock.ReadLock一樣,

CountDownLatch

的本質也是一個”共享鎖”。本章的内容包括:

  • CountDownLatch簡介
  • CountDownLatch資料結構
  • CountDownLatch源碼分析(基于JDK1.7.0_40)
  • CountDownLatch示例

CountDownLatch簡介

CountDownLatch

是一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。

CountDownLatch和CyclicBarrier的差別
  • CountDownLatch的作用是允許1或N個線程等待其他線程完成執行;而CyclicBarrier則是允許N個線程互相等待。
  • CountDownLatch的計數器無法被重置;CyclicBarrier的計數器可以被重置後使用,是以它被稱為是循環的barrier。

關于CyclicBarrier的原理,後面一章再來學習。

CountDownLatch函數清單
CountDownLatch(int count)
構造一個用給定計數初始化的 CountDownLatch。

// 使目前線程在鎖計數器倒計數至零之前一直等待,除非線程被中斷。
void await()
// 使目前線程在鎖計數器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。
boolean await(long timeout, TimeUnit unit)
// 遞減鎖計數器的計數,如果計數到達零,則釋放所有等待的線程。
void countDown()
// 傳回目前計數。
long getCount()
// 傳回辨別此鎖計數器及其狀态的字元串。
String toString()
           

CountDownLatch資料結構

CountDownLatch的UML類圖如下:

Java多線程系列--“JUC鎖”09之 CountDownLatch原理和示例

CountDownLatch

的資料結構很簡單,它是通過”共享鎖”實作的。它包含了sync對象,sync是Sync類型。Sync是執行個體類,它繼承于AQS。

CountDownLatch源碼分析(基于JDK1.7.0_40)

CountDownLatch完整源碼(基于JDK1.7.0_40)

CountDownLatch

是通過“共享鎖”實作的。下面,我們分析

CountDownLatch

中3個核心函數: CountDownLatch(int count), await(), countDown()。

CountDownLatch(int count)
public CountDownLatch(int count) {
    if (count < ) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
           

說明:該函數是建立一個Sync對象,而Sync是繼承于AQS類。Sync構造函數如下:

Sync(int count) {
    setState(count);
}
           

setState()在AQS中實作,源碼如下:

protected final void setState(long newState) {
    state = newState;
}
           

說明:在AQS中,

state

是一個

private volatile long

類型的對象。對于CountDownLatch而言,state表示”鎖計數器“。CountDownLatch中的getCount()最終是調用AQS中的getState(),傳回的state對象,即”鎖計數器“。

await()
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly();
}
           

說明:該函數實際上是調用的AQS的

acquireSharedInterruptibly(1)

;

AQS中的acquireSharedInterruptibly()的源碼如下:

public final void acquireSharedInterruptibly(long arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < )
        doAcquireSharedInterruptibly(arg);
}
           

說明:

acquireSharedInterruptibly()

的作用是擷取共享鎖。

如果目前線程是中斷狀态,則抛出異常InterruptedException。否則,調用

tryAcquireShared(arg)

嘗試擷取共享鎖;嘗試成功則傳回,否則就調用

doAcquireSharedInterruptibly()

。doAcquireSharedInterruptibly()會使目前線程一直等待,直到目前線程擷取到共享鎖(或被中斷)才傳回。

tryAcquireShared()

在CountDownLatch.java中被重寫,它的源碼如下:

protected int tryAcquireShared(int acquires) {
    return (getState() == ) ?  : -;
}
           

說明:

tryAcquireShared()

的作用是嘗試擷取共享鎖。如果”鎖計數器=0”,即鎖是可擷取狀态,則傳回1;否則,鎖是不可擷取狀态,則傳回-1。

private void doAcquireSharedInterruptibly(long arg)
    throws InterruptedException {
    // 建立"目前線程"的Node節點,且Node中記錄的鎖是"共享鎖"類型;并将該節點添加到CLH隊列末尾。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 擷取上一個節點。
            // 如果上一節點是CLH隊列的表頭,則"嘗試擷取共享鎖"。
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= ) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // (上一節點不是CLH隊列的表頭) 目前線程一直等待,直到擷取到共享鎖。
            // 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀态)。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

說明:

  • addWaiter(Node.SHARED)的作用是,建立”目前線程“的Node節點,且Node中記錄的鎖的類型是”共享鎖“(Node.SHARED);并将該節點添加到CLH隊列末尾。關于Node和CLH在”Java多線程系列–“JUC鎖”03之 公平鎖(一)”已經詳細介紹過,這裡就不再重複說明了;
  • node.predecessor()的作用是,擷取上一個節點。如果上一節點是CLH隊列的表頭,則”嘗試擷取共享鎖“;
  • shouldParkAfterFailedAcquire()的作用和它的名稱一樣,如果在嘗試擷取鎖失敗之後,線程應該等待,則傳回true;否則,傳回false;
  • 當shouldParkAfterFailedAcquire()傳回ture時,則調用parkAndCheckInterrupt(),目前線程會進入等待狀态,直到擷取到共享鎖才繼續運作。
  • doAcquireSharedInterruptibly()

    中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函數在”Java多線程系列–“JUC鎖”03之 公平鎖(一)”中介紹過,這裡也就不再詳細說明了。
countDown()
public void countDown() {
    sync.releaseShared();
}
           

說明:該函數實際上調用

releaseShared(1)

釋放共享鎖。

releaseShared()

在AQS中實作,源碼如下:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        // 擷取“鎖計數器”的狀态
        int c = getState();
        if (c == )
            return false;
        // “鎖計數器”-1
        int nextc = c-;
        // 通過CAS函數進行指派。
        if (compareAndSetState(c, nextc))
            return nextc == ;
    }
}
           

說明:

tryReleaseShared()

的作用是釋放共享鎖,将“鎖計數器”的值-1。

總結

CountDownLatch是通過“共享鎖”實作的。在建立CountDownLatch中時,會傳遞一個int類型參數count,該參數是“鎖計數器”的初始狀态,表示該“共享鎖”最多能被count個線程同時擷取。當某線程調用該CountDownLatch對象的await()方法時,該線程會等待“共享鎖”可用時,才能擷取“共享鎖”進而繼續運作。而“共享鎖”可用的條件,就是“鎖計數器”的值為0!而“鎖計數器”的初始值為count,每當一個線程調用該CountDownLatch對象的countDown()方法時,才将“鎖計數器”-1;通過這種方式,必須有count個線程調用countDown()之後,“鎖計數器”才為0,而前面提到的等待線程才能繼續運作!

以上,就是CountDownLatch的實作原理。

CountDownLatch的使用示例

下面通過CountDownLatch實作:”主線程”等待”5個子線程”全部都完成”指定的工作(休眠1000ms)”之後,再繼續運作。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest1 {

    private static int LATCH_SIZE = ;
    private static CountDownLatch doneSignal;

    public static void main(String[] args) {

        try {
            doneSignal = new CountDownLatch(LATCH_SIZE);

            // 建立5個任務
            for (int i = ; i < LATCH_SIZE; i++)
                new InnerThread().start();

            System.out.println("main await begin.");
            // "主線程"等待線程池中5個任務的完成
            doneSignal.await();

            System.out.println("main await finished.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class InnerThread extends Thread {
        public void run() {
            try {
                Thread.sleep();
                System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
                // 将CountDownLatch的數值減1
                doneSignal.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
           

運作結果:

main await begin.
Thread- sleep ms.
Thread- sleep ms.
Thread- sleep ms.
Thread- sleep ms.
Thread- sleep ms.
main await finished.
           

結果說明:主線程通過doneSignal.await()等待其它線程将doneSignal遞減至0。其它的5個InnerThread線程,每一個都通過doneSignal.countDown()将doneSignal的值減1;當doneSignal為0時,main被喚醒後繼續執行。

繼續閱讀