天天看點

Java多線程 -- JUC包源碼分析11 -- CyclicBarrier源碼分析

本人新書出版,對技術感興趣的朋友請關注:

Java多線程 -- JUC包源碼分析11 -- CyclicBarrier源碼分析

https://mp.weixin.qq.com/s/uq2cw2Lgf-s4nPHJ4WH4aw

在前面的篇章中,講解了ReentrantLock + Condition,并講述了2者結合的一個典型應用:ArrayBlockingQueue/LinkedBlockingQueue。

今天講述2者結合的另一個典型應用:CyclicBarrier。

#CyclicBarrier的概念

要介紹CyclicBarrier這個概念,可以從下面這個比喻開始:一個小公司的所有人周六要出去團建,大家提前商定好周六早上9點公司門口集合,然後到了目的地,在分開行動。

在這裡,“公司門口集合地“就是一個CyclicBarrier:大家本來是分散的,在某個時間點集中到一塊,等所有人到齊,然後到了目的地再分散行動。

具體到代碼中,就是

CyclicBarrier cb = new CyclicBarrier(5);

cb.await();  //5個線程,各自分别調await,都阻塞。直到5個線程都掉了await()的那個時間點,5個線程同時解鎖,然後各自繼續各自的事情!

           

#ReentrantLock + Condition

public class CyclicBarrier {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();  //用于線程之間互相喚醒
    private final int parties;   //類似于AQS那個state原子變量,也就是總共的線程個數

    ...
}
           
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {         //響應中斷
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;  //每個線程調用1次await(),count--
           if (index == 0) {  //count減到0的時候,此線程,喚醒其他所有線程
               boolean ranAction = false;
               try {
		   final Runnable command = barrierCommand;
                   if (command != null)  //一起喚醒之後,還可以執行一個回調。
                       command.run();
                   ranAction = true;
                   nextGeneration(); //喚醒其他所有線程,同時把count值複原,用于下一次的CyclicBarrier(因為CyclicBarrier可以重複使用)
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            for (;;) {   //count > 0,說明人沒到齊,阻塞自己
                try {
                    if (!timed)
                        trip.await(); //關鍵點:await阻塞自己的同時,會把鎖釋放掉,這樣别的線程就會拿到鎖,執行上面的index = count--
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
			           throw ie;
		            } else {
			           Thread.currentThread().interrupt();
		             }
                }

                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)   //從阻塞中喚醒,傳回
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }
           

整個代碼唯一不太好明白的是Generation,這個東西主要是用來複原CyclicBarrier,進而可以下1次重用。不像CountDownLatch,用完1次,就不能再用了。

上一篇: pandasNote3
下一篇: pandasNote1