天天看点

Java并发学习笔记(三)-闭锁CountDownLatch

闭锁CountDownLatch是一种同步类,它的作用相当于一扇门,在闭锁达到结束状态之前,这扇门一直是关闭的,没有线程可以通过,当达到结束状态的时候,这扇门就会打开,并且会永远保持打开状态,允许所有的线程通过。闭锁包含一个计数器,计数器就是这扇门,当计数器不为0的时候,调用latch.await方法的线程就会一直阻塞,直到计数器为0。即调用await方法的线程等待计数器为0。

public class CountDownLatchDemo {

	public static void main(String[] args) {
		CountDownLatch beginLatch = new CountDownLatch(1);//初始开始闭锁为1
		CountDownLatch endLatch = new CountDownLatch(4);//初始结束闭锁为4
		ExecutorService es = Executors.newCachedThreadPool();
		es.execute(new VoteMachine(beginLatch, endLatch));
		for (int i = 0; i < 4; i++) {
			es.execute(new Voter(beginLatch, endLatch));
		}
	}

}

class VoteMachine implements Runnable {

	private CountDownLatch beginLatch;
	private CountDownLatch endLatch;

	public VoteMachine(CountDownLatch beginLatch, CountDownLatch endLatch) {
		this.beginLatch = beginLatch;
		this.endLatch = endLatch;
	}

	@Override
	public void run() {
		try {
			TimeUnit.MILLISECONDS.sleep(1000);
			System.out.println("开始投票");
			//latch计数器减一,开始闭锁为0,可以正常投票
			beginLatch.countDown();
			//等待,直到计数器为0,所有投票人投完票
			endLatch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("结果出来了");
	}

}

class Voter implements Runnable {
	private CountDownLatch beginLatch;
	private CountDownLatch endLatch;

	private static int no = 0;
	private int id;

	public Voter(CountDownLatch beginLatch, CountDownLatch endLatch) {
		this.beginLatch = beginLatch;
		this.endLatch = endLatch;
		this.id = ++no;
	}

	@Override
	public void run() {
		try {
			//等待投票器准备完毕,直到计数器为0
			beginLatch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(id + "我投完票了");
		//投票,计数器减1
		endLatch.countDown();
	}

}
           

以上是一个投票的模拟,有两个闭锁。所有投票人都需要在投票器准备完毕之后才能开始投票,由beginLatch控制。投票器要等到所有投票人投完票才能开始统计结果,由endLatch控制。闭锁使用方法很简单,主要方法就是await和countDown。

我们来看一下闭锁的实现

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
           

构造函数有一个初始的计数作为参数,计数会保存到类Sync。这是闭锁的内部类,继承自抽象类AbstractQueuedSynchronizer,这是Java并发库实现并发的基础(有关这部分,以后详细了解后再介绍)

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
           

同样地,await方法,调用的也是Sync的方法。acquireSharedInterruptibly的实现在AbstractQueuedSynchronizer里。

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
           

方法调用tryAcquireShared方法,此方法被类Sync覆盖

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
           

如果计数为0,就不做任何额外操作,线程可以继续执行。不然,会调用doAcquireSharedInterruptibly方法,开始等待,方法的实现在类AbstractQueuedSynchronizer

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
//同前面的方法,计数为0的时候r为1,是不然r为-1
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           

调用该方法,线程进入循环,只有当计数器为0的时候,r为1,跳出循环

我们再来看countDown方法

public void countDown() {
        sync.releaseShared(1);
    }
           

毫不意外地,操作还是交给sync处理,使计数器减1

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
           
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
           

如果当前计数器为0,那么什么都不做,不然,计数器减1,并将新的计数保存回sync

继续阅读