天青色等煙雨,而我在等你,微信公衆号搜尋:徐同學呀,持續更新肝貨,快來關注我,和我一起學習吧~
更多JUC源碼解讀系列文章請持續關注JUC源碼解讀文章目錄JDK8 !
文章目錄
-
- 一、前言
- 二、獨占模式擷取鎖
-
- 1、acquireQueued做的不隻是阻塞線程
- 2、shouldParkAfterFailedAcquire通過前驅判斷是否應該阻塞
- 3、parkAndCheckInterrupt阻塞線程
- 三、獨占模式可中斷擷取鎖
-
- doAcquireInterruptibly阻塞線程并響應中斷
- 四、獨占模式可逾時擷取鎖
-
- doAcquireNanos阻塞線程自動喚醒響應中斷
- 五、獨占模式釋放鎖
-
- unparkSuccessor喚醒後繼線程
- 六、取消擷取鎖cancelAcquire
- 七、總結
一、前言
在AQS中,獨占鎖又稱為互斥鎖,其擷取與釋放鎖的過程由兩個模闆方法實作:
acquire
與
release
,其中兩個函數
tryAcquire
與
tryRelease
,AQS并沒有給出具體實作,需要子類根據實際情況自行實作,如
ReentrantLock
中實作了公平與非公平的
tryAcquire
。
一個線程擷取鎖,無非就是對state變量進行CAS修改,修改成功則擷取鎖,修改失敗則進入隊列,而AQS就是負責線程進入同步隊列以後的邏輯,如何出入隊列?如何阻塞?如何喚醒?一切的核心都在AQS裡。
二、獨占模式擷取鎖
tryAcquire
,AQS中沒有給出具體實作,暫且不談,主要看
acquireQueued
。
tryAcquire
擷取鎖失敗,傳回false,則執行
acquireQueued
,進入同步隊列流程。
addWaiter
是進入隊列的操作,其主要流程是建立節點然後将新節點CAS排到隊列尾部,而
acquireQueued
的職責是線程進入隊列之後的操作,繼續擷取鎖?還是阻塞?
public final void acquire(int arg) {
//若沒有搶到鎖,則進入同步隊列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//自己中斷自己(補償機制)
selfInterrupt();
}
1、acquireQueued做的不隻是阻塞線程
acquireQueued
要做的不隻是單純的阻塞線程,還有被喚醒或者自旋擷取鎖後出隊列。
- 擷取node節點的前驅節點,判斷其是否是head,是則繼續搶鎖(可能剛入隊列就排在head後面,也有可能自旋後,有其他節點擷取鎖出隊列,而使得node排在head後面),搶鎖成功則出隊換頭。
- node的前驅節點不是head或者搶鎖失敗,進入阻塞判斷
。shouldParkAfterFailedAcquire
- 判斷應該放心阻塞,調用
阻塞目前線程。parkAndCheckInterrupt
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//拿node的前一個節點
final Node p = node.predecessor();
//若p是頭節點,,說明自己排在隊列的第一個嘗試搶鎖
if (p == head && tryAcquire(arg)) {
//node成為新的head
setHead(node);
p.next = null; // help GC
failed = false;
//拿到鎖了傳回false
return interrupted;
}
//1.應該阻塞,調用parkAndCheckInterrupt阻塞線程
//2.不應該阻塞,再給一次搶鎖的機會
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//基本不可能走到這一步,除非是系統級别的異常導緻擷取鎖失敗for循環意外退出,
cancelAcquire(node);
}
}
2、shouldParkAfterFailedAcquire通過前驅判斷是否應該阻塞
阻塞判斷
shouldParkAfterFailedAcquire
,什麼情況下應該阻塞線程?什麼情況下應該再給一次搶鎖的機會?3種情況:
- 判斷node的前驅節點
(這裡和CLH鎖自旋檢測前驅狀态一樣),node的線程就放心阻塞,因為會在下次某個線程釋放鎖後,被node前驅喚醒。waitStatus=SIGNAL
- node的前驅節點
,waitStatus>0
隻有waitStatus
,是以node前驅節點被取消了,剔除取消節點,給node連結一個正常的前驅,然後再自旋一次。CANCELLED>0
- node的前驅節點
或者waitStatus=0
(waitStatus=PROPAGATE
是共享鎖傳播的情況暫時不考慮),此時将node前驅節點waitStatus=PROPAGATE
設定為waitStatus
,然後再給一次自旋的機會。SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* node拿鎖失敗,前驅節點的狀态是SIGNAL,node節點可以放心的阻塞,
* 因為下次會被喚醒
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* pred節點被取消了,跳過pred
*/
do {
//pred = pred.prev;
//node.pred = pred;
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
//跳過取消節點,給node找一個正常的前驅,然後再循環一次
} else {
/* 0 -3
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
3、parkAndCheckInterrupt阻塞線程
shouldParkAfterFailedAcquire
判斷應該阻塞線程,則調用
parkAndCheckInterrupt
,其内部調用
LockSupport.park(this)
阻塞目前線程。
LockSupport
對
UNSAFE
中的
park
、
unpark
進行了封裝,其能精準阻塞一個線程,也能精準喚醒一個線程(不同于
wait
和
notify
)。阻塞喚醒會導緻線程進行上下文切換。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
三、獨占模式可中斷擷取鎖
可中斷擷取鎖
acquireInterruptibly
和不可中斷擷取鎖
acquire
邏輯類似,差別在于
acquireInterruptibly
可響應
InterruptedException
中斷異常(外部調用
Thread#interrupt
,就可能會導緻中斷)。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
//有被中斷 抛異常
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly阻塞線程并響應中斷
doAcquireInterruptibly
進入隊列以後的邏輯也與
acquireQueued
差不多,差別也是在于
doAcquireInterruptibly
可響應
InterruptedException
中斷異常。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//建立node,入隊列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//判斷前驅是否是head
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
//1.應該阻塞,調用parkAndCheckInterrupt阻塞線程
//2.不應該阻塞,再給一次搶鎖的機會
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//遇到中斷抛了異常InterruptedException
throw new InterruptedException();
}
} finally {
if (failed)
//沒有擷取鎖,被中斷,取消節點
cancelAcquire(node);
}
}
四、獨占模式可逾時擷取鎖
可逾時擷取鎖,不僅可以響應中斷,還可以将線程阻塞一段時間,自動喚醒。
tryAcquireNanos
可傳入一個納秒機關的時間
nanosTimeout
,可逾時的邏輯在
doAcquireNanos
中。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
//響應中斷
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos阻塞線程自動喚醒響應中斷
doAcquireNanos
和
acquireQueued
邏輯類似,但是也可以響應中斷,同時還可以讓線程阻塞一段時間自動喚醒,如果逾時了還沒擷取鎖則傳回false。
doAcquireNanos
還有一個非常不同之處,就是即使
shouldParkAfterFailedAcquire
判斷應該阻塞了,也有可能不阻塞,還會再自旋一段時間,這個自旋的時長有一個門檻值
spinForTimeoutThreshold = 1000L
,1000納秒,自旋了1000納秒後還沒有擷取鎖,且此時也判斷應該阻塞了,就讓線程休眠一段時間。
線程喚醒,有可能是自動喚醒,有可能是被其他釋放鎖的線程喚醒,喚醒後又被中斷過則抛出異常
InterruptedException
,如果沒有中斷,則繼續循環剛才的流程(判斷前驅是否是head,判斷是否逾時,判斷是否應該阻塞)。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
//已經逾時直接傳回false,擷取鎖失敗
return false;
//計算deadline
final long deadline = System.nanoTime() + nanosTimeout;
//入隊列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//判斷前驅是否是head
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
//逾時傳回false,擷取鎖失敗
if (nanosTimeout <= 0L)
return false;
//1.應該阻塞,調用parkAndCheckInterrupt阻塞線程
//2.不應該阻塞,再給一次搶鎖的機會
//3.自旋1000納秒,還沒有擷取鎖就休眠一段時間。1毫秒=1*1000*1000納秒
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//阻塞一段時間
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
//響應中斷
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
五、獨占模式釋放鎖
釋放鎖的流程很簡單,
tryRelease
需要子類實作,暫時不考慮,當
tryRelease
釋放鎖成功後喚醒後繼節點。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//head不為空,head不是初始化,釋放鎖成功後喚醒後繼節點
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor喚醒後繼線程
喚醒後繼的條件是
h != null && h.waitStatus != 0
,head不為null且head的狀态不是初始狀态,則喚醒後繼。在獨占模式下
h.waitStatus
可能等于0,-1:
-
,線程釋放鎖,同步隊列中的節點可能剛入隊列,還沒有阻塞,是以無需喚醒隊列。h.waitStatus=0
-
,head後繼應該正常喚醒。h.waitStatus=-1
unparkSuccessor
喚醒後繼需要做如下兩步:
-
将h.waitStatus < 0
重置為0。node.waitStatus
- 喚醒後繼節點,若後繼節點為空或者被取消,則從tail向前找一個距離head最近的正常的節點喚醒。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//喚醒後繼節點的線程,若為空,從tail往後周遊找一個距離head最近的正常的節點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
//這裡找到的正常節點,并沒有傳回,而是繼續往前找
s = t;
}
if (s != null)
//喚醒線程
LockSupport.unpark(s.thread);
}
node後繼是null或者取消狀态,為什麼要從tail向前找,而不是直接從head向後找到第一個正常的節點就可以傳回了?目的在于為了照顧剛入隊列的節點。看入隊列操作
addWaiter
。
節點入隊不是一個原子操作, 雖然用了
compareAndSetTail
操作保證了目前節點被設定成尾節點,但是隻能保證,此時step1和step2是執行完成的,有可能在step3還沒有來的及執行到的時候,有其他線程調用了
unparkSuccessor
方法,此時
pred.next
的值還沒有被設定成node,是以從head往後周遊可能周遊不到尾節點,但是因為尾節點此時已經設定完成,
node.prev = pred
也被執行過了,是以如果從tail往前周遊,新加的尾節點就可以周遊到了,并且可以通過它一直往前找。
總結來說,之是以從tail往前周遊,是因為在多線程并發條件下,如果一個節點的next屬性為null, 并不能保證它就是尾節點(可能是因為新加的尾節點還沒來得及執行
pred.next = node
), 但是一個節點如果能入隊, 則它的prev屬性一定是有值的,是以反向查找一定是最精确的。(這裡需要感謝這篇文章《逐行分析AQS源碼(2)——獨占鎖的釋放》,之前喚醒節點從tail往前找我了解錯了,看了這位部落客分析,才更了解了源碼的邏輯。)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
//設定node節點的上一個節點是tail
node.prev = pred; //step1
//cas設定tail指針指向node
if (compareAndSetTail(pred, node)) {//step2
pred.next = node; //step3
//mode進入尾部成功,傳回
return node;
}
}
enq(node);
return node;
}
六、取消擷取鎖cancelAcquire
什麼時候可以取消擷取鎖呢?AQS并沒有對外開放取消節點的操作權限,而是在可中斷擷取鎖的時候,中斷導緻擷取失敗,則會被取消擷取鎖。
取消擷取鎖的條件是雙向隊列,一個節點可知前驅和後繼,如單向的
CLH
隊列鎖就不能有取消的動作。而且取消這個動作還是相對複雜的,但是目的很簡單,就是剔除隊列中的取消節點。
- 首先會将node的thread設定為空。
- 其次檢查node前驅是否是取消狀态,是則循環跳過,一直為node找一個正常的前驅。
- 接着
設定為node.waitStatus
。CANCELLED
- 判斷node是否在尾部,是則tail指針前移到node前驅上,node前驅成為新的tail,其next指針(
)設定為null。predNext
- 若node不是在尾部,判斷其前驅是否是head以及是否是正常節點。node前驅不是head且正常節點,則将node後繼連結到node前驅next指針(
)上(predNext
),進而使node被剔除。Node next = node.next;compareAndSetNext(pred, predNext, next);
- 若node不是在尾部,且node前驅是head,則喚醒node的後繼。node前驅不是head,但是不正常節點(剛好被取消的),則也喚醒node的後繼,這時的喚醒不是為了讓node後繼擷取鎖,而是為node的後繼連結一個正常的前驅(node後繼自旋判斷阻塞時
,會連結一個正常的前驅)。shouldParkAfterFailedAcquire
若node的前驅是取消狀态,在跳過取消節點,找到一個正确前驅連結給node,此時新找到的前驅
pred.next
不會指向node,是以
pred.next
就不是node。類似如下的三角關系。

private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
//pred = pred.prev;
//node.prev = pred;
node.prev = pred = pred.prev;
//如果node的前驅也是取消節點,則pred.next就不是node
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
//如果node在尾部,tail前移
if (node == tail && compareAndSetTail(node, pred)) {
//node設定為null
compareAndSetNext(pred, predNext, null);
} else {
//node不在尾部
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
//前繼節點是個正常阻塞節點
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//node後繼成為pred的後繼
compareAndSetNext(pred, predNext, next);
} else {
// 如果node的前驅是個head,則喚醒node後繼,
//node前繼節點不是一個正常的節點,喚醒後繼節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
七、總結
- AQS實作的鎖終歸還是個自旋鎖(
),雖然不是無限自旋,也是給了一定的自旋次數,然後再阻塞。for (;;)
- AQS之是以是CLH的變種就在于判斷阻塞時
,自旋檢查前驅節點的狀态。shouldParkAfterFailedAcquire
- 取消擷取鎖必須是雙向隊列,且隻在可中斷擷取鎖時,中斷導緻擷取鎖失敗後取消節點。取消過程中有可能喚醒取消節點的後繼,但不一定是讓其擷取鎖,而是讓其連結一個正常的前驅。
PS: 如若文章中有錯誤了解,歡迎批評指正,同時非常期待你的評論、點贊和收藏。我是徐同學,願與你共同進步!