天天看點

JUC之CountDownLatch、CyclicBarrier-多線程與高并發

CountDownLatch實作了通過一個計數,來控制一個或多個線程是否需要一直阻塞等待。

CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(Number);
           

初始化時指定一個數字,通過await()方法控制線程阻塞等待。

startSignal.await();
           

然後通過countDown()方法實作對初始化時指定的數字減1,每調用一次就減1一次,直到指定的數字變量減為0,這個時候await()實作的阻塞狀态被改變,由此程式得以繼續向下執行。

doneSignal.countDown();
           

我們可以看一個JDK中官方給的例子:

例子中,用到了兩個CountDownlatch對象,一個用于控制線程開始工作,一個控制等待所有線程工作完成。

package basic.aqs;

import java.util.concurrent.CountDownLatch;

class Driver { // ...
    void main() throws InterruptedException {
      CountDownLatch startSignal = new CountDownLatch(1);
      CountDownLatch doneSignal = new CountDownLatch(15);
 
      for (int i = 0; i < 15; ++i) // create and start threads
        new Thread(new Worker(startSignal, doneSignal)).start();
 
      //doSomethingElse();            // don't let run yet開始前的準備工作
      startSignal.countDown();      // let all threads proceed 讓線程正常工作取消阻塞
      //doSomethingElse();
      doneSignal.await();           // wait for all to finish等待所有線程執行完成
    }
  }
           
package basic.aqs;

import java.util.concurrent.CountDownLatch;

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
    }
    public void run() {
      try {
        startSignal.await();
        doWork();
        doneSignal.countDown();
      } catch (InterruptedException ex) {} // return;
    }
 
    void doWork() {

    }
  }
           

另外,await方法可以實作控制等待的時間,超過這個時間的就不等待阻塞了,直接執行後面的操作。

countDownLatch.await(10,TimeUnit.MILLISECONDS);
           
CountDownLatch底層應用了AQS(AbstractQueuedSynchronizer)同步架構功能。
           

從上面的例子我們可以看到,CountDownLatch中指定的數字不斷減1,直到為0。

CyclicBarrier比CountDownLatch更新了一下,初始化指定的數字可以進行重置,然後重複這個countdown過程。由此實作了分批等待操作,比如下面這個例子,對于100個線程,每滿20個,就将這20個線程的狀态改為正常運作的狀态不再阻塞。

package basic.aqs.CyclicBarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class TestCyclicBarrier {
    public static void main(String[] args) {
        //CyclicBarrier barrier = new CyclicBarrier(20);

        CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("滿人"));

        /*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
            @Override
            public void run() {
                System.out.println("滿人,發車");
            }
        });*/

        for(int i=0; i<100; i++) {

                new Thread(()->{
                    try {
                        barrier.await();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            
        }
    }
}