天天看點

CyclicBarrier源碼分析

CyclicBarrier源碼分析
public class CyclicBarrierExample3 {
    private static CyclicBarrier1 barrier = new CyclicBarrier1(3, new Runnable() {
        @Override
        public void run() {
            System.out.println("callbakck ISIrunning");
        }
    });
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            /* Thread t = */new Thread(new Runnable() {
                @Override
                public void run() {
                     try {
                        barrier.await();
                        System.out.println(Thread.currentThread().getName());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            },"線程"+(i+1)).start();
        }
    }
}      
public class CyclicBarrier1 {
    //一次等待中,有一個generation和count。不同的組等待generation不同。
    private volatile static int i; 
    private static class Generation {//表示一組,
        boolean broken = false;//一組等待是否壞了
        String name = "g"+(++i);
    }
    private final int parties;//線程等待數量,不變
    private int count;//等待線程數,減一,會變,剩餘等待數量
    private Generation generation = new Generation();//每一組一個generation
    
    private final ReentrantLock1 lock = new ReentrantLock1();//非公平鎖
    private final Condition trip = lock.newCondition();
    private final Runnable barrierCommand;//執行函數

    private void nextGeneration() {//線程等待數量夠了,喚醒所有線程,并在重新初始化一個generation
        //第3個線程來signalAll,第1第2個線程會從condition隊列移到AQS隊列去,
        //第3個線程來unlock,第1第2線程不一定會進來執行(因為AQS隊列前面可能還有别的線程)。
        //第3個線程unlock,隻是允許外部線程或者AQS中的線程或者condition加到AQS中的線程,喚醒一個進來。
        trip.signalAll();
        
        count = parties;//parties不會變化,count會減減,這裡重置count等着下一次的等待用。
        generation = new Generation();//重新初始化generation
    }

    private void breakBarrier() {// 
        generation.broken = true;// 這一組異常,會影響後面不是一組的線程。
        count = parties;//重置count
        trip.signalAll();//喚醒所有,condition加到AQS去。
    }

    //多線程通路
    private int dowait(boolean timed, long nanos) throws Exception {
        final ReentrantLock1 lock = this.lock;
        lock.lock();
        //進來的是AQS中的(包括condition移過去的)
        try {
            //每一組一個g,nextGeneration()會建立generation。如果改變了,這個線程使用的是上組中最後線程改變的generation。
            final Generation g = generation;
            
//            if(Thread.currentThread().getName().equals("線程4") 
//                    || Thread.currentThread().getName().equals("線程9")) {
//                Thread.currentThread().interrupt();
//            }
            
            if (g.broken)//generation不會被改變
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {// 線程中斷, 
                breakBarrier();//broken = true; 不會改變generation,後面進來的線程都使用這個generation,直接跑異常。
                throw new InterruptedException();
            }

            int index = --count;//數量減一,index儲存線上程棧中,
            if (index == 0) {  //釋放所有線程  
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //隻有一組中,最後進來的線程 改變generation。後面進來的線程屬于另一組。generation由上一組中最後進來的線程修改。
                    nextGeneration();//喚醒所有等待線程,重置count和generation。注意這裡沒有釋放鎖, 建立一個generation。
                    return 0;
                } finally {
                    if (!ranAction)//ranAction=false進去
                        breakBarrier();//broken = true;
                }
            }

            // 死循環直到  喚醒, broken, 中斷, 逾時
            for (;;) {
                try {
                    
//                    if(Thread.currentThread().getName().equals("線程4") 
//                            || Thread.currentThread().getName().equals("線程9")) {
//                        throw new Exception();
//                    }
                    
                    if (!timed)//是否有逾時
                        trip.await();//線程轉移到Condition上等待,并且head喚醒AQS下一個節點。
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//等待時間
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();//設定broken=true,重置count,condition加到AQS去。 
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)//true
                    throw new BrokenBarrierException();

                // g != generation表示正常換代了,傳回目前線程所在栅欄的下标
                // 如果 g == generation,說明還沒有換代,那為什麼會醒了?
                // 因為一個線程可以使用多個栅欄,當别的栅欄喚醒了這個線程,就會走到這裡,是以需要判斷是否是目前代。
                // 正是因為這個原因,才需要generation來保證正确。
                if (g != generation)
                    return index;//傳回

                if (timed && nanos <= 0L) {
                    breakBarrier();// 
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();//外部線程或者AQS中的線程或者condition加到AQS中的線程,喚醒一個進來。
        }
    }

    public CyclicBarrier1(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;//等待線程數
        this.count = parties;//等待線程數
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier1(int parties) {
        this(parties, null);
    }

    public int getParties() {
        return parties;//parties不變
    }

    public int await() throws Exception, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit) throws Exception, BrokenBarrierException, TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    public boolean isBroken() {
        final ReentrantLock1 lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    public void reset() {
        final ReentrantLock1 lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   //  
            nextGeneration(); //  
        } finally {
            lock.unlock();
        }
    }

    public int getNumberWaiting() {
        final ReentrantLock1 lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}