CyclicBarrier源碼分析 public class CyclicBarrierExample3 {
private static CyclicBarrier1 barrier = new CyclicBarrier1(3, new Runnable() {
@Override
public void run() {
System.out.println("callbakck ISIrunning");
}
});
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
/* Thread t = */new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.await();
System.out.println(Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}
},"線程"+(i+1)).start();
}
}
}
public class CyclicBarrier1 {
//一次等待中,有一個generation和count。不同的組等待generation不同。
private volatile static int i;
private static class Generation {//表示一組,
boolean broken = false;//一組等待是否壞了
String name = "g"+(++i);
}
private final int parties;//線程等待數量,不變
private int count;//等待線程數,減一,會變,剩餘等待數量
private Generation generation = new Generation();//每一組一個generation
private final ReentrantLock1 lock = new ReentrantLock1();//非公平鎖
private final Condition trip = lock.newCondition();
private final Runnable barrierCommand;//執行函數
private void nextGeneration() {//線程等待數量夠了,喚醒所有線程,并在重新初始化一個generation
//第3個線程來signalAll,第1第2個線程會從condition隊列移到AQS隊列去,
//第3個線程來unlock,第1第2線程不一定會進來執行(因為AQS隊列前面可能還有别的線程)。
//第3個線程unlock,隻是允許外部線程或者AQS中的線程或者condition加到AQS中的線程,喚醒一個進來。
trip.signalAll();
count = parties;//parties不會變化,count會減減,這裡重置count等着下一次的等待用。
generation = new Generation();//重新初始化generation
}
private void breakBarrier() {//
generation.broken = true;// 這一組異常,會影響後面不是一組的線程。
count = parties;//重置count
trip.signalAll();//喚醒所有,condition加到AQS去。
}
//多線程通路
private int dowait(boolean timed, long nanos) throws Exception {
final ReentrantLock1 lock = this.lock;
lock.lock();
//進來的是AQS中的(包括condition移過去的)
try {
//每一組一個g,nextGeneration()會建立generation。如果改變了,這個線程使用的是上組中最後線程改變的generation。
final Generation g = generation;
// if(Thread.currentThread().getName().equals("線程4")
// || Thread.currentThread().getName().equals("線程9")) {
// Thread.currentThread().interrupt();
// }
if (g.broken)//generation不會被改變
throw new BrokenBarrierException();
if (Thread.interrupted()) {// 線程中斷,
breakBarrier();//broken = true; 不會改變generation,後面進來的線程都使用這個generation,直接跑異常。
throw new InterruptedException();
}
int index = --count;//數量減一,index儲存線上程棧中,
if (index == 0) { //釋放所有線程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//隻有一組中,最後進來的線程 改變generation。後面進來的線程屬于另一組。generation由上一組中最後進來的線程修改。
nextGeneration();//喚醒所有等待線程,重置count和generation。注意這裡沒有釋放鎖, 建立一個generation。
return 0;
} finally {
if (!ranAction)//ranAction=false進去
breakBarrier();//broken = true;
}
}
// 死循環直到 喚醒, broken, 中斷, 逾時
for (;;) {
try {
// if(Thread.currentThread().getName().equals("線程4")
// || Thread.currentThread().getName().equals("線程9")) {
// throw new Exception();
// }
if (!timed)//是否有逾時
trip.await();//線程轉移到Condition上等待,并且head喚醒AQS下一個節點。
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);//等待時間
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();//設定broken=true,重置count,condition加到AQS去。
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)//true
throw new BrokenBarrierException();
// g != generation表示正常換代了,傳回目前線程所在栅欄的下标
// 如果 g == generation,說明還沒有換代,那為什麼會醒了?
// 因為一個線程可以使用多個栅欄,當别的栅欄喚醒了這個線程,就會走到這裡,是以需要判斷是否是目前代。
// 正是因為這個原因,才需要generation來保證正确。
if (g != generation)
return index;//傳回
if (timed && nanos <= 0L) {
breakBarrier();//
throw new TimeoutException();
}
}
} finally {
lock.unlock();//外部線程或者AQS中的線程或者condition加到AQS中的線程,喚醒一個進來。
}
}
public CyclicBarrier1(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;//等待線程數
this.count = parties;//等待線程數
this.barrierCommand = barrierAction;
}
public CyclicBarrier1(int parties) {
this(parties, null);
}
public int getParties() {
return parties;//parties不變
}
public int await() throws Exception, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit) throws Exception, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public boolean isBroken() {
final ReentrantLock1 lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
public void reset() {
final ReentrantLock1 lock = this.lock;
lock.lock();
try {
breakBarrier(); //
nextGeneration(); //
} finally {
lock.unlock();
}
}
public int getNumberWaiting() {
final ReentrantLock1 lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}