天天看點

玩轉高并發系列----JUC并發工具類(二)

在前一章節我們詳細分析了

Semaphore

以及

CountDownLatch

的基本用法及實作原理,并且在将

CountDownLatch

時,也舉了一個運動員賽跑的栗子。下面我将接着這個栗子詳細分析

CyclicBarrier

的基本用法及底層實作原理。

CyclicBarrier

  1. 顧名思義,是一個可被循環使用的屏障。在前一章講述CountDownLatch的時候說道,

    CountDownLatch

    可以用在等待指定的線程數執行完成,即到達一個指定的時間節點後,同時開始其他的任務。而

    CyclicBarrier

    在其上做了更深層次的優化—當達到一個指定的節點後,CyclicBarrier會被重置,進而可以再次被利用,等待線程到達下一個時間節點。
還是沿用上一章節中,運動員的栗子。運動員需要準備,裁判需要等待所有運動員準備好後,才能開始鳴槍開始,這裡是第一個時間節點,跑完後,需要等所有運動員都到達終點後,才能開始統計各個運動員的名次,這是第二個時間節點,需要等待所有運動員的名次統計好後,才能舉行頒獎,這又是第三個時間節點。這個時候,如果使用

CountDownLatch

實作的,我們需要同時定義三個CountDownLatch對象,因為CountDownLatch中的state變量為0後,不會被重置為初始值,是以不能被重複循環利用。此時可以用

CyclicBarrier

來解決這個問題。
/*
*runner-2 is Ready!
runner-3 is Ready!
runner-1 is Ready!
runner-4 is Ready!
runner-5 is Ready!
=================All runners ready============
runner-2 arrive!
runner-4 arrive!
runner-1 arrive!
runner-3 arrive!
runner-5 arrive!
=================All runners ready============
runner-2 has been recorded...
runner-1 has been recorded...
runner-4 has been recorded...
runner-5 has been recorded...
runner-3 has been recorded...
=================All runners ready============
**/
ublic class CyclicBarrierDemo {
    public static void main(String[] args) {
        String prefix = "runner-";
        // 指定需要等待5個運動員同時完成某件事後,再執行某項任務
        // 同時指定了一個lambda表達式作為第二個參數,這個參數表示所有線程到達後,執行的任務(有最後一個達到的線程執行)
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("=================All runners ready============"));
        // 模拟五個運動員,即五個線程
        IntStream.rangeClosed(1, 5).forEach(i -> new Runner(cyclicBarrier, prefix + i).start());

    }

    private static class Runner extends Thread {

        private String name;
        private CyclicBarrier cyclicBarrier;

        public Runner(CyclicBarrier cyclicBarrier, String name) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
        }

        @Override
        public void run() {
            try {
            // 随機休眠一段時間:模拟準備時間
                sleepRandom();
                System.out.println(this.name + " is Ready!");
                // 同步阻塞,等待其他運動員準備完成
                cyclicBarrier.await();

				// 開始跑步後,模拟各個運動員跑步時間
                sleepRandom();
                System.out.println(this.name + " arrive!");
                // 等待所有運動員到達現場
                cyclicBarrier.await();

				// 模拟記錄運動員名次的事件
                sleepRandom();
                System.out.println(this.name + " has been recorded...");
                // 等待所有運動員,被記錄,開始頒獎
                cyclicBarrier.await();
            } catch (Exception e) {
                System.out.println(this.name + " Error");
            }
        }
    }
}
           
  1. 源碼分析:

    CyclicBarrier

    沒有直接基于AQS實作同步過程,而是通過

    ReentrantLock

    +

    Condition

    的方式實作的。
首先一起來看一下其構造函數:構造函數比較簡單,就是完成基本的指派操作
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        // 指定需要等待的線程總數
        this.parties = parties;
        // 記錄仍然需要等待的線程個數:每達到一個線程處于阻塞狀态時,count-1,
        // 當count為0時,喚醒所有等待的線程
        this.count = parties;
        // 指定當parties個線程達到,即count為0時,執行的回調方法。
        this.barrierCommand = barrierAction;
    }
           
接下來重點分析

await()

方法,這個方法是

CyclicBarrier

的核心方法。
// 方法聲明抛出InterruptedException,表示可以相應中斷
// BrokenBarrierException表示可以中斷一次屏障,即打破CyclicBarrier中線程阻塞的狀态,使其喚醒
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 擷取目前鎖
        lock.lock();
        try {
        // 定義了一個Generation内部類,表示一次循環
            final Generation g = generation;
		// 循環被打破,表示該CyclicBarrier不可用,抛出BrakenBarrierException異常
            if (g.broken)
                throw new BrokenBarrierException();

			// 響應線程中斷
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
			
			// 計數器減一,即表示一個線程任務到達。
            int index = --count;
            // 減為0後,即count==0,此時index==count,表示目前循環所有任務都到達,需喚醒阻塞的線程
            if (index == 0) {  // tripped
            
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 這裡就是構造函數中設定的回調。不為null,則會執行回調,且隻會由最後一個到達的線程執行。
                    if (command != null)
                        command.run();
                    // ranAction設定為true,表示需要喚醒阻塞的線程,繼續往下執行,而不是阻塞,false表示需要打破這次循環
                    ranAction = true;
                    // 開啟下一代,即下一輪循環:其實就是設定count重新等于parties
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                // 判斷目前調用的是逾時等待await(timeout,unit),還是一直等待await()
                    if (!timed)
                    // 阻塞目前線程,同時釋放鎖:此處的trip就是該Lock的Condition。對Condition不了解的同學可以看前面章節。
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                    // 目前循環不可用時,抛出異常資訊,同時喚醒所有阻塞的線程(在breakBarrier()中喚醒)
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
// 開始下一次循環:主要完成三件事:
// 1. trip.signalAll();喚醒所有阻塞在trip上的線程
// 2. count = parties;将需要等待的線程數重新設定回原來的parties值,以便開始下一次循環
// 3. generation = new Generation();開始下一次循環,broken預設為false
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

// 打破循環,也主要完成三件事
// 1. generation.broken = true;将目前循環的broken設定為true,後續加入CyclicBarrier的線程,即執行await()方法時,會抛出異常
// 2. count = parties;恢複count的值,同上
// 3. trip.signalAll();喚醒所有阻塞的線程。
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
           
通過上述分析可知:
  1. 無論是調用

    nextGeneration()

    還是調用

    breakBarrier()

    都會喚醒目前阻塞的所有線程,隻是前者會開啟下一代,即開始下一輪循環等待。而後者不會開啟下一代,且禁止線程再次加入

    CyclicBarrier

    的阻塞隊列中來。
  2. CyclicBarrier

    是可重用的。等待線程到達後,會開啟下一代,每一輪循環,被稱為一個Generation,也就是一個同步點,一代。
  3. CyclicBarrier

    會響應中斷。當線程沒有到齊時,如果有線程接收到了中斷信号,所有阻塞的線程都會被喚醒,且其他線程無法再加入
  4. CyclicBarrier

    可以指定回調函數,且回調函數隻會被最後到達的線程執行,而不是每個線程都執行。

繼續閱讀