在前一章節我們詳細分析了
Semaphore
以及
CountDownLatch
的基本用法及實作原理,并且在将
CountDownLatch
時,也舉了一個運動員賽跑的栗子。下面我将接着這個栗子詳細分析
CyclicBarrier
的基本用法及底層實作原理。
CyclicBarrier
- 顧名思義,是一個可被循環使用的屏障。在前一章講述CountDownLatch的時候說道,
可以用在等待指定的線程數執行完成,即到達一個指定的時間節點後,同時開始其他的任務。而CountDownLatch
在其上做了更深層次的優化—當達到一個指定的節點後,CyclicBarrier會被重置,進而可以再次被利用,等待線程到達下一個時間節點。CyclicBarrier
還是沿用上一章節中,運動員的栗子。運動員需要準備,裁判需要等待所有運動員準備好後,才能開始鳴槍開始,這裡是第一個時間節點,跑完後,需要等所有運動員都到達終點後,才能開始統計各個運動員的名次,這是第二個時間節點,需要等待所有運動員的名次統計好後,才能舉行頒獎,這又是第三個時間節點。這個時候,如果使用實作的,我們需要同時定義三個CountDownLatch對象,因為CountDownLatch中的state變量為0後,不會被重置為初始值,是以不能被重複循環利用。此時可以用
CountDownLatch
來解決這個問題。
CyclicBarrier
/*
*runner-2 is Ready!
runner-3 is Ready!
runner-1 is Ready!
runner-4 is Ready!
runner-5 is Ready!
=================All runners ready============
runner-2 arrive!
runner-4 arrive!
runner-1 arrive!
runner-3 arrive!
runner-5 arrive!
=================All runners ready============
runner-2 has been recorded...
runner-1 has been recorded...
runner-4 has been recorded...
runner-5 has been recorded...
runner-3 has been recorded...
=================All runners ready============
**/
ublic class CyclicBarrierDemo {
public static void main(String[] args) {
String prefix = "runner-";
// 指定需要等待5個運動員同時完成某件事後,再執行某項任務
// 同時指定了一個lambda表達式作為第二個參數,這個參數表示所有線程到達後,執行的任務(有最後一個達到的線程執行)
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("=================All runners ready============"));
// 模拟五個運動員,即五個線程
IntStream.rangeClosed(1, 5).forEach(i -> new Runner(cyclicBarrier, prefix + i).start());
}
private static class Runner extends Thread {
private String name;
private CyclicBarrier cyclicBarrier;
public Runner(CyclicBarrier cyclicBarrier, String name) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}
@Override
public void run() {
try {
// 随機休眠一段時間:模拟準備時間
sleepRandom();
System.out.println(this.name + " is Ready!");
// 同步阻塞,等待其他運動員準備完成
cyclicBarrier.await();
// 開始跑步後,模拟各個運動員跑步時間
sleepRandom();
System.out.println(this.name + " arrive!");
// 等待所有運動員到達現場
cyclicBarrier.await();
// 模拟記錄運動員名次的事件
sleepRandom();
System.out.println(this.name + " has been recorded...");
// 等待所有運動員,被記錄,開始頒獎
cyclicBarrier.await();
} catch (Exception e) {
System.out.println(this.name + " Error");
}
}
}
}
- 源碼分析:
沒有直接基于AQS實作同步過程,而是通過CyclicBarrier
+ReentrantLock
的方式實作的。Condition
首先一起來看一下其構造函數:構造函數比較簡單,就是完成基本的指派操作
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// 指定需要等待的線程總數
this.parties = parties;
// 記錄仍然需要等待的線程個數:每達到一個線程處于阻塞狀态時,count-1,
// 當count為0時,喚醒所有等待的線程
this.count = parties;
// 指定當parties個線程達到,即count為0時,執行的回調方法。
this.barrierCommand = barrierAction;
}
接下來重點分析方法,這個方法是
await()
的核心方法。
CyclicBarrier
// 方法聲明抛出InterruptedException,表示可以相應中斷
// BrokenBarrierException表示可以中斷一次屏障,即打破CyclicBarrier中線程阻塞的狀态,使其喚醒
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 擷取目前鎖
lock.lock();
try {
// 定義了一個Generation内部類,表示一次循環
final Generation g = generation;
// 循環被打破,表示該CyclicBarrier不可用,抛出BrakenBarrierException異常
if (g.broken)
throw new BrokenBarrierException();
// 響應線程中斷
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 計數器減一,即表示一個線程任務到達。
int index = --count;
// 減為0後,即count==0,此時index==count,表示目前循環所有任務都到達,需喚醒阻塞的線程
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 這裡就是構造函數中設定的回調。不為null,則會執行回調,且隻會由最後一個到達的線程執行。
if (command != null)
command.run();
// ranAction設定為true,表示需要喚醒阻塞的線程,繼續往下執行,而不是阻塞,false表示需要打破這次循環
ranAction = true;
// 開啟下一代,即下一輪循環:其實就是設定count重新等于parties
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 判斷目前調用的是逾時等待await(timeout,unit),還是一直等待await()
if (!timed)
// 阻塞目前線程,同時釋放鎖:此處的trip就是該Lock的Condition。對Condition不了解的同學可以看前面章節。
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 目前循環不可用時,抛出異常資訊,同時喚醒所有阻塞的線程(在breakBarrier()中喚醒)
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
// 開始下一次循環:主要完成三件事:
// 1. trip.signalAll();喚醒所有阻塞在trip上的線程
// 2. count = parties;将需要等待的線程數重新設定回原來的parties值,以便開始下一次循環
// 3. generation = new Generation();開始下一次循環,broken預設為false
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
// 打破循環,也主要完成三件事
// 1. generation.broken = true;将目前循環的broken設定為true,後續加入CyclicBarrier的線程,即執行await()方法時,會抛出異常
// 2. count = parties;恢複count的值,同上
// 3. trip.signalAll();喚醒所有阻塞的線程。
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
通過上述分析可知:
- 無論是調用
還是調用
nextGeneration()
都會喚醒目前阻塞的所有線程,隻是前者會開啟下一代,即開始下一輪循環等待。而後者不會開啟下一代,且禁止線程再次加入
breakBarrier()
的阻塞隊列中來。
CyclicBarrier
是可重用的。等待線程到達後,會開啟下一代,每一輪循環,被稱為一個Generation,也就是一個同步點,一代。
CyclicBarrier
會響應中斷。當線程沒有到齊時,如果有線程接收到了中斷信号,所有阻塞的線程都會被喚醒,且其他線程無法再加入
CyclicBarrier
可以指定回調函數,且回調函數隻會被最後到達的線程執行,而不是每個線程都執行。
CyclicBarrier