1、簡介
CyclicBarrier,回環栅欄,它會阻塞一組線程直到這些線程同時達到某個條件才繼續執行。它與CountDownLatch很類似,但又不同,CountDownLatch需要調用countDown()方法觸發事件,而CyclicBarrier不需要,它就像一個栅欄一樣,當一組線程都到達了栅欄處才繼續往下走。
工作原理圖:

CyclicBarrier與CountDownLatch的異同?
兩者都能實作阻塞一組線程等待被喚醒;
前者是最後一個線程到達時自動喚醒;
後者是通過顯式地調用countDown()實作的;
前者是通過重入鎖及其條件鎖實作的,後者是直接基于AQS實作的;
前者具有“代”的概念,可以重複使用,後者隻能使用一次;
前者隻能實作多個線程到達栅欄處一起運作;
後者不僅可以實作多個線程等待一個線程條件成立,還能實作一個線程等待多個線程條件成立(詳見CountDownLatch那章使用案例);
2、入門案例
在分析源碼之前,先看一個入門案例:
使用一個CyclicBarrier使得5個玩家線程保持同步,當5個線程同時到達 cyclicBarrier.await();處,大家再一起往下運作。
/**
* date: 2021/5/10
*
* @author csp
*/
public class CyclicBarrierTest01 {
/**
* 案例:
* 模拟過 “王者榮耀” 遊戲開始邏輯
*/
public static void main(String[] args) {
// 第一步:定義玩家,定義5個
String[] heros = {"安琪拉", "亞瑟", "馬超", "張飛", "劉備"};
// 第二步:建立固定線程數量的線程池,線程數量為5
ExecutorService service = Executors.newFixedThreadPool(5);
// 第三步:建立barrier,parties 設定為5
CyclicBarrier barrier = new CyclicBarrier(5);
// 第四步:通過for循環開啟5任務,模拟開始遊戲,傳遞給每個任務(英雄名稱和barrier)
for (int i = 0; i < 5; i++) {
service.execute(new Player(heros[i], barrier));
}
// 所有線程執行完畢,關閉線程池釋放資源
service.shutdown();
}
/**
* 玩家線程:
*/
static class Player implements Runnable {
// 英雄名稱
private String hero;
// barrier
private CyclicBarrier barrier;
public Player(String hero, CyclicBarrier barrier) {
this.hero = hero;
this.barrier = barrier;
}
@Override
public void run() {
try {
// 每個玩家加載進度不一樣,這裡使用随機數來模拟!
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
System.out.println(hero + ":加載進度100%,等待其他玩家加載完成中...");
// 隻有當5個玩家線程都加載完畢後,栅欄才放行!
barrier.await();
System.out.println(hero + ":發現所有英雄加載完成,開始戰鬥吧!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
運作結果如下:
張飛:加載進度100%,等待其他玩家加載完成中...
亞瑟:加載進度100%,等待其他玩家加載完成中...
馬超:加載進度100%,等待其他玩家加載完成中...
安琪拉:加載進度100%,等待其他玩家加載完成中...
劉備:加載進度100%,等待其他玩家加載完成中...
劉備:發現所有英雄加載完成,開始戰鬥吧!
張飛:發現所有英雄加載完成,開始戰鬥吧!
安琪拉:發現所有英雄加載完成,開始戰鬥吧!
馬超:發現所有英雄加載完成,開始戰鬥吧!
亞瑟:發現所有英雄加載完成,開始戰鬥吧!
3、源碼分析
成員屬性
// 重入鎖: 因為barrier實作是依賴于Condition條件隊列的,condition條件隊列必須依賴lock才能使用。
private final ReentrantLock lock = new ReentrantLock();
// 條件鎖,名稱為trip,絆倒的意思,可能是指線程來了先絆倒,等達到一定數量了再喚醒:
// 線程挂起實作使用的condition隊列。
// 條件:目前代所有線程到位,這個條件隊列内的線程才會被喚醒。
private final Condition trip = lock.newCondition();
// // 需要等待的線程數量: Barrier需要參與進來的線程數量
private final int parties;
// 當喚醒的時候執行的指令: 目前代最後一個到位的線程需要執行的事件
private final Runnable barrierCommand;
// 代: 表示barrier對象 目前 “代”
private Generation generation = new Generation();
// 目前這一代還需要等待的線程數:
// 表示目前“代”還有多少個線程未到位,初始值為parties。
private int count;
内部類
Generation,中文翻譯為"代",一代人的代,用于控制CyclicBarrier的循環使用。
比如,上面示例中的5個線程完成後進入下一代,繼續等待5個線程達到栅欄處再一起執行,而CountDownLatch則做不到這一點,CountDownLatch是一次性的,無法重置其次數。
/**
* 表示:“代”
*/
private static class Generation {
// 表示目前“代”是否被打破,如果代被打破,那麼再來到這一代的線程 就會直接抛出BrokenException異常
// 且在這一代挂起的線程都會被喚醒,然後抛出 BrokerException異常。
boolean broken = false;
}
構造方法
構造方法需要傳入一個parties變量,也就是需要等待的線程數。
/*
* parties:Barrier需要參與的線程數量,每次屏障需要參與的線程數
* barrierAction:目前“代”最後一個到位的線程,需要執行的事件(可以為null)
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
// 因為小于等于0的barrier沒有任何意義..沒有任何線程可以參與進來~
if (parties <= 0) throw new IllegalArgumentException();
// 初始化parties
this.parties = parties;
// 初始化count等于parties,後面目前代每到位一個線程,count--
this.count = parties;
// 初始化都到達栅欄處執行的指令
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
成員方法
1. nextGeneration方法
/**
* 開啟下一代的方法,當這一代所有線程到位後(假設barrierCommand不為空,還需要最後一個線程執行完事件),
* 會調用nextGeneration()開啟新的一代。
*/
private void nextGeneration() {
// 将在trip條件隊列内挂起的線程全部喚醒
trip.signalAll();
// 重置count為parties
count = parties;
// 開啟新的一代..使用一個新的generation對象,表示新的一代,新的一代和上一代沒有任何關系。
generation = new Generation();
}
2. breakBarrier方法
/**
* 打破barrier屏障,在屏障内的線程都會抛出異常..
*/
private void breakBarrier() {
// 将代中的broken設定為true,表示這一代是被打破了的,再來到這一代的線程,直接抛出異常.
generation.broken = true;
// 重置count為parties
count = parties;
// 将在trip條件隊列内挂起的線程全部喚醒,喚醒後的線程會檢查目前代是否是打破的,
// 如果是打破的話,接下來的邏輯和開啟下一代喚醒的邏輯不一樣.
trip.signalAll();
}
3. await()方法
- 每個需要在栅欄處等待的線程都需要顯式地調用
方法等待其它線程的到來。await()
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
// 調用dowait方法,不需要逾時
return dowait(true, unit.toNanos(timeout));
}
4. dowait方法(重點)
dowait()方法裡的整個邏輯分成兩部分:
(1)最後一個線程走上面的邏輯,當count減為0的時候,打破栅欄,它調用nextGeneration()方法通知條件隊列中的等待線程轉移到AQS的隊列中等待被喚醒,并進入下一代。
(2)非最後一個線程走下面的for循環邏輯,這些線程會阻塞在condition的await()方法處,它們會加入到條件隊列中,等待被通知,當它們喚醒的時候已經更新換“代”了,這時候傳回。
/**
* timed:表示目前調用await方法的線程是否指定了逾時時長,如果true 表示 線程是響應逾時的
* nanos:線程等待逾時時長納秒,如果 timed == false ===> nanos == 0
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 擷取barrier全局鎖對象
final ReentrantLock lock = this.lock;
// 加鎖
// 為什麼要加鎖呢?
// 因為 barrier的挂起和喚醒依賴的元件都是condition。
lock.lock();
try {
// 目前代: 擷取barrier目前的 “代”
final Generation g = generation;
// 檢查: 如果目前代是已經被打破狀态,則目前調用await方法的線程,直接抛出Broken異常
if (g.broken)
throw new BrokenBarrierException();
// 中斷檢查: 如果目前線程的中斷标記位為true,則打破目前代,然後目前線程抛出中斷異常
if (Thread.interrupted()) {
// 1.設定目前代的狀态為broken狀态
// 2.喚醒在trip條件隊列内的線程
breakBarrier();
throw new InterruptedException();
}
// 執行到這裡,說明 目前線程中斷狀态是正常的 false, 目前代的broken為 false(未打破狀态)
// 正常邏輯...
// count的值減1
// 假設 parties 給的是 5,那麼index對應的值為 4,3,2,1,0
int index = --count;
// 如果數量減到0了,走這段邏輯(最後一個線程走這裡):
// 條件成立:說明目前線程是最後一個到達barrier的線程,此時需要做什麼呢?
if (index == 0) { // tripped
// 标記:true表示 最後一個線程 執行cmd時未抛異常。 false,表示最後一個線程執行cmd時抛出異常了.
// cmd就是建立 barrier對象時 指定的第二個 Runnable接口實作,這個可以為null
boolean ranAction = false;
try {
// 如果初始化的時候傳了指令,這裡執行
final Runnable command = barrierCommand;
// 條件成立:說明建立barrier對象時 指定 Runnable接口了,這個時候最後一個到達的線程 就需要執行這個接口
if (command != null)
command.run();
// command.run()未抛出異常的話,那麼線程會執行到這裡。
ranAction = true;
// 調用下一代方法: 開啟新的一代
// 1.喚醒trip條件隊列内挂起的線程,被喚醒的線程 會依次 擷取到lock,然後依次退出await方法。
// 2.重置count 為 parties
// 3.建立一個新的generation對象,表示新的一代
nextGeneration();
// 傳回0,因為目前線程是此 代 最後一個到達的線程,是以Index == 0
return 0;
} finally {
if (!ranAction)
// 如果command.run()執行抛出異常的話,會進入到這裡。
breakBarrier();
}
}
// 執行到這裡,說明目前線程 并不是最後一個到達Barrier的線程..此時需要進入一個自旋中.
// 這個循環隻有非最後一個線程可以走到
// 自旋,一直到條件滿足、目前代被打破、線程被中斷,等待逾時
for (;;) {
try {
// 條件成立:說明目前線程是不指定逾時時間的
if (!timed)
// 調用condition的await()方法:
// 目前線程 會 釋放掉lock,然後進入到trip條件隊列的尾部,然後挂起自己,等待被喚醒。
trip.await();
else if (nanos > 0L)
// 逾時等待方法:
// 說明目前線程調用await方法時 是指定了逾時時間的!
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 抛出中斷異常,會進來這裡。
// 什麼時候會抛出InterruptedException異常呢?
// Node節點在 條件隊列内 時 收到中斷信号時 會抛出中斷異常!
// 條件一:g == generation 成立,說明目前代并沒有變化。
// 條件二:! g.broken 目前代如果沒有被打破,那麼目前線程就去打破,并且抛出異常..
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 執行到else有幾種情況?
// 1.代發生了變化,這個時候就不需要抛出中斷異常了,因為 代已經更新了,這裡喚醒後就走正常邏輯了..隻不過設定下 中斷标記。
// 2.代沒有發生變化,但是代被打破了,此時也不用傳回中斷異常,執行到下面的時候會抛出 brokenBarrier異常。也記錄下中斷标記位。
Thread.currentThread().interrupt();
}
}
// 喚醒後,執行到這裡,有幾種情況?
// 1.正常情況,目前barrier開啟了新的一代(trip.signalAll())
// 2.目前Generation被打破,此時也會喚醒所有在trip上挂起的線程
// 3.目前線程trip中等待逾時,然後主動轉移到 阻塞隊列 然後擷取到鎖 喚醒。
// 檢查:
// 條件成立:目前代已經被打破
if (g.broken)
// 線程喚醒後依次抛出BrokenBarrier異常。
throw new BrokenBarrierException();
// 喚醒後,執行到這裡,有幾種情況?
// 1.正常情況,目前barrier開啟了新的一代(trip.signalAll()),
// 3.目前線程trip中等待逾時,然後主動轉移到 阻塞隊列 然後擷取到鎖 喚醒。
// 正常來說這裡肯定不相等
// 因為上面打破栅欄的時候調用nextGeneration()方法時generation的引用已經變化了
// 條件成立:說明目前線程挂起期間,最後一個線程到位了,然後觸發了開啟新的一代的邏輯,此時喚醒trip條件隊列内的線程。
if (g != generation)
// 傳回目前線程的index。
return index;
// 喚醒後,執行到這裡,有幾種情況?
// 3.目前線程trip中等待逾時,然後主動轉移到阻塞隊列然後擷取到鎖 喚醒。
// 逾時檢查
if (timed && nanos <= 0L) {
// 打破barrier
breakBarrier();
// 抛出逾時異常.
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
4、總結
CyclicBarrier會使一組線程阻塞在await()處,當最後一個線程到達時喚醒(隻是從條件隊列轉移到AQS隊列中)前面的線程大家再繼續往下走;
CyclicBarrier不是直接使用AQS實作的一個同步器;
CyclicBarrier基于ReentrantLock及其Condition實作整個同步邏輯;