AQS介绍
AQS是同步锁内实现同步的共同父类,如下UML图能看出,ReentrantLock等锁都是基于AQS。
下面主要介绍独占锁(Exclusive)的实现。
(本文参照网上资料和jdk1.8源码加上自己理解,可能会有一些论述错误的地方,望谅解并指正)
一、背景知识
AQS内部会维护一个双向链表,来存储争取锁的线程。
(图片来自网络)
Node节点的属性除了nextWaiter都是volatile的。
第二点,解释一下节点的两种状态,独占和共享: Exclusive(独占,只有一个线程能执行,如ReentrantLock),Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
AQS存在一个属性为volatile的state,这个state在不同的实现上可以有不同的含义。
- ReentrantLock的state表示是否已锁定,当调用lock时,state就会加一(可重入),unlock就会减一。
- CountdownLatch的state表示倒计时的个数,每次countDown会减一。
第三点,线程的中断是个什么?
Java中断机制是一种协作机制,也就是说通过中断并不能直接终止另一个线程,而需要被中断的线程自己处理中断。这好比是家里的父母叮嘱在外的子女要注意身体,但子女是否注意身体,怎么注意身体则完全取决于自己。这并不是使线程进入阻塞状态。
第四点 waitStatus的状态
源码里面说了很大一段,总结成下面几个意思。
注意,status表示的都是当前节点后面的节点状态
共有以下五种状态:
- CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
- SIGNAL(-1):表示后继结点在等待当前结点唤醒,也就是说后继节点被阻塞了。后继结点入队时,会将前继结点的状态更新为SIGNAL。
- CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE(-3):传播。共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
- 0:新结点入队时的默认状态。
注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。
二、AQS的加锁方法
2.1、acquire(int arg)
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。比如ReentrantLock的lock方法就会调用acquire。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire会去获取锁,如果成功就返回true,否则返回false。
- addWaiter会把当前线程加入队列的尾部,并且是Exclusive。
- acquireQueued用于获取资源,如果线程被中断过返回true,此时就会执行下面的selfInterrupt()。
以ReentrantLock为例:
//FairSync
final void lock() {
acquire(1);
}
直接调用acquire。
2.2、tryAcquire(int acquires)
AQS并没有具体的实现tryAcquires,它让子类自行决定实现的方式。
//aqs并没有实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。
以ReentrantLock的tryAcquire为例。
//FairSync调用的是父类Sync的tryAcquire
//ReentrantLock,lock传入参数qcquires=1
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//ReentrantLock的state表示上锁的次数
//0表示未上锁,如果没上锁,cas上锁,然后把独占线程设置为当前的
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程就是那个独占的,也就是之前上锁的,现在是重入锁,state(c)加1(acquires)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取不到锁,返回false
return false;
}
2.3、addWaiter(Node mode)
private Node addWaiter(Node mode) {
//mode模式有独占和共享两种
//Node.EXCLUSIVE for exclusive, Node.SHARED for shared
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 先试试最快速的直接插到队尾,如果失败,走下面的enq
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
2.4、enq(final Node node)
能看出是CAS自旋加入队尾。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize,队列为空,还没初始化,需要初始化
if (compareAndSetHead(new Node()))
tail = head;
// 初始化之后还会再进行一遍循环
//这个链表的创建方式是先创建了一个伪头节点,然后从下一个节点开始才是真正的node
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2.5、acquireQueued(final Node node, int arg)
tryAcquire失败,addWaiter加入队列。那么下一步就应该是让他休息,等待别人唤醒他。
注意,正常情况下,该方法会一直阻塞,直到拿到锁才会返回(parkAndCheckInterrupt方法就是干这个的),但是如果抛出异常,就会移除节点,继续上抛异常。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否成功拿到资源
try {
boolean interrupted = false;//标记是否被中断过
//注意自旋
for (;;) {
final Node p = node.predecessor();//拿到前驱节点
if (p == head && tryAcquire(arg)) {
// 如果前一个节点是head,head是伪头结点,那么当前节点就是真正地第一个节点
// 再tryAcquire拿到资源,才能进入这里
setHead(node);
p.next = null; // help GC
// setHead把prev断了,现在要让head的next断掉,方便gc
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//没得拿到资源,那应该让他wait了
// 如果被中断过,要修改状态
interrupted = true;
}
} finally {
if (failed)
// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}
先看shouldParkAfterFailedAcquire和parkAndCheckInterrupt。在这之前,建议了解中断什么意思,开篇就提到了。(本人到这里时候已经被中断整蒙了,所以去查了一下,写在了开头)
2.5.1、 shouldParkAfterFailedAcquire(Node,Node)
检查是否可以进入waiting状态。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// pred前驱节点和node当前节点
// 当前节点的状态保存在前驱节点当中
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 前驱节点已经是signal,也就是前驱节点释放时会通知自己,可以安全的park
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 大于0只有一个状态,为cancelled。
//前驱节点取消了,那就顺着链向前找到一个非cancelled的状态(正常等待),然后排在他后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果前驱正常,那就把前驱的状态设置成SIGNAL,因为前驱节点可能是new状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
总之,前一个节点必须是signal,自己才能去休息。
2.5.2、 parkAndCheckInterrupt()
检查过可以休息了,现在真正要去休息了。此方法会让线程真正的waiting。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
park会进入waiting状态。此时,只有unpark和interrupt可以唤醒他。
2.5.3、小结
acquiredQueued()分为下面几步:
- 节点进入队尾,找到一个安全休息点
- 调用park进入waiting,只有unpark和interrupt能唤醒
- 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1
2.6.总结
再回到acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 先调用tryAcquire尝试获取,成功就直接返回了
- 如果获取失败,需要加入队列,调用addWaiter
- acquireQueued让线程休息,轮到自己获取,并且获取到资源后才返回,如果被中断过,返回true,否则是false
- 线程在等待过程中被中断过,不响应,只有在获取资源后,才进行selfInterrupt,把中断补上
下面是流程图
(图源网络)
三、解锁
3.1、 release(int arg)
与acquire对应,release是解锁时候会调用的。以ReentrantLock为例:
public void unlock() {
sync.release(1);
}
下面是release源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
//tryRelease需要子类实现
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
用tryRelease释放锁,tryRelease需要实现AQS的类自己重写。
3.2、 tryRelease(int arg)
AQS的tryRelease同tryAcquire一样都只是抛出异常。下面以ReentrantLock的具体实现为例。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
ReentrantLock的state是加锁的层数,传入参数releases是1,所以第一行把锁的层数减一。如果c为0,说明没有锁了,返回true,并且设置当前的独占线程是null。
3.3、 unparkSuccessor(Node)
唤醒当前节点的后继节点。
private void unparkSuccessor(Node node) {
//node是当前节点
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)//更新head状态为0,head,状态为0说明下一个可以被唤醒了
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//下一个要唤醒的节点
if (s == null || s.waitStatus > 0) {
//如果为空或者被取消了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//从后向前找
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
unpark会唤醒等待队列中最靠前的,未被取消的线程。
3.4、 继续acquireQueued
另一个线程被唤醒之后,会从acquireQueued的阻塞位置恢复。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 2.在这里tryAcquire能够获取到锁了,所以进来
setHead(node);
// 3.把当前节点设置为头结点
p.next = null; // help GC
failed = false;
// 4.返回
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
// 1.从这里恢复,然后继续循环
}
} finally {
if (failed)
cancelAcquire(node);
}
}
四、示例
假设ABC三个线程竞争,以ReentrantLock的公平锁为例,下面是队列的变化关系:
step1
假设最开始A线程先获取锁,没有发生竞争,不需要等待,所以队列此时是空的。
step2
现在B线程来获取锁,发现state是1,即已经加锁了。AQS会先创建一个伪头节点,然后把B进入队列。
step3
最后C线程获取,同样需要排队。
step4
A线程释放锁,会调用unparkSuccessor()把头结点的waitStatus置0,表示后续节点可以为唤醒了。
step5
B开始执行。首先tryAcquire能获取到锁,setHead(B),并把B的waitStatus设为-1,表示下一个被阻塞。此时队列状态如下:
step6
B释放锁,unparkSuccessor()唤醒首节点。
step7
C获取锁同理。
step8
C释放锁,什么也不用做,因为C是最后一个节点,C离开之后,队列为空。
五、总结
对于独占锁来说,自己实现自定义锁继承AQS基本只用实现tryAcquire和tryRelease两个方法(还有isHeldExclusively(),用condition时候才必须重写)。这两个方法内容是加锁和删除锁时候的逻辑部分,交由程序员自己编写。其他的对队列的方法,AQS都已经实现完了,可以直接拿来用。并且,AQS实现的队列是一种双向队列,当前节点的状态由前一节点保持,而且还利用一个伪头节点。
本篇文章为记录自己学习AQS的过程,如有不对,还请指正
参考文章:
https://segmentfault.com/a/1190000015804888
https://www.cnblogs.com/waterystone/p/4920797.html
https://blog.csdn.net/sscout/article/details/102616722