本人新書出版,對技術感興趣的朋友請關注:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9UFWl5WNyI2c1cFZv5kMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZwpmL1AzN0IjNzkTMyIzMwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
https://mp.weixin.qq.com/s/uq2cw2Lgf-s4nPHJ4WH4aw
在前面的篇章中,講解了ReentrantLock + Condition,并講述了2者結合的一個典型應用:ArrayBlockingQueue/LinkedBlockingQueue。
今天講述2者結合的另一個典型應用:CyclicBarrier。
#CyclicBarrier的概念
要介紹CyclicBarrier這個概念,可以從下面這個比喻開始:一個小公司的所有人周六要出去團建,大家提前商定好周六早上9點公司門口集合,然後到了目的地,在分開行動。
在這裡,“公司門口集合地“就是一個CyclicBarrier:大家本來是分散的,在某個時間點集中到一塊,等所有人到齊,然後到了目的地再分散行動。
具體到代碼中,就是
CyclicBarrier cb = new CyclicBarrier(5);
cb.await(); //5個線程,各自分别調await,都阻塞。直到5個線程都掉了await()的那個時間點,5個線程同時解鎖,然後各自繼續各自的事情!
#ReentrantLock + Condition
public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition(); //用于線程之間互相喚醒
private final int parties; //類似于AQS那個state原子變量,也就是總共的線程個數
...
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) { //響應中斷
breakBarrier();
throw new InterruptedException();
}
int index = --count; //每個線程調用1次await(),count--
if (index == 0) { //count減到0的時候,此線程,喚醒其他所有線程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null) //一起喚醒之後,還可以執行一個回調。
command.run();
ranAction = true;
nextGeneration(); //喚醒其他所有線程,同時把count值複原,用于下一次的CyclicBarrier(因為CyclicBarrier可以重複使用)
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) { //count > 0,說明人沒到齊,阻塞自己
try {
if (!timed)
trip.await(); //關鍵點:await阻塞自己的同時,會把鎖釋放掉,這樣别的線程就會拿到鎖,執行上面的index = count--
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation) //從阻塞中喚醒,傳回
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
整個代碼唯一不太好明白的是Generation,這個東西主要是用來複原CyclicBarrier,進而可以下1次重用。不像CountDownLatch,用完1次,就不能再用了。