天天看点

JUC之CountDownLatch、CyclicBarrier-多线程与高并发

CountDownLatch实现了通过一个计数,来控制一个或多个线程是否需要一直阻塞等待。

CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(Number);
           

初始化时指定一个数字,通过await()方法控制线程阻塞等待。

startSignal.await();
           

然后通过countDown()方法实现对初始化时指定的数字减1,每调用一次就减1一次,直到指定的数字变量减为0,这个时候await()实现的阻塞状态被改变,由此程序得以继续向下执行。

doneSignal.countDown();
           

我们可以看一个JDK中官方给的例子:

例子中,用到了两个CountDownlatch对象,一个用于控制线程开始工作,一个控制等待所有线程工作完成。

package basic.aqs;

import java.util.concurrent.CountDownLatch;

class Driver { // ...
    void main() throws InterruptedException {
      CountDownLatch startSignal = new CountDownLatch(1);
      CountDownLatch doneSignal = new CountDownLatch(15);
 
      for (int i = 0; i < 15; ++i) // create and start threads
        new Thread(new Worker(startSignal, doneSignal)).start();
 
      //doSomethingElse();            // don't let run yet开始前的准备工作
      startSignal.countDown();      // let all threads proceed 让线程正常工作取消阻塞
      //doSomethingElse();
      doneSignal.await();           // wait for all to finish等待所有线程执行完成
    }
  }
           
package basic.aqs;

import java.util.concurrent.CountDownLatch;

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
    }
    public void run() {
      try {
        startSignal.await();
        doWork();
        doneSignal.countDown();
      } catch (InterruptedException ex) {} // return;
    }
 
    void doWork() {

    }
  }
           

另外,await方法可以实现控制等待的时间,超过这个时间的就不等待阻塞了,直接执行后面的操作。

countDownLatch.await(10,TimeUnit.MILLISECONDS);
           
CountDownLatch底层应用了AQS(AbstractQueuedSynchronizer)同步框架功能。
           

从上面的例子我们可以看到,CountDownLatch中指定的数字不断减1,直到为0。

CyclicBarrier比CountDownLatch升级了一下,初始化指定的数字可以进行重置,然后重复这个countdown过程。由此实现了分批等待操作,比如下面这个例子,对于100个线程,每满20个,就将这20个线程的状态改为正常运行的状态不再阻塞。

package basic.aqs.CyclicBarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class TestCyclicBarrier {
    public static void main(String[] args) {
        //CyclicBarrier barrier = new CyclicBarrier(20);

        CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));

        /*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
            @Override
            public void run() {
                System.out.println("满人,发车");
            }
        });*/

        for(int i=0; i<100; i++) {

                new Thread(()->{
                    try {
                        barrier.await();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            
        }
    }
}