掃描下方二維碼或者微信搜尋公衆号,即可關注微信公衆号,閱讀更多
菜鳥飛呀飛
和
Spring源碼分析
文章。
Java并發程式設計
文章目錄
-
- 擷取鎖:acquire()
-
- 第一步:tryAcquire()
- 第二步:addWaiter()
- 第三步:acquireQueued()
- 第四步:selfInterrupt()
- 釋放鎖:release()
- 總結
在上一篇文章中分析了隊列同步器(AQS)的設計原理,這一篇文章将結合具體的源代碼來分析AQS。如果了解了上一篇文章中介紹的同步隊列的資料結構,那麼源碼看起來相對比較好了解。讀過Spring源碼的朋友應該能感受到Spring中方法和變量的命名是非正常範的,見名知意。與Spring相比,JDK中某些類的源碼,命名則不是那麼規範,裡面存在各種簡寫,單個字母命名變量,有時候看起來十分燒腦。其中AQS中就存在這種情況,是以如果不了解AQS的設計原理,直接看源碼,就會顯得比較困難。
- AQS中擷取鎖和釋放鎖的方法很多,今天以acquire()和release()為例來進行源碼分析,其他方法的邏輯與這兩個方法邏輯基本一樣。
public static void main(String[] args) {
// 公平鎖
ReentrantLock lock = new ReentrantLock(true);
try{
// 加鎖
lock.lock();
System.out.println("Hello World");
}finally {
// 釋放鎖
lock.unlock();
}
}
- 以上面demo為例,先建立了一個公平鎖,然後調用lock()方法來加鎖,最後調用unlock()方法釋放鎖。在lock()方法中會調用到Sync的lock()方法,由于我們建立的是公平鎖,是以這個時候調用的是FailSync的lock()方法。
static final class FairSync extends Sync {
final void lock() {
// 調用acquire()擷取同步狀态
acquire(1);
}
}
擷取鎖:acquire()
- 在FailSync的lock()方法中就會調用到AQS的模闆方法:
。最終的加鎖邏輯就是在acquire()方法中實作的。acquire()
的源碼如下:acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 調用selfInterrupt()方法重新設定中斷辨別
selfInterrupt();
}
- 這個方法就3行代碼,但是邏輯相當複雜。可以拆分為四段邏輯:第一步:調用tryAcquire()方法;第二步:執行addWaiter()方法;第三步:執行acquireQueued()方法;第四步:執行selfInterrupt()方法。下面将圍繞這4步展開分析。我在
這個方法上簡單寫了些注釋,也可以直接參考。acquire()
public final void acquire(int arg) {
/*
* acquire()的作用是擷取同步狀态,這裡同步狀态的含義等價于鎖
* tryAcquire()方法是嘗試擷取同步狀态,如果該方法傳回true,表示擷取同步狀态成功;傳回false表示擷取同步狀态失敗
* 第1種情況:當tryAcquire()傳回true時,acquire()方法會直接結束。
* 因為此時!tryAcquire(arg) 就等于false,而&&判斷具有短路作用,當因為&&判斷前面的條件判斷為false,&&後面的判斷就不會進行了,是以此時就不會執行後面的if語句,此時方法就直接傳回了。
* 這種情況表示線程擷取鎖成功了
* 第2中情況:當tryAcquire()傳回false時,表示線程沒有擷取到鎖,
* 這個時候就需要将線程加入到同步隊列中了。此時 !tryAcquire() == true,是以會進行&&後面的判斷,即acquireQueued()方法的判斷,在進行acquireQueued()方法判斷之前會先執行addWaiter()方法。
*
* addWaiter()方法傳回的是目前線程代表的節點,這個方法的作用是将目前線程放入到同步隊列中。
* 然後再調用acquireQueued()方法,在這個方法中,會先判斷目前線程代表的節點是不是第二個節點,如果是就會嘗試擷取鎖,如果擷取不到鎖,線程就會被阻塞;如果擷取到鎖,就會傳回。
* 如果目前線程代表的節點不是第二個節點,那麼就會直接阻塞,隻有當擷取到鎖後,acquireQueued()方法才會傳回
*
* acquireQueued()方法如果傳回的是true,表示線程是被中斷後醒來的,此時if的條件判斷成功,就會執行selfInterrupt()方法,該方法的作用就是将目前線程的中斷辨別位設定為中斷狀态。
* 如果acquireQueued()方法傳回的是false,表示線程不是被中斷後醒來的,是正常喚醒,此時if的條件判斷不會成功。acquire()方法執行結束
*
* 總結:隻有當線程擷取到鎖時,acquire()方法才會結束;如果線程沒有擷取到鎖,那麼它就會一直阻塞在acquireQueued()方法中,那麼acquire()方法就一直不結束。
*
*/
// tryAcquire()方法是AQS中定義的一個方法,它需要同步元件的具體實作類來重寫該方法。是以在公平鎖的同步元件FairSync和非公平鎖的同步組NonfairSync中,tryAcquire()方法的實作代碼邏輯是不一樣的。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 調用selfInterrupt()方法重新設定中斷辨別
selfInterrupt();
}
第一步:tryAcquire()
- tryAcquire()是由AQS的子類重寫的方法,是以代碼邏輯和具體的鎖的實作有關系,由于本次Demo中使用的是公平鎖,是以它最終會調用FairSync類的tryAcquire()方法。tryAcquire()方法的作用就是擷取同步狀态(也就是擷取鎖),如果目前線程成功擷取到鎖,那麼就會将AQS中的同步狀态state加1,然後傳回true,如果沒有擷取到鎖,将會傳回false。FairSync類上tryAcquire()方法的源碼如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 擷取同步狀态state的值(在AQS中,state就相當于鎖,如果線程能成功修改state的值,那麼就表示該線程擷取到了鎖)
int c = getState();
if (c == 0) {
// 如果c等于0,表示還沒有任何線程擷取到鎖
/**
* 此時可能存在多個線程同時執行到這兒,均滿足c==0這個條件。
* 在if條件中,會先調用hasQueuedPredecessors()方法來判斷隊列中是否已經有線程在排隊,該方法傳回true表示有線程在排隊,傳回false表示沒有線程在排隊
* 第1種情況:hasQueuedPredecessors()傳回true,表示有線程排隊,
* 此時 !hasQueuedPredecessors() == false,由于&& 運算符的短路作用,if的條件判斷為false,那麼就不會進入到if語句中,tryAcquire()方法就會傳回false
*
* 第2種情況:hasQueuedPredecessors()傳回false,表示沒有線程排隊
* 此時 !hasQueuedPredecessors() == true, 那麼就會進行&&後面的判斷,就會調用compareAndSetState()方法去進行修改state字段的值
* compareAndSetState()方法是一個CAS方法,它會對state字段進行修改,它的傳回值結果又需要分兩種情況
* 第 i 種情況:對state字段進行CAS修改成功,就會傳回true,此時if的條件判斷就為true了,就會進入到if語句中,同時也表示目前線程擷取到了鎖。那麼最終tryAcquire()方法會傳回true
* 第 ii 種情況:如果對state字段進行CAS修改失敗,說明在這一瞬間,已經有其他線程擷取到了鎖,那麼if的條件判斷就為false了,就不會進入到if語句塊中,最終tryAcquire()方法會傳回false。
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 目前線程成功修改了state字段的值,那麼就表示目前線程擷取到了鎖,那麼就将AQS中鎖的擁有者設定為目前線程,然後傳回true。
setExclusiveOwnerThread(current);
return true;
}
}
// 如果c等于0,則表示已經有線程擷取到了鎖,那麼這個時候,就需要判斷擷取到鎖的線程是不是目前線程
else if (current == getExclusiveOwnerThread()) {
// 如果是目前線程,那麼就将state的值加1,這就是鎖的重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 因為此時肯定隻有一個線程擷取到了鎖,隻有擷取到鎖的線程才會執行到這行代碼,是以可以直接調用setState(nextc)方法來修改state的值,這兒不會存線上程安全的問題。
setState(nextc);
// 然後傳回true,表示目前線程擷取到了鎖
return true;
}
// 如果state不等于0,且目前線程也等于已經擷取到鎖的線程,那麼就傳回false,表示目前線程沒有擷取到鎖
return false;
}
}
- 在tryAcquire()方法中,先判斷了同步狀态state是不是等于0。
-
如果等于0,就表示目前還沒有線程持有到鎖,那麼這個時候就會先調用1.
方法判斷同步隊列中有沒有等待擷取鎖的線程,如果有線程在排隊,那麼目前線程肯定擷取鎖失敗(因為AQS的設計的原則是FIFO,既然前面有人已經在排隊了,你就不能插隊,老老實實去後面排隊去),那麼tryAcquire()方法會傳回false。如果同步隊列中沒有線程排隊,那麼就讓目前線程對state進行CAS操作,如果設定成功,就表示目前擷取到鎖了,傳回true;如果CAS失敗,表示在這一瞬間,鎖被其他線程搶走了,那麼目前線程就擷取鎖失敗,就傳回false。hasQueuedPredecessors()
-
如果不等于0,就表示已經有線程擷取到鎖了,那麼此時就會去判斷目前線程是不是等于已經持有鎖的線程(2.
),如果相等,就讓state加1,這就是所謂的重入鎖,重入鎖就是允許同一個線程多次擷取到鎖。getExclusiveOwnerThread()方法的作用就是傳回已經持有鎖的線程
-
state既不等于0,目前線程也不等于持有鎖的線程,那麼就傳回false,表示目前線程擷取到鎖失敗。3.
- 就這樣,通過tryAcquire()方法,就實作了線程能否擷取到鎖的功能和邏輯。在不同的鎖的實作中,tryAcquire()方法的邏輯是不一樣的,上面的示例是以公平鎖的實作舉例的。
第二步:addWaiter()
addWaiter()方法不一定會執行到,它依賴第一步tryAcquire()的執行結果。如果線程擷取鎖成功,那麼tryAcquire()方法就會傳回true,此時就不會進行後面的第二步、第三步、第四步了。隻有當線程擷取鎖失敗了,tryAcquire()方法會傳回false,這個時候需要将線程加入到同步隊列中,是以需要執行後面的步驟。
- addWaiter()方法的邏輯相對比較簡單,它的作用就是将目前線程封裝成一個Node,然後加入到同步隊列中。如果同步隊列此時還沒有初始化(也就是AQS的head、tail屬性均為null),那麼它還會進行同步隊列的初始化。源碼如下:
private Node addWaiter(Node mode) {
// 先根據目前線程,建構一個Node節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 然後需要把目前線程代表的節點添加到隊列的尾部,此時隊尾可能是null,因為可能這是第一次出現多個線程争搶鎖,隊列還沒有初始化,即頭尾均為null
Node pred = tail;
if (pred != null) {
// 如果隊尾不為null,就直接将目前線程代表的node添加到隊尾,修改node節點中prev的指針指向
node.prev = pred;
// 利用CAS操作設定隊尾,如果設定成功,就修改倒數第二個節點的next的指針指向,然後方法return結束
// 在多個線程并發的情況下,CAS操作可能會失敗,如果失敗,就會進入到後面的邏輯
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果隊尾為null或者CAS設定隊尾失敗,就調用enq()方法将目前線程代表的節點加入到同步隊列當中
enq(node);
// 将目前線程代表的節點傳回
return node;
}
- 在addWaiter()方法中會先判斷尾結點是不是null,如果是null,就表示同步隊列還沒有被初始化,那麼就不會進入到if的邏輯中了,将會調用
方法進行同步隊列的初始化和目前線程的入隊操作。如果尾結點不為null,就表示同步隊列已經被初始化了,隻需要将目前線程所代表的節點加入到同步隊列隊尾即可。由于加入到隊尾這個操作存在并發的可能性,是以需要利用CAS操作(即enq(node)
方法)來保證原子性,如果CAS成功,就将舊的隊尾節點的next指針指向新的隊尾節點,這樣就完成了節點加入到隊尾的操作了,同時addWaiter()方法return結束。如果CAS操作失敗,那麼就會調用compareAndSetTail()
方法來實作入隊操作。enq(node)
-
方法的源碼如下:enq(node)
private Node enq(final Node node) {
// 無限循環
for (;;) {
Node t = tail;
// 如果尾結點為null,說明這是第一次出現多個線程同時争搶鎖,是以同步隊列沒有被初始化,tail和head此時均為null
if (t == null) { // Must initialize
// 設定head節點,注意此時是new了一個Node節點,節點的next,prev,thread屬性均為null
// 這是因為對于頭結點而言,作為隊列的頭部,它的next指向本來就應該是null
// 而頭結點的thread屬性為空,這是AQS同步器特意為之的
// 頭結點的prev屬性此時為空,當進行第二次for循環的時候,tail結點就不為空了,是以不會進入到if語句中,而是進入到else語句中,在這裡對頭結點的prev進行了指派
if (compareAndSetHead(new Node()))
// 第一次初始化隊列時,頭結點和尾結點是同一個結點
tail = head;
} else {
// 隊尾結點不為null,說明隊列已經進行了初始化,這個時候隻需要将目前線程表示的node結點加入到隊列的尾部
// 設定目前線程所代表的節點的prev的指針,使其指向尾結點
node.prev = t;
// 利用CAS操作設定隊尾結點
if (compareAndSetTail(t, node)) {
// 然後修改隊列中倒數第二個節點的next指針,使其指向隊尾結點
t.next = node;
// 然後傳回的是隊列中倒數第二個節點(也就是舊隊列中的尾結點)
return t;
}
}
}
}
- 在
方法中進行了一個for(;;)的無限循環,在循環中會先判斷隊尾是不是null,是null就表示同步隊列沒有被初始化,就采用CAS操作(即enq(node)
方法)來初始化隊列,并讓tail = head。注意:在建立頭結點時,頭結點的thread、prev、next屬性均為null,在進行第二次循環時,next屬性會被指派,但是thread、prev屬性始終是null。compareAndSetHead()
- 當同步隊列初始化完成後,後面進入for循環的時候,就不會進入到if語句塊中了,而是進入到else中,此時也是利用CAS操作(即
方法)将目前線程所代表的Node節點設定為隊尾,然後修改之前隊尾的next指針,維護好隊列節點中的雙向關聯關系,就将舊的隊尾節點傳回。compareAndSetTail()
- 在執行完enq()方法後,就會回到addWaiter()方法中,然後addWaiter()方法就會将目前線程所代表的Node節點傳回。
第三步:acquireQueued()
- 在執行完addWaiter()方法後,就會執行acquireQueued()方法。該方法的作用就是讓線程以不間斷的方式擷取鎖,如果擷取不到,就會一直阻塞在這個方法裡面,直到擷取到鎖,才會從該方法裡面傳回。
- acquireQueued()方法的源碼如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 無限for循環
for (;;) {
// 擷取目前線程所代表節點的前一個節點
final Node p = node.predecessor();
// 如果前一個節點是頭結點(即目前線程是隊列中的第二個節點),那麼就調用tryAcquire()方法讓目前線程去嘗試擷取鎖
if (p == head && tryAcquire(arg)) {
// 如果目前線程擷取鎖成功,那麼就将目前線程所代表的節點設定為頭結點(因為AQS的設計原則就是:隊列中占有鎖的線程一定是頭結點)
setHead(node);
// 然後将原來的頭節點的next指針設定為null,這樣頭節點就沒有任何引用了,有助于GC回收。
p.next = null; // help GC
failed = false;
// 傳回interrupted所代表的值,表示目前線程是不是通過中斷而醒來的
return interrupted;
}
// 如果前一個節點不是頭結點 或者 前一個節點是頭結點,但是目前節點調用tryAcquire()方法擷取鎖失敗,那麼就會執行到下面的if判斷
/**
* 在if判斷中,先調用了shouldParkAfterFailedAcquire()方法。
* 在第一次調用shouldParkAfterFailedAcquire()時,肯定傳回false(為什麼會傳回false,可以看下shouldParkAfterFailedAcquire()方法的源碼)
* 由于目前代碼是處于無限for循環當中的,是以當後面出現第二次代碼執行到這兒時,會再次調用shouldParkAfterFailedAcquire()方法,此時這個方法會傳回true。
* 當shouldParkAfterFailedAcquire()傳回true時,if判斷中就會再調用parkAndCheckInterrupt()方法,該方法會将目前線程進行阻塞,直到這個線程被喚醒或者被中斷。
* 是以當線程擷取不到鎖時,就會一直阻塞到這兒。直到被其他線程喚醒,才會繼續向下執行,當線程想來後,再次進入到目前代碼的無限for循環中,除非線程擷取到鎖,才會return傳回
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果在try中出現異常,那麼此時failed就會為true,就會取消擷取鎖
// failed的初始值為true,如果try語句中不出現異常,最終線程擷取到鎖後,就會将failed置為false,那麼下面的if判斷條件就不會成立,也就不執行cancelAcquire()方法了
// 如果上面的try語句存在異常,那麼就不會将failed設定為false,那麼就會執行cancelAcquire()方法
if (failed)
cancelAcquire(node);
}
}
- 在acquireQueued()方法中也是使用了無限for循環:for(;?。在該方法中會先擷取到目前線程的前一個節點。
-
- 如果前一個節點是頭結點,那麼就會讓目前線程調用tryAcquired()方法嘗試擷取鎖。如果擷取鎖成功,就将目前線程設定為頭結點,這裡設定頭結點時隻需要調用setHead(head)方法即可,因為肯定隻有一個線程擷取到鎖,是以不用考慮并發的問題。然後舊的頭結點的next引用置為null,便于GC。接着acquireQueued()方法直接傳回。如果目前線程調用tryAcquire()方法擷取鎖失敗,就會進入到下面2中的邏輯。
-
- 如果前一個節點不是頭結點或者是頭結點但是擷取鎖失敗,就會進入到後面的邏輯中,即執行到
。if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
- 如果前一個節點不是頭結點或者是頭結點但是擷取鎖失敗,就會進入到後面的邏輯中,即執行到
- 在if判斷中,會先執行
方法,該方法用來判斷目前線程是否應該park,即是否應該将目前線程阻塞。如果傳回true,就表示需要阻塞,那麼就會接着執行parkAndCheckInterrupt()方法,在shouldParkAfterFailedAcquire()
方法中,就會将目前線程阻塞起來,直到目前線程被喚醒并擷取到鎖以後,acquireQueued()方法才會傳回。parkAndCheckInterrupt()方法的源碼如下:parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
// 将目前線程挂起
LockSupport.park(this);
return Thread.interrupted();
}
- shouldParkAfterFailedAcquire()方法是一個比較有意思的方法,在第一次調用它的時候,它會傳回false,由于acquireQueued()方法裡有一個無限for循環,是以會進行第二次調用shouldParkAfterFailedAcquire()方法,這個時候它會傳回true。shouldParkAfterFailedAcquire()方法的源碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 在初始情況下,所有的Node節點的waitStatus均為0,因為在初始化時,waitStatus字段是int類型,我們沒有顯示給它指派,是以它預設是0
// 後面waitStatus字段會被修改
// 在第一進入到shouldParkAfterFailedAcquire()時,是以waitStatus是預設值0(因為此時沒有任何地方對它進行修改),是以ws=0
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果ws = -1,就傳回true
// 如果第一次進入shouldParkAfterFailedAcquire()方法時,waitStatus=0,那麼就會進入到後面的else語句中
// 在else語句中,會将waitStatus設定為-1,那麼當後面第二次進入到shouldParkAfterFailedAcquire()方法時,就會傳回true了
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 如果ws既不等于-1,也不大于0,就會進入到目前else語句中
// 此時會調用CAS方法将pred節點的waitStatus字段的值改為-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 經過shouldParkAfterFailedAcquire()方法後,隊列中所有節點的狀态的示意圖如下。
- 最終對于acquireQueued()方法而言,隻有線程擷取到了鎖或者被中斷,線程才會從這個方法裡面傳回,否則它會一直阻塞在裡面。
第四步:selfInterrupt()
- 當線程從acquireQueued()方法處傳回時,傳回值有兩種情況,如果傳回false,表示線程不是被中斷才喚醒的,是以此時在acquire()方法中,if判斷不成立,就不會執行selfInterrupt()方法,而是直接傳回。如果傳回true,則表示線程是被中斷才喚醒的,由于在parkAndCheckInterrupt()方法中調用了Thread.interrupted()方法,這會将線程的中斷辨別重置,是以此時需要傳回true,使acquire()方法中的if判斷成立,然後這樣就會調用selfInterrupt()方法,将線程的中斷辨別重新設定一下。最後acquire()方法傳回。
釋放鎖:release()
- 釋放鎖的代碼實作相對比較簡單,當持有鎖的線程釋放鎖後,會喚醒同步隊列中下一個
的節點去嘗試擷取鎖。還是以上面的Demo為例,當調用lock.unlock()時,最終會調用到AQS中的release()方法,release()方法的源碼如下。處于等待狀态
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 當釋放鎖成功以後,需要喚醒同步隊列中的其他線程
Node h = head;
// 當waitStatus!=0時,表示同步隊列中還有其他線程在等待擷取鎖
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 在release()方法中會先調用AQS子類的tryRelease()方法,在本文的示例中,就是調用ReentrantLock類中Sync的tryRelease()方法,該方法就是讓目前線程釋放鎖。方法源碼如下。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 判斷目前線程是不是持有鎖的線程,如果不是就抛出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 因為上面一步已經确認了是目前線程持有鎖,是以在修改state時,肯定是線程安全的
boolean free = false;
// 因為可能鎖被重入了,重入了幾次就需要釋放幾次鎖,是以這個地方需要判斷,隻有當state=0時才表示完全釋放了鎖。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
- 在tryRelease()方法中,會先檢驗目前線程是不是持有鎖的線程,如果不是持有鎖的線程,就會抛出異常(這一點很容易了解,沒有擷取到鎖,卻去調用釋放鎖的方法,這個時候就告訴你出錯了)。然後再判斷state減去1後是否等于0,如果等于0,就表示鎖被釋放了,最終tryRelease()方法就會傳回true。如果state減去1後不為0,表示鎖還沒有被釋放,最終tryRelease()方法就會傳回false。因為對于重入鎖而言,每擷取一次鎖,state就會加1,擷取了幾次鎖,就應該釋放幾次,這樣當完全釋放完時,state才會等于0。
- 當tryRelease()傳回true時,在release()方法中,就會進入到if語句塊中。在if語句塊中,會先判斷同步隊列中是否有線程在等待擷取鎖,如果有,就調用
方法來喚醒下一個處于等待狀态的線程;如果沒有,tryRelease()就會直接傳回true。unparkSuccessor()
- 如何判斷同步隊列中是否有線程在等待擷取鎖的呢?是通過
這一個判斷條件來判斷的。h != null && h.waitStatus != 0
-
- 當頭節點等于null時,這個時候肯定沒有線程在等待擷取鎖,因為同步隊列的初始化是當有線程擷取不到鎖以後,将自己加入到同步隊列的時候才初始化同步隊列的(即給head、tail指派),如果頭結點為null,說明同步隊列沒有被初始化,即沒有線程在等待擷取鎖。
-
- 在acquire()方法中我們提到了shouldParkAfterFailedAcquire()方法,該方法會讓等待擷取鎖的節點的waitStatus的值等于-1,是以當
時,可以認為同步隊列中有線程在等待擷取鎖。h != null && h.waitStatus != 0
- 在acquire()方法中我們提到了shouldParkAfterFailedAcquire()方法,該方法會讓等待擷取鎖的節點的waitStatus的值等于-1,是以當
- 如果同步隊列中有線程在等待擷取鎖,那麼此時在release()方法中調用
方法去喚醒下一個等待狀态的節點。unparkSuccessor()方法的源碼如下unparkSuccessor()
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
/**
* 同步隊列中的節點鎖代表的線程,可能被取消了,此時這個節點的waitStatus=1
* 是以這個時候利用for循環從同步隊列尾部開始向前周遊,判斷節點是不是被取消了
* 正常情況下,當頭結點釋放鎖以後,應該喚醒同步隊列中的第二個節點,但是如果第二個節點的線程被取消了,此時就不能喚醒它了,
* 就應該判斷第三個節點,如果第三個節點也被取消了,再依次往後判斷,直到第一次出現沒有被取消的節點。如果都被取消了,此時s==null,是以不會喚醒任何線程
*/
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 在unparkSuccessor()方法中,會找到頭結點後
的節點,然後将其喚醒。為什麼是第一個waitStatus小于0
?因為waitStatus=1表示線程是取消狀态,沒資格擷取鎖。最後調用LockSupport.unpark()方法喚醒符合條件的線程,此時線程被喚醒後,就回到上面waitStatus小于等于0
的parkAndCheckInterrupt()方法處,接着就執行後面的邏輯了。擷取鎖流程中
總結
- 本文通過分析擷取鎖acquire()和釋放鎖release()方法的源碼,講解了線程擷取鎖和入隊、出隊的流程。對于AQS中其他擷取鎖和釋放鎖的方法,如可響應中斷的擷取鎖,支援逾時的擷取鎖以及共享式的擷取鎖,其方法的源碼和邏輯和acquire()方法的邏輯很相似,有興趣的朋友可以自己去閱讀下源碼。
- 在本文中提到了公平鎖、非公平鎖以及重入鎖的概念,關于這些會在下一篇文章ReentrantLock的源碼分析中詳細介紹。