天天看点

Java并发编程(一):CountDownLatch使用及原理解析CountDownLatch 使用场景CountDownLatch与thread.join()的区别CountDownLatch源码分析

CountDownLatch 使用场景

CountDonwLatch可以使一个线程等待其它线程完成后再执行。 例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

使用场景及代码样例1: 主线程内执行10个子线程,子线程全部执行完毕后,主线程继续执行。
import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {


    public static void testCountDownLatch(){

        int threadCount = 10;

        final CountDownLatch latch = new CountDownLatch(threadCount);

        for(int i=0; i< threadCount; i++){

            new Thread(new Runnable() {

                @Override
                public void run() {

                    System.out.println("线程" + Thread.currentThread().getId() + "开始执行");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println("线程" + Thread.currentThread().getId() + "执行已经执行完毕");

                    latch.countDown();
                }
            }).start();
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("10个子线程已经执行完毕!开始执行主线程");
    }


    public static void main(String[] args){


        testCountDownLatch();

    }

}
           

输出结果:

线程10开始执行

线程11开始执行

线程12开始执行

线程13开始执行

线程14开始执行

线程15开始执行

线程16开始执行

线程17开始执行

线程18开始执行

线程19开始执行

线程11执行已经执行完毕

线程16执行已经执行完毕

线程17执行已经执行完毕

线程14执行已经执行完毕

线程15执行已经执行完毕

线程12执行已经执行完毕

线程18执行已经执行完毕

线程13执行已经执行完毕

线程10执行已经执行完毕

线程19执行已经执行完毕

10个子线程已经执行完毕!开始执行主线程

使用场景及代码样例2: 三个工作线程等待信号线程执行完毕后一起继续执行。

工作线程代码:

public class PlayerThreadWithLatch implements Runnable{

    CountDownLatch countDownLatch;


    public PlayerThreadWithLatch(CountDownLatch countDownLatch){
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            countDownLatch.await();
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Player线程" + Thread.currentThread().getId() + "执行完毕");
    }
}

           

信号线程代码:

public static void testForMutiMaster(){
        
        int threadCount = 1;

        final CountDownLatch latch = new CountDownLatch(threadCount);

        PlayerThreadWithLatch playerThreadWithLatch1 = new PlayerThreadWithLatch(latch);
        PlayerThreadWithLatch playerThreadWithLatch2 = new PlayerThreadWithLatch(latch);
        PlayerThreadWithLatch playerThreadWithLatch3 = new PlayerThreadWithLatch(latch);

        new Thread(new Runnable() {

            @Override
            public void run() {

                System.out.println("信号线程" + Thread.currentThread().getId() + "开始执行");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("信号线程线程" + Thread.currentThread().getId() + "执行已经执行完毕");

                latch.countDown();
            }
        }).start();

        new Thread(playerThreadWithLatch1).start();
        new Thread(playerThreadWithLatch2).start();
        new Thread(playerThreadWithLatch3).start();
        
    }
    
    public static void main(String[] args){
        testForMutiMaster();
    }
           

输出结果:

信号线程10开始执行

信号线程线程10执行已经执行完毕

Player线程11执行完毕

Player线程12执行完毕

Player线程13执行完毕

CountDownLatch与thread.join()的区别

join 必须等待子线程完全结束后,主线程才能继续执行。

CountDownLatch可以在子线程的任意阶段调用countDown()方法,以唤醒等待线程。

总结:CountDownLatch 比thread.join()更灵活。

CountDownLatch源码分析

CountDownLatch 持有一个内部类 Sync, Sync 是AbstractQueuedSynchronizer的子类。 CountDownLatch是通过AbstractQueuedSynchronizer实现的线程同步。关于AbstractQueuedSynchronizer的介绍可以参考这边文章:http://ifeve.com/introduce-abstractqueuedsynchronizer/

下面来看下 CountDownLatch await()的主要逻辑:AbstractQueuedSynchronizer的doAcquireSharedInterruptibly()方法

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //此处尝试去获取锁,当 state等于0时获取到锁。
                    //state就是 CountDownLatch构造方法的参数,
                    //countDown()方法就是给这个值减1
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                //如果没有获取到锁,会通过下面这个方法阻塞线程,避免一直死循环。
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

           

看下AbstractQueuedSynchronizer parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
  }
           

是通过 LockSupport.part() 方法来阻塞线程。

当 countDown方法将 state 减为0时, 会通过 LockSupport.unpark()方法唤醒线程。

至此CountDownLatch的介绍告一段落,后面会继续介绍CyclicBarrier和 Semaphore。