天天看點

Java并發Concurrent包的鎖(六)——CountDownLatch源碼分析及使用Sync類屬性構造函數重要方法使用情景

CountDownLatch 和 CyclicBarrier 比較相似,都屬于并發的實用工具類。

CyclicBarrier 的源碼分析和使用可以參考:Java并發Concurrent包的鎖(五)——CyclicBarrier源碼分析及使用

JDK 文檔中的定義:

用給定的計數初始化 CountDownLatch。由于調用了 countDown() 方法,是以在目前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的線程,await 的所有後續調用都将立即傳回。這種現象隻出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。

兩者的差別:

- CountDownLatch 的作用是允許 1 個(或N個)線程等待其他線程完成執行;而 CyclicBarrier 是允許 N 個線程互相等待;

- CountDownLatch 的計數器無法被重置;CyclicBarrier 的計數器可以被重置後使用,是以它是循環的。

下面通過源碼來分析 CountDownLatch 。

Sync類

類中引用了 Sync 類的對象, Sync 類是擴充了 AbstractQueuedSynchronizer 而來:

使用了 AQS 類的 state 值來儲存 count 數量。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = L;

        // 構造函數,帶count值參數
        Sync(int count) {
            setState(count);
        }

        // 擷取count值
        int getCount() {
            return getState();
        }

        // 重寫的方法,如果count為0,傳回1,即所有線程都到位
        protected int tryAcquireShared(int acquires) {
            return (getState() == ) ?  : -;
        }

        // 重寫的方法,實際是如果有線程到位了,讓count減1
        protected boolean tryReleaseShared(int releases) {
            // 一直循環
            for (;;) {
                int c = getState();
                if (c == )
                    return false;
                int nextc = c-;
                // 嘗試設定state為減1後的值,CAS方法如果為c,才設定新值
                if (compareAndSetState(c, nextc))
                    return nextc == ;
            }
        }
    }
           

屬性

// 引用了sync類的對象
    private final Sync sync;
           

構造函數

public CountDownLatch(int count) {
        if (count < ) throw new IllegalArgumentException("count < 0");
        // 線程數量count,調用Sync類的構造函數
        this.sync = new Sync(count);
    }
           

重要方法

await

使目前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly();
    }
           

這裡我們看調用的 AQS 類中的對應方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < )
            doAcquireSharedInterruptibly(arg);
    }
           

這裡的 tryAcquireShared 方法在 sync 類中進行了重寫,如果所有線程都到位了,傳回 1 。如果 count 不為 0 ,說明還有線程沒有到位,需要繼續等待。

countDown

countDown 方法調用 sync 重寫的 releaseShared 方法,有線程到位了就讓未到位的線程數量減 1:

public void countDown() {
        sync.releaseShared();
    }
           

使用情景

比如還拿 CyclicBarrier 中的檔案讀取的例子,使用 CountDownLatch 來再重新實作:

屏障類中的那個例子隻有 10 個子線程到達屏障後互相等待,現在要加入一個新的統計線程來等待這10個線程。

public class TestCDLatch {

    private static CountDownLatch countDownLatch;

    public static class SumThread extends Thread {

        @Override
        public void run() {
            System.out.println("需要等待 " + countDownLatch.getCount() + " 個子線程運作完!");
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("其它線程都運作完畢,可以開始統計了");
        }

    }

    public static class CountThread extends Thread {

        @Override
        public void run() {
            System.out.println("子線程" + Thread.currentThread().getName() + "讀完了!");
            countDownLatch.countDown();
        }

    }

    public static void main(String[] args) {

        countDownLatch = new CountDownLatch();

        new SumThread().start();

        for(int i=; i<; i++){
            new CountThread().start();
        }

    }

}
           

運作結果:

需要等待  個子線程運作完!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
子線程Thread-讀完了!
其它線程都運作完畢,可以開始統計了