AQS類源碼解讀
源碼作者:Doug Lea
文章作者:我
之前寫過有關AQS的源碼流程圖,但裡面沒有源碼隻有方法名,是以看的時候感覺很混亂,想必大家肯定也這麼感覺,先謝謝大家沒有吐槽我,但弟弟我知錯必改,這次我把AQS類從共享鎖和排他鎖【擷取鎖】到共享鎖和排他鎖【釋放鎖】,從邏輯分析到源碼解析都給大家安排的明明白白,由外到内一層層分析源碼, 以下内容除源碼,其他純手打(包括所有注釋)
,希望對大家有幫助哈!!!
首先了解AQS的Node内部類,AQS隊列操作離不開對Node類的操作:
// AQS是一個雙向連結清單,是以其中的節點都有自己的前驅和後繼指針
static final class Node {
// 節點類型 共享鎖,如果建立共享鎖節點,會建立一個空的Node節點
static final Node SHARED = new Node();
// 節點類型 排他鎖,如果建立排他鎖節點,會放一個null進入隊列
static final Node EXCLUSIVE = null;
/*
節點狀态 取消狀态,在釋放節點或者出現中斷操作時,會把節點置為這個狀态,
一般遇到這種節點把這個節點的前驅後繼指針指向null,便于垃圾回收
*/
static final int CANCELLED = 1;
// 節點狀态 可被喚醒狀态,這種狀态是節點的正常狀态,說明此節點可被前驅節點喚醒
static final int SIGNAL = -1;
// 節點狀态 存在于Condition隊列狀态,在Condition隊列等待被喚醒的節點就是此狀态
static final int CONDITION = -2;
// 節點狀态 屬于一個釋放共享鎖節點的過渡狀态(下方有詳解)
static final int PROPAGATE = -3;
// 節點等待狀态 存放上述節點狀态的容器
volatile int waitStatus;
// 存放前驅節點
volatile Node prev;
// 存放後繼節點
volatile Node next;
/*
存放節點中的任務 一般喚醒某個節點時,要确定這個節點的thread屬性不為null,
因為頭節點已經被喚醒了,是以頭節點的這個屬性是null
*/
volatile Thread thread;
// 存放後繼節點(下方有詳解)
Node nextWaiter;
}
大家一定會問上面既然有了next屬性,為什麼還要有nextWaiter屬性,這兩個屬性不一樣嗎?
- next屬性用于AQS同步隊列
- nextWaiter屬性用于Condition條件隊列
還有個需要注意的地方,代碼中的waitStatus會有0狀态,也就是waitStatus的預設狀态,int類型預設就是0,是以如果一個節點沒有幹過什麼事,那他的waitStatus就會是0了。
對于PROPAGATE狀态:因為在多線程環境少不了多個線程操作一個共享鎖節點的情況,一個節點成功擷取鎖之後狀态變為0,但如果發現其他線程已經在釋放這個鎖了,那就設定為這個狀态,讓後續線程能繼續釋放後繼的共享鎖,避免因多線程操作導緻鎖傳遞失敗
acquireShared():擷取共享鎖
public final void acquireShared(int arg) {
// 使用的是模闆方法設計模式,這個會調用子類的具體實作方法來嘗試擷取共享鎖
if (tryAcquireShared(arg) < 0)
// 如果嘗試擷取共享鎖失敗,就進入此方法實作阻塞。
doAcquireShared(arg);
}
doAcquireShared()
private void doAcquireShared(int arg) {
// 創新新的共享節點并插入AQS隊列
final Node node = addWaiter(Node.SHARED);
// 辨別是否入隊失敗
boolean failed = true;
try {
// 辨別是否被中斷
boolean interrupted = false;
for (;;) {
// 拿到新節點的前驅節點快照
final Node p = node.predecessor();
if (p == head) {
/*
如果前驅節點是頭節點,有可能到這個方法頭節點正好釋放鎖,是以,
調用子類的tryAcquireShared()方法嘗試能不能拿到鎖,
根據不同子類的調用傳回信号量
*/
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果拿到了鎖,那就重新設定頭節點并喚醒後繼節點執行任務
setHeadAndPropagate(node, r);
//把原頭節點的後繼指針指向null,便于GC回收
p.next = null;
// 如果中斷辨別為true,那就中斷阻塞目前線程
if (interrupted)
selfInterrupt();
// 辨別入隊成功
failed = false;
return;
}
}
/*
如果新節點的前驅節點不是頭節點,或者槍鎖失敗,
那判斷新節點是不是要進入隊列阻塞,如果需要阻塞那就通過中斷阻塞節點
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 設定中斷辨別位
interrupted = true;
}
} finally {
// 如果入隊失敗,那就消除該節點
if (failed)
cancelAcquire(node);
}
}
addWaiter()
private Node addWaiter(Node mode) {
// 新建立一個mode類型的節點,分為共享鎖、排他鎖類型
Node node = new Node(Thread.currentThread(), mode);
// 先存儲AQS隊列隊尾節點的快照,用來防止其他線程執行時修改隊列的隊尾節點
Node pred = tail;
// (下方有詳解)
if (pred != null) {
// 如果存在隊尾節點,就把新建立的共享鎖的node結點的前驅指針指向隊列的隊尾結點
node.prev = pred;
/*
為防止有其他現成已經修改了隊尾節點了,
是以使用CAS來把隊列隊尾的結點由快照擷取的pred修改為新建立的node
*/
if (compareAndSetTail(pred, node)) {
// 如果CAS替換隊尾節點成功,那就把原隊尾結點pred的後繼指針指向新建立的node節點
// 并傳回新節點
pred.next = node;
return node;
}
}
//節點入隊死循環邏輯
enq(node);
return node;
}
if (pred != null)代碼行解析:
- 這個if邏輯用到了一個【優化前置】的技術,繼續往下看大家會看到這個if分支邏輯和enq方法的死循環邏輯的一部分是相同的。
- 如果沒有線程競争,那這個if邏輯中就可以把節點加入到隊列,那就不用執行下面的enq的死循環方法,這樣就能提高系統的執行效率。
- 後面代碼會有很多用到了這種【優化前置】的技術,大家明白這種代碼優化就好,也可以說是【空間換時間】的一種技術表現形式吧。
enq()
private Node enq(final Node node) {
for (;;) {
// 先存儲AQS隊列隊尾節點的快照
Node t = tail;
if (t == null) {
// 如果沒有隊尾節點,說明隊列中沒有結點,那就建立一個空節點并CAS設定為隊首結點
if (compareAndSetHead(new Node()))
/*
CAS成功後,因為隊列現在隻有一個節點,
是以隊首和隊尾節點都設定為目前這個新的節點,并執行死循環邏輯
*/
tail = head;
} else {
/**
如果隊尾指針有值,說明目前隊列不是空隊列,
那就把新建立的共享鎖的node結點的前驅指針指向隊列的隊尾結點
*/
node.prev = t;
/*
為防止有其他現成已經修改了隊尾節點了,
是以使用CAS來把隊列隊尾的結點由快照擷取的t修改為新建立的node
*/
if (compareAndSetTail(t, node)) {
// 如果CAS替換隊尾節點成功,那就把原隊尾結點t的後繼指針指向新建立的node節點
// 并傳回原隊尾結點t
t.next = node;
return t;
}
}
}
}
AQS隊列是個雙向清單,添加新節點時,有以下流程:
先将新節點的前驅指針指向原隊尾節點。
然後再把原隊尾節點的後繼指針指向新節點。
setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {
// 存放隊列頭節點的快照
Node h = head;
// 将被喚醒的節點設定成隊列的頭節點
setHead(node);
if (propagate > 0 || // 如果傳入的信号量大于0
h == null || // 如果頭節點為空
h.waitStatus < 0 || // 如果頭節點狀态不是取消狀态
(h = head) == null || // 如果頭節點為空
h.waitStatus < 0) { // 如果頭節點狀态不是取消狀态
// 存放要喚醒的頭節點的後繼節點快照
Node s = node.next;
if (s == null || s.isShared())
// 如果後繼節點為null或者後繼節點是共享鎖節點,那就喚醒共享鎖節點
doReleaseShared();
}
}
doReleaseShared()
private void doReleaseShared() {
for (;;) {
// 存放頭節點快照
Node h = head;
/*
如果隊列中還有其他排隊元素
(因為頭節點不為空,并且頭節點不是尾節點,說明隊列現在不隻有一個節點)
*/
if (h != null && h != tail) {
int ws = h.waitStatus;
/*
如果頭節點狀态是正常狀态,并且通過CAS把節點狀态修改為已被喚醒狀态,
如果修改失敗,那就執行死循環邏輯
*/
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 如果修改成功,那就喚醒該頭節點執行任務
unparkSuccessor(h);
}
//如果發現該節點已經拿到鎖并狀态變為0了,那就CAS将該節點設定為PROPAGATE過渡狀态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 如果修改失敗,那就執行死循環邏輯
continue;
}
// 如果頭節點一直沒有改變過 或者 隊列為空 或者 隊列就一個節點,那就停止喚醒。
if (h == head)
break;
}
}
unparkSuccessor()
private void unparkSuccessor(Node node) {
// 擷取到要喚醒節點的目前狀态
int ws = node.waitStatus;
// 如果目前狀态不是取消狀态,說明正常,那就把狀态修改為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 擷取要喚醒節點的後繼節點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
/*
如果後繼節點為null或者後繼節點是取消狀态,
那就在隊列從後往前找要喚醒節點的有效後繼節點,
并把該節點賦給剛才存放後繼節點的容器s
*/
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果找到了有效的後繼節點,那就使用LockSupport喚醒該後繼節點中的線程對象
if (s != null)
LockSupport.unpark(s.thread);
}
shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/*
擷取前驅節點的狀态,如果是SIGNAL,
說明還沒有被該前驅節點的前驅節點喚醒,那就需要阻塞
*/
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
// 如果前驅節點是消除狀态
if (ws > 0) {
// 在隊列中從後往前尋找有效的前驅節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//将有效的前驅節點的後繼指針指向要阻塞的節點
pred.next = node;
} else {
/*
如果前驅節點不是取消狀态,不是等待被喚醒狀态,在AQS裡面就剩下PROPAGATE狀态了,
那就把這種過渡狀态設定為SIGNAL可被喚醒狀态,
node就可以等待被喚醒了
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不讓節點阻塞
return false;
}
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
// 阻塞該線程所在的對象
LockSupport.park(this);
// 判斷目前線程是否被中斷(下方有詳解)
// 此方法會判斷指定線程是不是已經被中斷,并會重置中斷狀态為false
return Thread.interrupted();
}
喚醒節點的方式:
- unpark(正常喚醒)
- interrupt(中斷喚醒)
cancelAcquire()
private void cancelAcquire(Node node) {
if (node == null)
return;
// 先将要消除節點的thread屬性設定為null
node.thread = null;
Node pred = node.prev;
// 如果要消除節點的前驅節點是CANCELLED狀态那就在隊列從該節點往前找有效的前驅節點
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 拿到有效前驅節點的後繼指針
Node predNext = pred.next;
// 将要消除節點的狀态設定為CANCELLED狀态
node.waitStatus = Node.CANCELLED;
// 如果要消除節點是尾節點,那就CAS把尾節點設定為有效的前驅節點
if (node == tail && compareAndSetTail(node, pred)) {
// 修改尾節點成功後,将新尾節點的後繼指針設定為null
compareAndSetNext(pred, predNext, null);
} else {
// 如果要消除節點不是尾節點,或者CAS尾節點失敗
int ws;
// 有效前驅節點不是頭節點
if (pred != head &&
// 有效前驅節點狀态正常
((ws = pred.waitStatus) == Node.SIGNAL ||
// 有效前驅節點狀态是PROPAGATE或者0并CAS為SIGNAL
(ws <= 0 &&
compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
//有效前驅節點線程任務不為null
pred.thread != null) {
// 總結起來就是說,如果有效的前驅節點是一個不是頭結點的正常可被喚醒的節點
Node next = node.next;
if (next != null && next.waitStatus <= 0)
/*
如果要消除節點的後繼節點不是消除狀态,
那就CAS把有效前驅節點的後繼指針指向要消除節點的後繼節點
*/
compareAndSetNext(pred, predNext, next);
} else {
// 如果要消除節點是頭節點,那就喚醒該節點的後繼節點
unparkSuccessor(node);
}
// 消除節點操作完成後,把已消除節點的後繼指針指向自己,便于GC垃圾回收
node.next = node;
}
}
acquire():擷取排他鎖
public final void acquire(int arg) {
// 子類嘗試擷取排他鎖
if (!tryAcquire(arg) &&
// 如果擷取排他鎖失敗,那就新增排他鎖節點并進行入隊操作,并判斷是否中斷阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// (下方有詳解)
selfInterrupt();
}
selfInterrupt() 方法:
- 如果acquireQueued()判斷目前狀态是已中斷狀态,那肯定是通過Thread.interrupted()方法拿到的中斷狀态,Thread.interrupted()之後會重置中斷狀态,是以需要再通過selfInterrupt()方法來中斷線程。
addWaiter()
在上面内容添加共享鎖節點時用過此方法,這兩個方法的唯一差別就是添加的鎖類型不同。
acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
// 設定入隊失敗辨別
boolean failed = true;
try {
// 中斷辨別設定為false
boolean interrupted = false;
for (;;) {
// 擷取新增結點的前驅節點
final Node p = node.predecessor();
// 如果新增節點的前驅節點是頭結點并擷取鎖成功
if (p == head && tryAcquire(arg)) {
// 設定新節點為頭結點
setHead(node);
// 把原頭結點後繼指針指向null,便于垃圾回收
p.next = null;
// 辨別修改為入隊成功
failed = false;
// 傳回中斷辨別,表明可被直接喚醒,不需要阻塞
return interrupted;
}
// 如果不是頭結點或者擷取鎖失敗,那就判斷是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果需要阻塞就使用LockSupport進行阻塞并判斷是否已被中斷
parkAndCheckInterrupt())
// 如果需要阻塞并目前已被中斷,那就修改中斷标志位
interrupted = true;
}
} finally {
// 如果入隊失敗就消除新節點,上面的擷取共享鎖源碼有講解cancelAcquire方法
if (failed)
cancelAcquire(node);
}
}
releaseShared():釋放共享鎖
public final boolean releaseShared(int arg) {
// 子類嘗試釋放共享鎖成功,那就執行釋放AQS節點邏輯
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()
private void doReleaseShared() {
for (;;) {
// 擷取頭節點快照
Node h = head;
// 如果隊列中有正在阻塞的節點
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果頭節點目前狀态是SIGNAL,那說明還沒有被喚醒
if (ws == Node.SIGNAL) {
// CAS修改頭節點狀态為0,如果失敗就走死循環邏輯
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 如果CAS成功,就釋放頭結點,此方法在上面有講解
unparkSuccessor(h);
}
// 如果目前狀态是0,那就CAS修改為PROPAGATE過渡狀态,失敗就進入死循環邏輯
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果頭節點一直沒有改變過 或者 隊列為空 或者 隊列就一個節點,那就停止喚醒。
if (h == head)
break;
}
}
release():釋放排他鎖
public final boolean release(int arg) {
// 子類嘗試釋放排他鎖成功
if (tryRelease(arg)) {
// 如果目前隊列頭節點不為null并且還未被釋放,那就執行喚醒邏輯
Node h = head;
if (h != null && h.waitStatus != 0)
// 此方法在上面已經講解過了
unparkSuccessor(h);
return true;
}
return false;
}
講完AQS之後,有兩個問題:
- JUC中的鎖類,不是都有公平鎖和非公平鎖之分嗎?
- 在這個AQS源碼裡面怎麼沒有提現公平和非公平呢?
這兩個問題我當時就有想過,不知道大家有沒有,我一個個給大家講解一下吧
- JUC中有常用鎖類Semaphore、CountDownLatch、CyclicBarrier、ReentrantLock、ReentrantReadWriteLock等,這些鎖類(CyclicBarrier除外)的構造器都有是否公平的辨別參數。
- JUC鎖類(CyclicBarrier除外)中都有一個繼承于AQS類的Sync内部類,以上我講的AQS源碼有tryRelease()、tryReleaseShared()、tryAcquire()、tryAcquireShared()這些方法在AQS類内部用的是模闆方法設計模式,是都沒有實作的,都是直接抛一個UnsupportedOperationException(沒有實作方法)的異常,這些方法就是在JUC各種鎖類中的Sync内部類中實作的,裡面就有對公平鎖和非公平鎖的具體判斷。
我感覺這個源碼解析配上我上一期的【AQS核心流程源碼圖像解讀】,應該就比較易懂了,大家可以試試。
AQS還有一個内部類是ConditionObject類,是Condition類的子類,Condition也是一個隊列,但底層是一個單向連結清單,對與【Condition類及其子類的源碼講解】,我就放到下期了,内容會包含Condition和ConditionObject類詳解。
對與【JUC常用鎖類的源碼講解】,我就放到下下期了,大家先消化消化AQS的源碼,JUC鎖類的拿鎖和放鎖操作都是基于AQS的。
本篇部落格要是有什麼技術問題,歡迎大家來指正。
如果大家有什麼不了解的,也歡迎大家評論,我很高興和大家一起讨論技術問題。