AQS介紹
AQS是同步鎖内實作同步的共同父類,如下UML圖能看出,ReentrantLock等鎖都是基于AQS。
下面主要介紹獨占鎖(Exclusive)的實作。
(本文參照網上資料和jdk1.8源碼加上自己了解,可能會有一些論述錯誤的地方,望諒解并指正)
一、背景知識
AQS内部會維護一個雙向連結清單,來存儲争取鎖的線程。
(圖檔來自網絡)
Node節點的屬性除了nextWaiter都是volatile的。
第二點,解釋一下節點的兩種狀态,獨占和共享: Exclusive(獨占,隻有一個線程能執行,如ReentrantLock),Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。
AQS存在一個屬性為volatile的state,這個state在不同的實作上可以有不同的含義。
- ReentrantLock的state表示是否已鎖定,當調用lock時,state就會加一(可重入),unlock就會減一。
- CountdownLatch的state表示倒計時的個數,每次countDown會減一。
第三點,線程的中斷是個什麼?
Java中斷機制是一種協作機制,也就是說通過中斷并不能直接終止另一個線程,而需要被中斷的線程自己進行中斷。這好比是家裡的父母叮囑在外的子女要注意身體,但子女是否注意身體,怎麼注意身體則完全取決于自己。這并不是使線程進入阻塞狀态。
第四點 waitStatus的狀态
源碼裡面說了很大一段,總結成下面幾個意思。
注意,status表示的都是目前節點後面的節點狀态
共有以下五種狀态:
- CANCELLED(1):表示目前結點已取消排程。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀态,進入該狀态後的結點将不會再變化。
- SIGNAL(-1):表示後繼結點在等待目前結點喚醒,也就是說後繼節點被阻塞了。後繼結點入隊時,會将前繼結點的狀态更新為SIGNAL。
- CONDITION(-2):表示結點等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀态的結點将從等待隊列轉移到同步隊列中,等待擷取同步鎖。
- PROPAGATE(-3):傳播。共享模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。
- 0:新結點入隊時的預設狀态。
注意,負值表示結點處于有效等待狀态,而正值表示結點已被取消。是以源碼中很多地方用>0、<0來判斷結點的狀态是否正常。
二、AQS的加鎖方法
2.1、acquire(int arg)
此方法是獨占模式下線程擷取共享資源的頂層入口。如果擷取到資源,線程直接傳回,否則進入等待隊列,直到擷取到資源為止,且整個過程忽略中斷的影響。比如ReentrantLock的lock方法就會調用acquire。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire會去擷取鎖,如果成功就傳回true,否則傳回false。
- addWaiter會把目前線程加入隊列的尾部,并且是Exclusive。
- acquireQueued用于擷取資源,如果線程被中斷過傳回true,此時就會執行下面的selfInterrupt()。
以ReentrantLock為例:
//FairSync
final void lock() {
acquire(1);
}
直接調用acquire。
2.2、tryAcquire(int acquires)
AQS并沒有具體的實作tryAcquires,它讓子類自行決定實作的方式。
//aqs并沒有實作
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
這裡之是以沒有定義成abstract,是因為獨占模式下隻用實作tryAcquire-tryRelease,而共享模式下隻用實作tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實作另一模式下的接口。說到底,Doug Lea還是站在咱們開發者的角度,盡量減少不必要的工作量。
以ReentrantLock的tryAcquire為例。
//FairSync調用的是父類Sync的tryAcquire
//ReentrantLock,lock傳入參數qcquires=1
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//ReentrantLock的state表示上鎖的次數
//0表示未上鎖,如果沒上鎖,cas上鎖,然後把獨占線程設定為目前的
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果目前線程就是那個獨占的,也就是之前上鎖的,現在是重入鎖,state(c)加1(acquires)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//擷取不到鎖,傳回false
return false;
}
2.3、addWaiter(Node mode)
private Node addWaiter(Node mode) {
//mode模式有獨占和共享兩種
//Node.EXCLUSIVE for exclusive, Node.SHARED for shared
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 先試試最快速的直接插到隊尾,如果失敗,走下面的enq
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
2.4、enq(final Node node)
能看出是CAS自旋加入隊尾。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize,隊列為空,還沒初始化,需要初始化
if (compareAndSetHead(new Node()))
tail = head;
// 初始化之後還會再進行一遍循環
//這個連結清單的建立方式是先建立了一個僞頭節點,然後從下一個節點開始才是真正的node
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2.5、acquireQueued(final Node node, int arg)
tryAcquire失敗,addWaiter加入隊列。那麼下一步就應該是讓他休息,等待别人喚醒他。
注意,正常情況下,該方法會一直阻塞,直到拿到鎖才會傳回(parkAndCheckInterrupt方法就是幹這個的),但是如果抛出異常,就會移除節點,繼續上抛異常。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标記是否成功拿到資源
try {
boolean interrupted = false;//标記是否被中斷過
//注意自旋
for (;;) {
final Node p = node.predecessor();//拿到前驅節點
if (p == head && tryAcquire(arg)) {
// 如果前一個節點是head,head是僞頭結點,那麼目前節點就是真正地第一個節點
// 再tryAcquire拿到資源,才能進入這裡
setHead(node);
p.next = null; // help GC
// setHead把prev斷了,現在要讓head的next斷掉,友善gc
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//沒得拿到資源,那應該讓他wait了
// 如果被中斷過,要修改狀态
interrupted = true;
}
} finally {
if (failed)
// 如果等待過程中沒有成功擷取資源(如timeout,或者可中斷的情況下被中斷了),那麼取消結點在隊列中的等待。
cancelAcquire(node);
}
}
先看shouldParkAfterFailedAcquire和parkAndCheckInterrupt。在這之前,建議了解中斷什麼意思,開篇就提到了。(本人到這裡時候已經被中斷整蒙了,是以去查了一下,寫在了開頭)
2.5.1、 shouldParkAfterFailedAcquire(Node,Node)
檢查是否可以進入waiting狀态。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// pred前驅節點和node目前節點
// 目前節點的狀态儲存在前驅節點當中
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 前驅節點已經是signal,也就是前驅節點釋放時會通知自己,可以安全的park
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 大于0隻有一個狀态,為cancelled。
//前驅節點取消了,那就順着鍊向前找到一個非cancelled的狀态(正常等待),然後排在他後面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果前驅正常,那就把前驅的狀态設定成SIGNAL,因為前驅節點可能是new狀态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
總之,前一個節點必須是signal,自己才能去休息。
2.5.2、 parkAndCheckInterrupt()
檢查過可以休息了,現在真正要去休息了。此方法會讓線程真正的waiting。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
park會進入waiting狀态。此時,隻有unpark和interrupt可以喚醒他。
2.5.3、小結
acquiredQueued()分為下面幾步:
- 節點進入隊尾,找到一個安全休息點
- 調用park進入waiting,隻有unpark和interrupt能喚醒
- 被喚醒後,看自己是不是有資格能拿到号。如果拿到,head指向目前結點,并傳回從入隊到拿到号的整個過程中是否被中斷過;如果沒拿到,繼續流程1
2.6.總結
再回到acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 先調用tryAcquire嘗試擷取,成功就直接傳回了
- 如果擷取失敗,需要加入隊列,調用addWaiter
- acquireQueued讓線程休息,輪到自己擷取,并且擷取到資源後才傳回,如果被中斷過,傳回true,否則是false
- 線程在等待過程中被中斷過,不響應,隻有在擷取資源後,才進行selfInterrupt,把中斷補上
下面是流程圖
(圖源網絡)
三、解鎖
3.1、 release(int arg)
與acquire對應,release是解鎖時候會調用的。以ReentrantLock為例:
public void unlock() {
sync.release(1);
}
下面是release源碼:
public final boolean release(int arg) {
if (tryRelease(arg)) {
//tryRelease需要子類實作
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
用tryRelease釋放鎖,tryRelease需要實作AQS的類自己重寫。
3.2、 tryRelease(int arg)
AQS的tryRelease同tryAcquire一樣都隻是抛出異常。下面以ReentrantLock的具體實作為例。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
ReentrantLock的state是加鎖的層數,傳入參數releases是1,是以第一行把鎖的層數減一。如果c為0,說明沒有鎖了,傳回true,并且設定目前的獨占線程是null。
3.3、 unparkSuccessor(Node)
喚醒目前節點的後繼節點。
private void unparkSuccessor(Node node) {
//node是目前節點
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)//更新head狀态為0,head,狀态為0說明下一個可以被喚醒了
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//下一個要喚醒的節點
if (s == null || s.waitStatus > 0) {
//如果為空或者被取消了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//從後向前找
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}
unpark會喚醒等待隊列中最靠前的,未被取消的線程。
3.4、 繼續acquireQueued
另一個線程被喚醒之後,會從acquireQueued的阻塞位置恢複。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 2.在這裡tryAcquire能夠擷取到鎖了,是以進來
setHead(node);
// 3.把目前節點設定為頭結點
p.next = null; // help GC
failed = false;
// 4.傳回
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
// 1.從這裡恢複,然後繼續循環
}
} finally {
if (failed)
cancelAcquire(node);
}
}
四、示例
假設ABC三個線程競争,以ReentrantLock的公平鎖為例,下面是隊列的變化關系:
step1
假設最開始A線程先擷取鎖,沒有發生競争,不需要等待,是以隊列此時是空的。
step2
現在B線程來擷取鎖,發現state是1,即已經加鎖了。AQS會先建立一個僞頭節點,然後把B進入隊列。
step3
最後C線程擷取,同樣需要排隊。
step4
A線程釋放鎖,會調用unparkSuccessor()把頭結點的waitStatus置0,表示後續節點可以為喚醒了。
step5
B開始執行。首先tryAcquire能擷取到鎖,setHead(B),并把B的waitStatus設為-1,表示下一個被阻塞。此時隊列狀态如下:
step6
B釋放鎖,unparkSuccessor()喚醒首節點。
step7
C擷取鎖同理。
step8
C釋放鎖,什麼也不用做,因為C是最後一個節點,C離開之後,隊列為空。
五、總結
對于獨占鎖來說,自己實作自定義鎖繼承AQS基本隻用實作tryAcquire和tryRelease兩個方法(還有isHeldExclusively(),用condition時候才必須重寫)。這兩個方法内容是加鎖和删除鎖時候的邏輯部分,交由程式員自己編寫。其他的對隊列的方法,AQS都已經實作完了,可以直接拿來用。并且,AQS實作的隊列是一種雙向隊列,目前節點的狀态由前一節點保持,而且還利用一個僞頭節點。
本篇文章為記錄自己學習AQS的過程,如有不對,還請指正
參考文章:
https://segmentfault.com/a/1190000015804888
https://www.cnblogs.com/waterystone/p/4920797.html
https://blog.csdn.net/sscout/article/details/102616722