天天看點

AQS源碼探究_08 CyclicBarrier源碼分析

1、簡介

CyclicBarrier,回環栅欄,它會阻塞一組線程直到這些線程同時達到某個條件才繼續執行。它與CountDownLatch很類似,但又不同,CountDownLatch需要調用countDown()方法觸發事件,而CyclicBarrier不需要,它就像一個栅欄一樣,當一組線程都到達了栅欄處才繼續往下走。

工作原理圖:

AQS源碼探究_08 CyclicBarrier源碼分析
AQS源碼探究_08 CyclicBarrier源碼分析
AQS源碼探究_08 CyclicBarrier源碼分析

CyclicBarrier與CountDownLatch的異同?

兩者都能實作阻塞一組線程等待被喚醒;

前者是最後一個線程到達時自動喚醒;

後者是通過顯式地調用countDown()實作的;

前者是通過重入鎖及其條件鎖實作的,後者是直接基于AQS實作的;

前者具有“代”的概念,可以重複使用,後者隻能使用一次;

前者隻能實作多個線程到達栅欄處一起運作;

後者不僅可以實作多個線程等待一個線程條件成立,還能實作一個線程等待多個線程條件成立(詳見CountDownLatch那章使用案例);

2、入門案例

在分析源碼之前,先看一個入門案例:

使用一個CyclicBarrier使得5個玩家線程保持同步,當5個線程同時到達 cyclicBarrier.await();處,大家再一起往下運作。

/**
 * date: 2021/5/10
 *
 * @author csp
 */
public class CyclicBarrierTest01 {
    /**
     * 案例:
     * 模拟過 “王者榮耀” 遊戲開始邏輯
     */
    public static void main(String[] args) {
        // 第一步:定義玩家,定義5個
        String[] heros = {"安琪拉", "亞瑟", "馬超", "張飛", "劉備"};

        // 第二步:建立固定線程數量的線程池,線程數量為5
        ExecutorService service = Executors.newFixedThreadPool(5);

        // 第三步:建立barrier,parties 設定為5
        CyclicBarrier barrier = new CyclicBarrier(5);

        // 第四步:通過for循環開啟5任務,模拟開始遊戲,傳遞給每個任務(英雄名稱和barrier)
        for (int i = 0; i < 5; i++) {
            service.execute(new Player(heros[i], barrier));
        }

        // 所有線程執行完畢,關閉線程池釋放資源
        service.shutdown();
    }

    /**
     * 玩家線程:
     */
    static class Player implements Runnable {
        // 英雄名稱
        private String hero;
        // barrier
        private CyclicBarrier barrier;

