前言
AQS (AbstractQueuedSynchronizer)成为同步容器,主要用于构建锁或者其他同步组件的基础框架。通过维护一个共享状态(Volatile int state )和一个先进先出的线程等待队列来来实现一个多线访问共享的资源同步框架。这些同步框架有哪些呢?我们JUC中常用到的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore。在这些同步框架中都定义了一个静态内部类Sync基础于AbstractQueuedSynchronizer并实现AQS的抽象方法来管理同步状态state。
AQS三大要素
- state
- 控制线性抢占锁的FIFO队列
- 期望同步组件类实现的获取、释放等重要方法
state
AQS提供三种方式访问sate(getState(),setState(int newState) ,compareAndSetState(int expect, int update))
volatile修饰的state保证可见性。
protected final int getState() {
return state;
}
getState():获取当前同步状态
protected final void setState(int newState) {
state = newState;
}
setState(int newState):设置当前同步状态
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
compareAndSetState(int expect, int update) :使用CAS设置当前的同步状态,底层的CAS保证状态设置的原子性。
同步器主要重写的方法
以下为这里方法在AQS中定义
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire(int arg) :独占式获取同步资源,获取成功返回true,否则返回false。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
tryRelease(int arg) :独占式获取释放资源,获取成功返回true,否则返回false。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
tryAcquireShared(int arg):共享时获取资源,负数表示失败;0表示成功(但没有可用资源);正数表示成功,而且有可用资源
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
tryReleaseShared(int arg):共享式释放资源,如果释放资源后允许唤醒后续等待线程则返回true,否则返回false。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
isHeldExclusively() :判断当前线程是否独占资源。
补充:什么是独占式和共享式。或者引用独占锁和共享锁更加理解些,独占锁:同一个时刻只允许只有一个线程能获取锁,其他获取不到锁的线程在同步队列中等待。当获取锁的线程释放了锁,后续的线程才能继续获取锁。ReentrantLock实现的锁就是一个独占锁。同样的如果一把锁同一个时刻可以被多个线程持有,那么该锁就是共享锁,常见的共享锁有ReentrantReadWriteLock中的读锁。
选择那些方法进行重写?
以上的方法,用protected 修饰,方法中无具体实现。因为具体实现留给子类(模板方法设计模式)。自定义的同步器一般采用独占式(重写tryAcquire()和tryRelease()方法)或者共享式(重写tryAcquireShared()和tryReleaseShared()方法)。但是也可以也可以同时实现独占和共享,例如ReentrantReadWriteLock。
同步器
下面结合源码简单分析AQS在各个同步组件中是如果使用的
CountDownLatch
CountDownLatch类主要的两个方法主要是
countDown():对count数值进行减1操作
await():调用该方法的线程会被挂起,直到count为0时,该线程才会继续执行;
下面CountDownLatch类主要结构
CountDownLatch中定义了一个私有静态内部类Sync,并且该类基础AbstractQueuedSynchronizer类,以共享的方式重写了tryAcquireShared()和tryReleaseShared()方法。其他同步器或者我们需要自定义同步器的时候可按照这个套路来。
其实AQS主要是为了维护state,那么下面我们主要看sate在CountDownLatch中是怎么使用的。
构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
//AbstractQueuedSynchronizer类中的SetSate方法
protected final void setState(int newState) {
state = newState;
}
不难发现实例一个CountDownLatch对象,其实就是将定义count值设置给底层AQS的state
await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
调用了AQS的acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //尝试获取共享锁 tryAcquireShared
doAcquireSharedInterruptibly(arg); //如果获取失败进入等待队列。
}
tryAcquireShared()
方法由 CountDownLatch 的内部类 Sync 实现
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
如果state为0,即getState() == 0结果true,返回1。此时上层tryAcquireShared(arg)<0得到的结果false,执行就直接返回了。但如果不为零,才进入等待队列。可以留意到调用 tryAcquireShared 仅仅检查 state值,而不会对其减 1,可以看到传入的参数 acquires根本没有用。
countDown ()方法
public void countDown() {
sync.releaseShared(1);
}
调用了AQS的releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //调用 countDown 中的 Sync 类中的方法,如果为ture
doReleaseShared();//释放调用await()的线程
return true;
}
return false;
}
而在这个方法里,
tryReleaseShared()
是由 countDown 中的 Sync 类实现,实现代码如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {//自旋
int c = getState(); //获取state的值,也即是CountDownLatch初始化时的count
if (c == 0)//如果staet为0说明,已经count已经别的线程倒数到0。直接返回false
return false;
int nextc = c-1; //否则对state进行减一
if (compareAndSetState(c, nextc)) //CAS方式更新state的值
return nextc == 0;//如果倒数到 0(即最后一次倒数),返回true。那么 releaseShared 中会调用 doReleaseShared(),让 AQS 释放资源出来。
}
}
小结
在调用CountDownLatch的
await()
方法的时,便会
尝试获取共享锁
,但是刚开始的时候
state是大于0
(实例CountDownLatch时的count参数需要大于0)的,此时(tryAcquireShared(arg) < 0为true),于是·线程会被阻塞·。state的初始值为count,当每一个线程调用·countDown()·方法时,就会对state的值进行·减一·。直到
state为0
.前面被阻塞的线程继续运行。
Semaphore
Semaphore 是信号量或许可证的意思,通过信号量可以对同一资源访问做数量的限制。适合控制有 “池” 概念的资源访问。例如控数据库连接池的并发访问数量。
下面结合源码简单分析AQS在Semaphore的使用。
构造函数
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
permits设置信号量的大小;fair设置是否公平性,分别对应这FairSync 和 NonfairSync ,这两个内部类都继承于Sync,和CountDownLatch一样Sync也是继承于AQS。默认为NonfairSync。
acquire()方法
acquire():申请许可,默认申请一个许可
acquire(int permits):申请指定数量的许可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);//调用了 AQS 中的 acquireSharedInterruptibly 方法
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //尝试获取信号量,获取失败返回负数,进入自旋等待队列
doAcquireSharedInterruptibly(arg); //排队
}
tryAcquireShared()
方法的具体实现在 Semaphore 内部静态类
FairSync
或
NonfairSync
中,如下:
Semaphore中的 NonfairSync
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) { //自旋
int available = getState();//查询剩余的许可数量
int remaining = available - acquires;//剩余的许可将去本次线程申请的许可
//1.如果remaining < 0,说明可用许可不足,获取失败
//2.如果remaining >= 0,则更新CAS更新许可的数量,即更新state的值,并返回0或正数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
Semaphore中的 FairSync
protected int tryAcquireShared(int acquires) {
for (;;) { //自旋
//用下面的if条件来维持公平性
if (hasQueuedPredecessors())//看是否有更早等待的线程,如果有,获取失败
return -1;
int available = getState(); //查询剩余的许可数量
int remaining = available - acquires; //剩余的许可将去本次线程申请的许可
//1.如果remaining < 0,说明可用许可不足,获取失败
//2.如果remaining >= 0,则更新CAS更新许可的数量,即更新state的值,并返回0或正数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
release()方法
release():释放许可,默认释放一个许可
release(int permits):释放指定数量的许可
public void release() {
sync.releaseShared(1); //调用AQS的releaseShared()方法
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//释放许可
doReleaseShared();//通知下个线程节点来获取资源
return true;
}
return false;
}
tryReleaseShared()
的实现也在 Semaphore 内部静态类
Sync
中,如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) { //自旋
int current = getState(); //获取当前的许可数量
int next = current + releases; //加上释放的许可
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //CAS更新许可
return true;
}
}
小结
在Semophore中,state表示
剩余的许可证(信号量)的数量
。对state操作逻辑主要放在了重写的方法
tryReleaseShared()
和
tryAcquireShared()
中,可看出Semaphore也是共享式的获取和释放资源。 获取许可时,
首先
检查剩余的许可数目是否满足本次线程需要的数目,如果
不够
则返回
负数
(获取许可失败),如果
够
则
自旋加compareAndSetState
更新state的数值,知道成功就返回正数;如果期间被别线程修改了导致剩余的许可数量不够,那么返回负数(获取许可失败)。