天天看点

Java多线程 -- JUC包源码分析11 -- CyclicBarrier源码分析

本人新书出版,对技术感兴趣的朋友请关注:

Java多线程 -- JUC包源码分析11 -- CyclicBarrier源码分析

https://mp.weixin.qq.com/s/uq2cw2Lgf-s4nPHJ4WH4aw

在前面的篇章中,讲解了ReentrantLock + Condition,并讲述了2者结合的一个典型应用:ArrayBlockingQueue/LinkedBlockingQueue。

今天讲述2者结合的另一个典型应用:CyclicBarrier。

#CyclicBarrier的概念

要介绍CyclicBarrier这个概念,可以从下面这个比喻开始:一个小公司的所有人周六要出去团建,大家提前商定好周六早上9点公司门口集合,然后到了目的地,在分开行动。

在这里,“公司门口集合地“就是一个CyclicBarrier:大家本来是分散的,在某个时间点集中到一块,等所有人到齐,然后到了目的地再分散行动。

具体到代码中,就是

CyclicBarrier cb = new CyclicBarrier(5);

cb.await();  //5个线程,各自分别调await,都阻塞。直到5个线程都掉了await()的那个时间点,5个线程同时解锁,然后各自继续各自的事情!

           

#ReentrantLock + Condition

public class CyclicBarrier {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();  //用于线程之间互相唤醒
    private final int parties;   //类似于AQS那个state原子变量,也就是总共的线程个数

    ...
}
           
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {         //响应中断
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;  //每个线程调用1次await(),count--
           if (index == 0) {  //count减到0的时候,此线程,唤醒其他所有线程
               boolean ranAction = false;
               try {
		   final Runnable command = barrierCommand;
                   if (command != null)  //一起唤醒之后,还可以执行一个回调。
                       command.run();
                   ranAction = true;
                   nextGeneration(); //唤醒其他所有线程,同时把count值复原,用于下一次的CyclicBarrier(因为CyclicBarrier可以重复使用)
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            for (;;) {   //count > 0,说明人没到齐,阻塞自己
                try {
                    if (!timed)
                        trip.await(); //关键点:await阻塞自己的同时,会把锁释放掉,这样别的线程就会拿到锁,执行上面的index = count--
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
			           throw ie;
		            } else {
			           Thread.currentThread().interrupt();
		             }
                }

                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)   //从阻塞中唤醒,返回
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }
           

整个代码唯一不太好明白的是Generation,这个东西主要是用来复原CyclicBarrier,从而可以下1次重用。不像CountDownLatch,用完1次,就不能再用了。

上一篇: pandasNote3
下一篇: pandasNote1