        public Player(String hero, CyclicBarrier barrier) {
            this.hero = hero;
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                // 每個玩家加載進度不一樣,這裡使用随機數來模拟!
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                System.out.println(hero + ":加載進度100%,等待其他玩家加載完成中...");
                // 隻有當5個玩家線程都加載完畢後,栅欄才放行!
                barrier.await();
                System.out.println(hero + ":發現所有英雄加載完成,開始戰鬥吧!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
      

運作結果如下:

張飛:加載進度100%,等待其他玩家加載完成中...
亞瑟:加載進度100%,等待其他玩家加載完成中...
馬超:加載進度100%,等待其他玩家加載完成中...
安琪拉:加載進度100%,等待其他玩家加載完成中...
劉備:加載進度100%,等待其他玩家加載完成中...
劉備:發現所有英雄加載完成,開始戰鬥吧!
張飛:發現所有英雄加載完成,開始戰鬥吧!
安琪拉:發現所有英雄加載完成,開始戰鬥吧!
馬超:發現所有英雄加載完成,開始戰鬥吧!
亞瑟:發現所有英雄加載完成,開始戰鬥吧!
      

3、源碼分析

成員屬性

// 重入鎖: 因為barrier實作是依賴于Condition條件隊列的,condition條件隊列必須依賴lock才能使用。
private final ReentrantLock lock = new ReentrantLock();

// 條件鎖,名稱為trip,絆倒的意思,可能是指線程來了先絆倒,等達到一定數量了再喚醒:
// 線程挂起實作使用的condition隊列。
// 條件:目前代所有線程到位,這個條件隊列内的線程才會被喚醒。
private final Condition trip = lock.newCondition();

// // 需要等待的線程數量: Barrier需要參與進來的線程數量
private final int parties;

// 當喚醒的時候執行的指令: 目前代最後一個到位的線程需要執行的事件
private final Runnable barrierCommand;

// 代: 表示barrier對象 目前 “代”
private Generation generation = new Generation();

// 目前這一代還需要等待的線程數: 
// 表示目前“代”還有多少個線程未到位,初始值為parties。
private int count;
      

内部類

Generation,中文翻譯為"代",一代人的代,用于控制CyclicBarrier的循環使用。

比如,上面示例中的5個線程完成後進入下一代,繼續等待5個線程達到栅欄處再一起執行,而CountDownLatch則做不到這一點,CountDownLatch是一次性的,無法重置其次數。

/**
 * 表示:“代”
 */
private static class Generation {
    // 表示目前“代”是否被打破,如果代被打破,那麼再來到這一代的線程 就會直接抛出BrokenException異常
    // 且在這一代挂起的線程都會被喚醒,然後抛出 BrokerException異常。
    boolean broken = false;
}
      

構造方法

構造方法需要傳入一個parties變量,也就是需要等待的線程數。

/* 
 * parties:Barrier需要參與的線程數量,每次屏障需要參與的線程數
 * barrierAction:目前“代”最後一個到位的線程,需要執行的事件(可以為null)
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    // 因為小于等于0的barrier沒有任何意義..沒有任何線程可以參與進來~
    if (parties <= 0) throw new IllegalArgumentException();
    // 初始化parties
    this.parties = parties;
    // 初始化count等于parties,後面目前代每到位一個線程,count--
    this.count = parties;
    // 初始化都到達栅欄處執行的指令
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}
      

成員方法

1. nextGeneration方法

/**
 * 開啟下一代的方法,當這一代所有線程到位後(假設barrierCommand不為空,還需要最後一個線程執行完事件),
 * 會調用nextGeneration()開啟新的一代。
 */
private void nextGeneration() {
    // 将在trip條件隊列内挂起的線程全部喚醒
    trip.signalAll();

    // 重置count為parties
    count = parties;

    // 開啟新的一代..使用一個新的generation對象,表示新的一代,新的一代和上一代沒有任何關系。
    generation = new Generation();
}
      

2. breakBarrier方法

/**
 * 打破barrier屏障,在屏障内的線程都會抛出異常..
 */
private void breakBarrier() {
    // 将代中的broken設定為true,表示這一代是被打破了的,再來到這一代的線程,直接抛出異常.
    generation.broken = true;
    // 重置count為parties
    count = parties;
    // 将在trip條件隊列内挂起的線程全部喚醒,喚醒後的線程會檢查目前代是否是打破的,
    // 如果是打破的話,接下來的邏輯和開啟下一代喚醒的邏輯不一樣.
    trip.signalAll();
}
      

3. await()方法

  • 每個需要在栅欄處等待的線程都需要顯式地調用

    await()

    方法等待其它線程的到來。
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
     
    // 調用dowait方法,不需要逾時           
    return dowait(true, unit.toNanos(timeout));
}
      

4. dowait方法(重點)

dowait()方法裡的整個邏輯分成兩部分:

(1)最後一個線程走上面的邏輯,當count減為0的時候,打破栅欄,它調用nextGeneration()方法通知條件隊列中的等待線程轉移到AQS的隊列中等待被喚醒,并進入下一代。

(2)非最後一個線程走下面的for循環邏輯,這些線程會阻塞在condition的await()方法處,它們會加入到條件隊列中,等待被通知,當它們喚醒的時候已經更新換“代”了,這時候傳回。

/**
 * timed:表示目前調用await方法的線程是否指定了逾時時長,如果true 表示 線程是響應逾時的
 * nanos:線程等待逾時時長納秒,如果 timed == false ===> nanos == 0
 */
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    // 擷取barrier全局鎖對象
    final ReentrantLock lock = this.lock;
    // 加鎖
    // 為什麼要加鎖呢?
    // 因為 barrier的挂起和喚醒依賴的元件都是condition。
    lock.lock();
    try {
        // 目前代: 擷取barrier目前的 “代”
        final Generation g = generation;

        // 檢查: 如果目前代是已經被打破狀态,則目前調用await方法的線程,直接抛出Broken異常
        if (g.broken)
            throw new BrokenBarrierException();

        // 中斷檢查: 如果目前線程的中斷标記位為true,則打破目前代,然後目前線程抛出中斷異常
        if (Thread.interrupted()) {
            // 1.設定目前代的狀态為broken狀态  
            // 2.喚醒在trip條件隊列内的線程
            breakBarrier();
            throw new InterruptedException();
        }

        // 執行到這裡,說明 目前線程中斷狀态是正常的 false, 目前代的broken為 false(未打破狀态)
        // 正常邏輯...
        
        // count的值減1
        // 假設 parties 給的是 5,那麼index對應的值為 4,3,2,1,0
        int index = --count;
        // 如果數量減到0了,走這段邏輯(最後一個線程走這裡):
        // 條件成立:說明目前線程是最後一個到達barrier的線程,此時需要做什麼呢?
        if (index == 0) {  // tripped
            // 标記:true表示 最後一個線程 執行cmd時未抛異常。  false,表示最後一個線程執行cmd時抛出異常了.
            // cmd就是建立 barrier對象時 指定的第二個 Runnable接口實作,這個可以為null
            boolean ranAction = false;
            try {
                // 如果初始化的時候傳了指令,這裡執行
                final Runnable command = barrierCommand;
                // 條件成立:說明建立barrier對象時 指定 Runnable接口了,這個時候最後一個到達的線程 就需要執行這個接口
                if (command != null)
                    command.run();

                // command.run()未抛出異常的話,那麼線程會執行到這裡。
                ranAction = true;

                // 調用下一代方法: 開啟新的一代
                // 1.喚醒trip條件隊列内挂起的線程,被喚醒的線程 會依次 擷取到lock,然後依次退出await方法。
                // 2.重置count 為 parties
                // 3.建立一個新的generation對象,表示新的一代
                nextGeneration();
                // 傳回0,因為目前線程是此 代 最後一個到達的線程,是以Index == 0
                return 0;
            } finally {
                if (!ranAction)
                    // 如果command.run()執行抛出異常的話,會進入到這裡。
                    breakBarrier();
            }
        }

        // 執行到這裡,說明目前線程 并不是最後一個到達Barrier的線程..此時需要進入一個自旋中.

        // 這個循環隻有非最後一個線程可以走到
        // 自旋,一直到條件滿足、目前代被打破、線程被中斷,等待逾時
        for (;;) {
            try {
                // 條件成立:說明目前線程是不指定逾時時間的
                if (!timed)
                    // 調用condition的await()方法:
                    // 目前線程 會 釋放掉lock,然後進入到trip條件隊列的尾部,然後挂起自己,等待被喚醒。
                    trip.await();
                else if (nanos > 0L)
                    // 逾時等待方法:
                    // 說明目前線程調用await方法時 是指定了逾時時間的!
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 抛出中斷異常,會進來這裡。
                // 什麼時候會抛出InterruptedException異常呢?
                // Node節點在 條件隊列内 時 收到中斷信号時 會抛出中斷異常!


                // 條件一:g == generation 成立,說明目前代并沒有變化。
                // 條件二:! g.broken 目前代如果沒有被打破,那麼目前線程就去打破,并且抛出異常..
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // 執行到else有幾種情況?
                    // 1.代發生了變化,這個時候就不需要抛出中斷異常了,因為 代已經更新了,這裡喚醒後就走正常邏輯了..隻不過設定下 中斷标記。
                    // 2.代沒有發生變化,但是代被打破了,此時也不用傳回中斷異常,執行到下面的時候會抛出  brokenBarrier異常。也記錄下中斷标記位。
                    Thread.currentThread().interrupt();
                }
            }

            // 喚醒後,執行到這裡,有幾種情況?
            // 1.正常情況,目前barrier開啟了新的一代(trip.signalAll())
            // 2.目前Generation被打破,此時也會喚醒所有在trip上挂起的線程
            // 3.目前線程trip中等待逾時,然後主動轉移到 阻塞隊列 然後擷取到鎖 喚醒。

            // 檢查: 
            // 條件成立:目前代已經被打破
            if (g.broken)
                // 線程喚醒後依次抛出BrokenBarrier異常。
                throw new BrokenBarrierException();

            // 喚醒後,執行到這裡,有幾種情況?
            // 1.正常情況,目前barrier開啟了新的一代(trip.signalAll()),
            // 3.目前線程trip中等待逾時,然後主動轉移到 阻塞隊列 然後擷取到鎖 喚醒。
            // 正常來說這裡肯定不相等
            // 因為上面打破栅欄的時候調用nextGeneration()方法時generation的引用已經變化了
           
            // 條件成立:說明目前線程挂起期間,最後一個線程到位了,然後觸發了開啟新的一代的邏輯,此時喚醒trip條件隊列内的線程。
            if (g != generation)
                // 傳回目前線程的index。
                return index;

            // 喚醒後,執行到這裡,有幾種情況?
            // 3.目前線程trip中等待逾時,然後主動轉移到阻塞隊列然後擷取到鎖 喚醒。
            // 逾時檢查
            if (timed && nanos <= 0L) {
                // 打破barrier
                breakBarrier();
                // 抛出逾時異常.
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
      

4、總結

CyclicBarrier會使一組線程阻塞在await()處,當最後一個線程到達時喚醒(隻是從條件隊列轉移到AQS隊列中)前面的線程大家再繼續往下走;

CyclicBarrier不是直接使用AQS實作的一個同步器;

CyclicBarrier基于ReentrantLock及其Condition實作整個同步邏輯;

繼續閱讀