AQS重生之路
AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。

1.1 Node分析
static final class Node {
//标记该线程是获取共享资源时被阻塞挂起后放入AQS队列
static final Node SHARED = new Node();
//标记线程是获取独占资源时被挂起后放入AQS队列的
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int PROPAGATE = -3;
//记录当前线程等待状态,可以为
//CANCELLED(线程被取消了)
//SIGNAL(线程需要被唤醒)
//CONDITION(线程在条件队列里面等待)
//PROPAGATE(释放共享资源时需要通知其他节点)
volatile int waitStatus;
volatile Node prev;
volatile Node next;
//存放进入AQS队列里面的线程
volatile Thread thread;
Node nextWaiter;
}
/**
头尾节点
*/
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
上述state变量扩展:对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数;对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;对于semaphore来说,state用来表示当前可用信号的个数;对于CountDownlatch来说,state用来表示计数器当前的值。
1.2 ConditionObject
**字段:**ConditionObject内部类只有两个变量分别存储条件队列的头、尾元素。
方法:
1.3 变量
方法操作
对于AQS来说,线程同步的关键是对状态值state进行操作。根据state是否属于一个线程,操作state的方式分为独占方式和共享方式。
独占方式
注意:阻塞的方式进行资源获取,try相关的方法都需要子类去实现。
- void acquire(intarg)
//会首先使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回 //失败则将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己。 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
- void acquireInterruptibly(int arg)
- boolean release(int arg)
1.尝试使用tryRelease操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = ; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
共享方式
当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。比如Semaphore信号量,当一个线程通过acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。
- void acquireShared(int arg)
会首先使用tryAcquireShared尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.SHARED的Node节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己。 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
- void acquireSharedInterruptibly(int arg)
- boolean releaseShared(int arg)
1.4 维护AQS队列
入队操作
简述:当一个线程获取锁失败后该线程会被转换为Node节点,然后就会使用enq(final Node node)方法将该节点插入到AQS的阻塞队列。
1.当队列为空,首先设置一个head的哨兵节点,并让尾部也指向head节点
2.第二次循环,将当前节点链接到尾部,并将tail指向尾节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1.5 AQS—条件变量的支持
主要就是通过
Lock.newCondition()
来获取条件变量,然后通过
condition.await()
和
condition.signal()
来控制线程通信。
案例如下
package demo;
import java.util.HashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
new Thread(Test::await).start();
new Thread(Test::signal).start();
}
private static void await(){
lock.lock();
try {
System.out.println("begin wait");
condition.await();
System.out.println("end wait");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private static void signal(){
lock.lock();
try {
System.out.println("begin signal");
condition.signal();
System.out.println("end signal");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
a.源码分析
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//创建新的node节点,并插入到条件队列末尾
Node node = addConditionWaiter();
//释放当前线程获取的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//调用park()方法阻塞挂起当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
...
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//将条件队列头元素移动到AQS队列
doSignal(first);
}
b.总体流程
1.当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。
2.如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转换为Node节点插入到条件变量对应的条件队列里面。
3.这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。
4.当另外一个线程调用条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。
最后使用一个图总结如下:一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。