天天看點

深入了解ReentrantLock原理&&Condition原理

ReentrantLock 介紹

一個可重入的互斥鎖,它具有與使用{synchronized}方法和語句通路的隐式螢幕鎖相同的基本行為和語義,但它具有可擴充的能力。

一個ReentrantLock會被最後一次成功鎖定(lock)的線程擁有,在還沒解鎖(unlock)之前。當鎖沒有被其他線程擁有的話,一個線程執行『lock』方法将會傳回,擷取鎖成功。一個方法将會立即的傳回,如果目前線程已經擁有了這個鎖。可以使用『isHeldByCurrentThread』和『getHoldCount』來檢查目前線程是否持有鎖。

這個類的構造方法會接受一個可選的“fairness”參數。當該參數設定為true時,在發生多線程競争時,鎖更傾向将使用權授予給最長等待時間的線程。另外,鎖不保證任何特定的通路順序。程式在多線程情況下使用公平鎖來通路的話可能表現出較低的吞吐量(如,較慢;經常慢很多)與比使用預設設定相比,但是在擷取鎖上有較小的時間差異,并保證不會有饑餓(線程)。然而需要注意的是,公平鎖并不保證線程排程的公平性。(也就是說,即使使用公平鎖,也無法確定線程排程器是公平的。如果線程排程器選擇忽略一個線程,而該線程為了這個鎖已經等待了很長時間,那麼就沒有機會公平地處理這個鎖了)

還需要注意的是,沒有時間參數的『tryLock()』方法是沒有信譽的公平設定。它将會成功如果鎖是可擷取的,即便有其他線程正在等待擷取鎖。

除了對Lock接口的實作外,這個類還定義了一系列的public和protected方法用于檢測lock的state。這些方法中的某些方法僅用于檢測和監控。

這個類的序列化行為同lock内置的行為是一樣的:一個反序列化的鎖的狀态(state)是未鎖定的(unlocked),無論它序列化時的狀态(state)是什麼。

這個鎖支援同一個線程最大遞歸擷取鎖2147483647(即,Integer.MAX_VALUE)次。如果嘗試擷取鎖的次數操作了這個限制,那麼一個Error擷取lock方法中抛出。

AbstractQueuedSynchronizer

ReentrantLock的公平鎖和非公平鎖都是基于AbstractQueuedSynchronizer(AQS)實作的。ReentrantLock使用的是AQS的排他鎖模式,由于AQS除了排他鎖模式還有共享鎖模式,本文僅對ReentrantLock涉及到的排他鎖模式部分的内容進行介紹,關于共享鎖模式的部分會在 深入了解CountDownLatch原理一文中介紹。

AQS提供一個架構用于實作依賴于先進先出(FIFO)等待隊列的阻塞鎖和同步器(信号量,事件等)。這個類被設計與作為一個有用的基類,一個依賴單一原子值為代表狀态的多種同步器的基類。子類必須将修改這個狀态值的方法定義為受保護的方法,并且該方法會根據對象(即,AbstractQueuedSynchronizer子類)被擷取和釋放的方式來定義這個狀态。根據這些,這個類的其他方法實作所有排隊和阻塞的機制。子類能夠維護其他的狀态屬性,但是隻有使用『getState』方法、『setState』方法以及『compareAndSetState』方法來原子性的修改 int 狀态值的操作才能遵循相關同步性。

等待隊列節點類 ——— Node

等待隊列是一個CLH鎖隊列的變體。CLH通常被用于自旋鎖(CLH鎖是一種基于連結清單的可擴充、高性能、公平的自旋鎖,申請線程隻在本地變量上自旋,它不斷輪詢前驅的狀态,如果發現前驅釋放了鎖就結束自旋。)。我們用它來代替阻塞同步器,但是使用相同的基本政策,該政策是持有一些關于一個線程在它前驅節點的控制資訊。一個“status”字段在每個節點中用于保持追蹤是否一個線程需要被阻塞。一個節點會得到通知當它的前驅節點被釋放時。隊列中的每一個節點都作為一個持有單一等待線程的特定通知風格的螢幕。狀态字段不會控制線程是否被授予鎖等。一個線程可能嘗試去擷取鎖如果它在隊列的第一個。但是首先這并不保證成功,它隻是給與了競争的權力(也就是說,隊列中第一個線程嘗試擷取鎖時,并不保證一定能得到鎖,它隻是有競争鎖的權力而已)。是以目前被釋放的競争者線程可能需要重新等待擷取鎖。

(這裡說的"隊列中的第一個的線程"指的時,從隊列頭開始往下的節點中,第一個node.thread != null的線程。因為,AQS隊列的head節點是一個虛節點,不是有個有效的等待節點,是以head節點的thread是為null的。)

為了排隊進入一個CLH鎖,你可以原子性的拼接節點到隊列中作為一個新的隊尾;對于出隊,你隻要設定頭字段。(即,入隊操作時新的節點會排在CLH鎖隊列的隊尾,而出隊操作就是将待出隊的node設定為head。由此可見,在AQS中維護的這個等待隊列,head是一個無效的節點。初始化時head是一個new Node()節點;在後期的操作中,需要出隊的節點就會設定到head中。)

+------+  prev +-----+       +-----+
     head |      | <---- |     | <---- |     |  tail
          +------+       +-----+       +-----+
                

插入到一個CLH隊列的請求隻是一個對“tail”的單個原子操作,是以有一個簡單的從未入隊到入隊的原子分割點。類似的,出隊調用隻需要修改“head”。然而,節點需要更多的工作來确定他們的後繼者是誰,部分是為了處理由于逾時和中斷而導緻的可能的取消。

(也就是說,一個node的後繼節點不一定就是node.next,因為隊列中的節點可能因為逾時或中斷而取消了,而這些取消的節點此時還沒被移除隊列(也許正在移除隊列的過程中),而一個node的後繼節點指的是一個未被取消的有效節點,是以在下面的操作中你就會發現,在尋找後繼節點時,尋找的都是目前節點後面第一個有效節點,即非取消節點。)

“prev”(前驅)連接配接(原始的CLH鎖是不使用前驅連接配接的),主要用于處理取消。如果一個節點被取消了,它的後驅(通常)會重連接配接到一個未被取消的前驅。

另外我們使用“next”連接配接去實作阻塞機制。每個節點的線程ID被它們自己的節點所持有,是以前驅節點通知下一個節點可以被喚醒,這是通過周遊下一個連結(即,next字段)來确定需要喚醒的線程。後繼節點的決定必須同‘新入隊的節點在設定它的前驅節點的“next”屬性操作(即,新入隊節點為newNode,在newNode的前驅節點preNewNode進行preNewNode.next = newNode操作)’産生競争。一個解決方法是必要的話當一個節點的後繼看起來是空的時候,從原子更新“tail”向前檢測。(或者換句話說,next連結是一個優化,是以我們通常不需要反向掃描。)

取消引入了對基本算法的一些保守性。當我們必須為其他節點的取消輪詢時,我們不需要留意一個取消的節點是在我們節點的前面還是後面。它的處理方式是總是根據取消的節點喚醒其後繼節點,允許它們去連接配接到一個新的前驅節點,除非我們能夠辨別一個未被取消的前驅節點來完成這個責任。

  • waitStatus
volatile int waitStatus;
                

狀态屬性,隻有如下值:

① SIGNAL:

static final int SIGNAL = -1;

這個節點的後繼(或者即将被阻塞)被阻塞(通過park阻塞)了,是以目前節點需要喚醒它的後繼當它被釋放或者取消時。為了避免競争,擷取方法必須首先表示他們需要一個通知信号,然後再原子性的嘗試擷取鎖,如果失敗,則阻塞。

也就是說,在擷取鎖的操作中,需要確定目前node的preNode的waitStatus狀态值為’SIGNAL’,才可以被阻塞,當擷取鎖失敗時。(『shouldParkAfterFailedAcquire』方法的用意就是這)

② CANCELLED:

static final int CANCELLED = 1;

這個節點由于逾時或中斷被取消了。節點不會離開(改變)這個狀态。尤其,一個被取消的線程不再會被阻塞了。

③ CONDITION:

static final int CONDITION = -2;

這個節點目前在一個條件隊列中。它将不會被用于當做一個同步隊列的節點直到它被轉移到同步隊列中,轉移的同時狀态值(waitStatus)将會被設定為0。(這裡使用這個值将不會做任何事情與該字段其他值對比,隻是為了簡化機制)。

④ PROPAGATE:

static final int PROPAGATE = -3;

一個releaseShared操作必須被廣播給其他節點。(隻有頭節點的)該值會在doReleaseShared方法中被設定去確定持續的廣播,即便其他操作的介入。

⑤ 0:不是上面的值的情況。

這個值使用數值排列以簡化使用。非負的值表示該節點不需要信号(通知)。是以,大部分代碼不需要去檢查這個特殊的值,隻是為了辨別。

對于正常的節點該字段會被初始化為0,競争節點該值為CONDITION。這個值使用CAS修改(或者可能的話,無競争的volatile寫)。

  • prev
volatile Node prev
                

連接配接到前驅節點,目前節點/線程依賴與這個節點waitStatus的檢測。配置設定發生在入隊時,并在出隊時清空(為了GC)。并且,一個前驅的取消,我們将短路當發現一個未被取消的節點時,未被取消的節點總是存在因為頭節點不能被取消:隻有在擷取鎖操作成功的情況下一個節點才會成為頭節點。一個被取消的線程絕不會擷取成功,一個線程隻能被它自己取消,不能被其他線程取消。

  • next
volatile Node next
                

連接配接到後繼的節點,該節點是目前的節點/線程釋放喚醒的節點。配置設定發生在入隊時,在繞過取消的前驅節點時進行調整,并在出隊列時清空(為了GC的緣故)。一個入隊操作(enq)不會被配置設定到前驅節點的next字段,直到tail成功指向目前節點之後(通過CAS來将tail指向目前節點。『enq』方法實作中,會先将node.prev = oldTailNode;在需要在CAS成功之後,即tail = node之後,再将oldTailNode.next = node;),是以當看到next字段為null時并不意味着目前節點是隊列的尾部了。無論如何,如果一個next字段顯示為null,我們能夠從隊列尾向前掃描進行複核。被取消的節點的next字段會被設定為它自己,而不是一個null,這使得isOnSyncQueue方法更簡單。

  • thread
volatile Thread thread
                

這個節點的入隊線程。在建構時初始化,在使用完後清除。

  • nextWaiter
Node nextWaiter
                

連結下一個等待條件的節點,或者一個指定的SHARED值。因為隻有持有排他鎖時能通路條件隊列,是以我們隻需要一個簡單的單連結清單來維持正在等待條件的節點。它們接下來會被轉換到隊列中以去重新擷取鎖。因為隻有排他鎖才有conditions,是以我們使用給一個特殊值儲存的字段來表示共享模式。

也就是說,nextWaiter用于在排他鎖模式下表示正在等待條件的下一個節點,因為隻有排他鎖模式有conditions;是以在共享鎖模式下,我們使用’SHARED’這個特殊值來表示該字段。

源碼分析

初始化
  • 初始化 ———— 公平鎖:
ReentrantLock lock = new ReentrantLock(true)
                
  • 初始化 ———— 非公平鎖:
ReentrantLock lock = new ReentrantLock()
                

ReentrantLock lock = new ReentrantLock(false)

lock
public void lock() {
    sync.lock();
}
                

擷取鎖。

如果其他線程沒有持有鎖的話,擷取鎖并且立即傳回,設定鎖被持有的次數為1.

如果目前線程已經持有鎖了,那麼隻有鎖的次數加1,并且方法立即傳回。

如果其他線程持有了鎖,那麼目前線程會由于線程排程變得不可用,并處于休眠狀态直到目前線程擷取到鎖,此時目前線程持有鎖的次數被設定為1次。

  • 公平鎖『lock()』方法的實作:

    『FairSync#lock()』

final void lock() {
    acquire(1);
}
                

調用『acquire』在再次嘗試擷取鎖失敗的情況下,會将目前線程入隊至等待隊列。該方法會在成功擷取鎖的情況下才會傳回。是以該方法是可能導緻阻塞的(線程挂起)。

  • 非公平鎖『lock()』方法的實作:

    『NonfairSync#lock()』

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
                

① 嘗試擷取鎖,若『compareAndSetState(0, 1)』操作成功(這步操作有兩層意思。第一,目前state為0,說明目前鎖沒有被任何線程持有;第二,原子性的将state從’0’修改為’1’成功,說明目前線程成功擷取了這個鎖),則說明目前線程成功擷取鎖。那麼設定鎖的持有者為目前線程(『setExclusiveOwnerThread(Thread.currentThread())』)。

那麼此時,AQS state為1,鎖的owner為目前線程。結束方法。

② 如果擷取鎖失敗,則調用『acquire』在再次嘗試擷取鎖失敗的情況下,會将目前線程入隊至等待隊列。該方法會在成功擷取鎖的情況下才會傳回。是以該方法是可能導緻阻塞的(線程挂起)。

  • 公共方法『AbstractQueuedSynchronizer#acquire』
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
                

在排他模式下嘗試獲,忽略中斷。該方法的實作至少執行一次『tryAcquire』方法,如果成功擷取鎖則傳回。否則,線程将入隊等待隊列中,可能會重複阻塞和解除阻塞,以及調用『tryAcquire』直到成功擷取鎖。這個方法能被用于實作『Lock#lock』

① 執行『tryAcquire』來嘗試擷取鎖,如果成功(即,傳回true)。則傳回true退出方法;否則到第②步

② 執行『acquireQueued(addWaiter(Node.EXCLUSIVE), arg)』

『addWaiter(Node.EXCLUSIVE)』:為目前線程建立一個排他模式的Node,并将這個節點加入等待隊列尾部。

『acquireQueued』:已經入隊的節點排隊等待擷取鎖。

③ 如果在嘗試擷取鎖的過程中發現線程被标志位了中斷。因為是通過『Thread.interrupted()』方法來檢測的目前線程是否有被标志位中斷,該方法會清除中斷标志,是以如果線程在嘗試擷取鎖的過程中發現被辨別為了中斷,則需要重新調用『Thread.currentThread().interrupt();』重新将中斷标志置位。

該方法是排他模式下擷取鎖的方法,并且該方法忽略中斷,也就說中斷不會導緻該方法的結束。首先,會嘗試通過不公平的方式立即搶占該鎖(『tryAcquire』),如果擷取鎖成功,則結束方法。否則,将目前線程加入到等待擷取鎖的隊列中,如果目前線程還未入隊的話。此後就需要在隊列中排隊擷取鎖了,而這就不同于前面非公平的方式了,它會根據FIFO的公平方式來嘗試擷取這個鎖。而這個方法會一直“阻塞”直到成功擷取到鎖了才會傳回。注意,這裡的“阻塞”并不是指線程一直被挂起這,它可能被喚醒,然後同其他線程(比如,那麼嘗試非公平擷取該鎖的線程)競争這個鎖,如果失敗,它會繼續被挂起,等待被喚醒,再重新嘗試擷取鎖,直到成功。

同時注意,關于中斷的操作。因為該方法是不可中斷的方法,是以若在該方法的執行過程中線程被标志位了中斷,我們需要確定這個标志位不會因為方法的調用而被清除,也就是我們不進行中斷,但是外層的邏輯可能會對中斷做相關的處理,我們不應該影響中斷的狀态,即,“私自”在不進行中斷的情況下将中斷标志清除了。

先繼續來看公平鎖和非公平鎖對『tryAcquire』方法的實作

tryAcquire 這類型的方法都不會導緻阻塞(即,線程挂起)。它會嘗試擷取鎖,如果失敗就傳回false。

  • FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
                

公平版本的『tryAcquire』方法。目前線程沒有權限擷取鎖,除非遞歸調用到沒有等待者了,或者目前線程就是第一個嘗試擷取鎖的線程(即,等待隊列中沒有等待擷取鎖的線程)。

① 擷取AQS state,即,目前鎖被擷取的次數。如果為’0’,則說明目前鎖沒有被任何線程擷取,那麼執行『hasQueuedPredecessors』方法判斷目前線程之前是否有比它等待更久準備擷取鎖的線程:

a)如果有則方法結束,傳回false;

b)如果沒有,則說明目前線程前面沒有另一個比它等待更久的時間在等待擷取這個鎖的線程,則嘗試通過CAS的方式讓目前的線程擷取鎖。如果成功則設定持有鎖的線程為目前線程(『setExclusiveOwnerThread(current)』),然後方法結束傳回true。

② 如果AQS state > 0,則說明目前鎖已經被某個線程所持有了,那麼判斷這個持有鎖的線程是否就是目前線程(『current == getExclusiveOwnerThread()』),如果是的話,嘗試進行再次擷取這個鎖(ReentrantLock是一個可重入的鎖)如果擷取鎖的次數沒有超過上限的話(即,c + acquires > 0),則更新state的值為最終該鎖被目前線程擷取的次數,然後方法結束傳回true;否則,如果目前線程擷取這個鎖的次數超過了上限則或抛出Error異常。再者如果目前線程不是持有鎖的線程,則方法結束傳回false。

『AbstractQueuedSynchronizer#hasQueuedPredecessors』

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
                

查詢是否有線程已經比目前線程等待更長的時間在等待擷取鎖。

這個方法的調用等價于(但更高效與):

『getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()』

即,可見如下任一情況,則說明目前線程前面沒有比它等待更久的需要擷取鎖的線程:

a)當隊列中第一個等待擷取鎖的線程是目前線程時

b)等待隊列為空時。即,目前沒有其他線程等待擷取鎖。

注意,因為中斷和逾時導緻的取消可能發生在任何時候,該方法傳回‘true’不能保證其他線程會比目前線程更早獲得到鎖。同樣,由于隊列是空的,在目前方法傳回‘false’之後,另一個線程可能會赢得一個入隊競争。

這個方法被涉及用于一個公平的同步器,以避免闖入。如果這個方法傳回’true’,那麼像是一個(公平)同步器的『tryAcquire』方法應該傳回’false’,以及『tryAcquireShared』方法需要傳回一個負數值(除非這是一個可重入的鎖,因為可重入鎖,擷取鎖的結果還需要判斷目前線程是否就是已經擷取鎖的線程了,如果是,則在沒有超過同一線程可擷取鎖的次數上限的情況下,目前線程可以再次擷取這個鎖)。比如,一個公平的、可重入的、排他模式下的『tryAcquire』方法,可能看起來像是這樣的:

深入了解ReentrantLock原理&amp;&amp;Condition原理

傳回:

a)true:如果目前線程前面有排隊等待的線程

b)false:如果目前線程是第一個等待擷取鎖的線程(即,一般就是head.next);或者等待隊列為空。

該方法的正确性依賴于head在tail之前被初始化,以及head.next的精确性,如果目前線程是隊列中第一個等待擷取鎖的線程的時候。

① tail節點的擷取一定先于head節點的擷取。因為head節點的初始化在tail節點之前,那麼基于目前的tail值,你一定能擷取到有效的head值。這麼做能保證接下來流程的正确性。舉個反例來說明這麼做的必要性:如果你按『Node h = head; Node t = tail;』的順序來對h、t進行指派,那麼可能出現你在操作這兩步的時候有其他的線程正在執行隊列的初始化操作,那麼就可能的帶一個『h==null』,而『tail!=null』的情況(這種情況下,是不對的,因為tail!=null的話,head一定也不為null了),這使得『h != t』判斷為true,認為當下是一個非空的等待隊列,那麼接着執行『s = h.next』就會抛出NPE異常了。但是當『Node t = tail; Node h = head;』按初始化相反的順序來指派的話,則不會有問題,因為我們保證了在tail取值的情況下,head的正确性。我們接下看下面的步驟,來說明為什麼這麼做就可以了。

② 在擷取完t、h之後,我們接着先判斷『h != t』,該判斷的用意在于,判斷目前的隊列是否為空。如果為true則說明,目前隊列非空。如果為false 則說明目前隊列為空,為空的話,方法就直接結束了,并傳回false。

但是請注意,當『h != t』為true時,其實是有兩種情況的:

a)當tail和head都非空時,說明此時等待隊列已經完成了初始化,head和tail都指向了其隊列的頭和隊列的尾。

b)當“tail==null”同時“head != null”,則說明,此時隊列正在被其他線程初始化,目前我們擷取的h、t是初始化未完成的中間狀态。但是沒關系,下面的流程會對此請進行判斷。

③ 當『h != t』傳回’true’的話,繼續判斷『(s = h.next) == null || s.thread != Thread.currentThread()』。這裡的兩個判斷分别對應了兩種情況:

a)『(s = h.next) == null』傳回’true’,則說明當擷取的h、t為初始化的中間狀态,因為第一個線程入隊的時候,會先初始化隊列,然後才對head的next值進行指派,是以我們需要“s = h.next”是否為null進行判斷,如果為’null’,則說明目前等待隊列正在被初始化,并且有一個線程正在入隊的操作中。是以此時方法直接結束,并且傳回true。

b)如果『h != t』并且『(s = h.next) != null』,則說明目前線程已經被初始化好了,并且等待隊列中的第一個等待擷取鎖的線程也已經入隊了。那麼接着我們就判斷這個在等待隊列中第一個等待擷取鎖的線程是不是目前線程『s.thread != Thread.currentThread()』,如果是的話,方法結束并傳回false,表示目前線程前面沒有比它等待更久擷取這個鎖的線程了;否則方法結束傳回true,表示目前線程前面有比它等待更久希望擷取這個鎖的線程。

『AbstractQueuedSynchronizer#addWaiter』

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
                

根據給定的模式建立目前線程的節點,并将建立好的節點入隊(加入等待隊列尾部)。

首先在隊列非空的情況下會嘗試一次快速入隊,也就是通過嘗試一次CAS操作入隊,如果CAS操作失敗,則調用enq方法進行“自旋+CAS”方法将建立好的節點加入隊列尾。

在排他模式下,将節點加入到鎖的同步隊列時,Node的mode(即,waitStatus)為’EXCLUSIVE’。waitStatus是用于在排他鎖模式下當節點處于條件隊列時表示下一個等待條件的節點,是以在加入到鎖的同步隊列中(而非條件隊列),我們使用’EXCLUSIVE’這個特殊值來表示該字段。本文主要圍繞共享鎖模式的介紹,就不對其進行展開了,關于排他鎖的内容會在“ReentrantLock源碼解析”一文中介紹。

  • NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
                

對父類AQS tryAcquire方法的重寫。調用『nonfairTryAcquire(acquires)』方法,非公平的嘗試擷取這個可重入的排他鎖

『nonfairTryAcquire』

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
                

執行不公平的『tryLock』。『tryAcquire』在子類中實作,但是都需要不公平的嘗試在『tryLock』方法中。

① 擷取state值,如果為’0’,則說明目前沒有線程占用鎖,那麼調用CAS來嘗試将state的值從0修改為’acquires’的值,

a)如果成功則說明目前線程成功擷取到了這個不公平鎖,那麼通過『setExclusiveOwnerThread(current)』方法來标志目前線程為持有鎖的線程,方法結束,傳回true;

b)如果失敗,則說明有其他線程先擷取到了這個鎖,那麼目前線程擷取鎖失敗。方法結束,傳回false。

② "state != 0",則說明目前鎖已經被某個線程所持有了,那麼判斷目前的線程是否就是持有鎖的那個線程(『if (current == getExclusiveOwnerThread())』)。

a)如果持有鎖的線程就是目前線程,因為ReentrantLock是一個可重入的鎖,是以接下來繼續判斷預計遞歸擷取鎖的次數是否超過了限制的值(即,“nextc < 0”則說明預計遞歸擷取鎖的次數超過了限制值Integer.MAX_VALUE了),那麼會抛出“Error”異常;否則将目前state的值設定為最新擷取鎖的次數(注意,這裡不需要使用CAS的方式來修改state了,因為能操作到這裡的一定就是目前持有鎖的線程了,是以是不會發送多線程競争的情況)。然後方法結束,傳回true;

b)如果持有鎖的線程不是目前線程,那麼目前線程擷取鎖失敗。方法結束,傳回false。

  • 『FairSync#lock』 VS 『NonfairSync#lock』

    a)在公平鎖的模式下,所有擷取鎖的線程必須是按照調用lock方法先後順序來決定的,嚴格的說當有多個線程同時嘗試擷取同一個鎖時,多個線程最終擷取鎖的先後順序是由入隊等待隊列的順序決定的,當然,第一個擷取鎖的線程是無需入隊的,等待隊列是用于存儲那些嘗試擷取鎖失敗後的節點。并且按照FIFO的順序讓隊列中的節點依次擷取鎖。

    b)在非公平模式下,當執行lock時,無論目前等待隊列中是否有等待擷取鎖的線程了,目前線程都會嘗試直接去擷取鎖。

    👆兩點從『FairSync#lock』與『NonfairSync#lock』 實作的不同,以及『FairSync#tryAcquire』與『NonfairSync#tryAcquire』方法實作的不同中都能表現出來。

對于非公平鎖:首先會嘗試立即搶占擷取鎖(若鎖目前沒有被任何線程持有時,并且此時它會和目前同時首次嘗試擷取該鎖的線程以及等待隊列中嘗試擷取該鎖的線程競争),如果擷取鎖失敗,則會被入隊到等待隊列中,此時就需要排隊等待擷取鎖了。

在排他鎖模式下,等待隊列中的第一個等待擷取鎖的線程(即,前驅節點是head節點的節點),僅代表這個節點目前有競争擷取鎖的權力,并不代表它會成功擷取鎖。因為它可能會同非公平擷取鎖的操作産生競争。

繼續回到『AbstractQueuedSynchronizer#acquire』方法,繼續展開裡面的實作。我們接着看『acquireQueued』方法是如何實作将已經入隊的節點排隊等待擷取鎖的。

『AbstractQueuedSynchronizer#acquireQueued』

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
                

用于已經入隊的線程在排他不可中斷模式下擷取鎖。同時也被用于給條件(condition)的wait方法來擷取鎖。

該方法不會因為中斷的發送而傳回,隻有在擷取鎖的情況下才會傳回,但是如果在等待擷取鎖的過程中,目前線程被辨別為了中斷,則在方法傳回的時候傳回true,否則方法傳回是傳回false。

① 擷取目前節點的前驅節點,如果前驅節點是head節點,則說明目前節點是隊列中第一個等待擷取鎖的節點,那麼就執行『tryAcquire』方法嘗試擷取排他鎖,如果擷取排他鎖成功(即,tryAcquire方法傳回true)則調用『setHead(node)』将目前節點設定為頭節點。然後将p(即,舊的head節點)的next置null,有助于p被垃圾收集器收集。然後辨別failed為false。結束方法調用,傳回interrupted(該字段表示在等待擷取鎖的過程中,目前線程是否有被表示為中斷了)。

② 如果目前節點的前驅節點不是head節點,說明該節點前面已經有等待着擷取這個排他鎖的節點;或者目前節點的前驅節點是head節點,但是目前節點擷取鎖失敗了,那麼執行『shouldParkAfterFailedAcquire』方法,若該方法傳回true,則說明本次擷取排他鎖失敗需要阻塞/挂起目前線程,那麼就調用『LockSupport.park(this);』将目前線程挂起,直到被喚醒,并且若挂起期間該線程被标志為了中斷狀态,則将interrupted辨別為true。

③ 當目前節點經過多次喚醒與挂起,終于成功擷取鎖後,則退出方法,并傳回目前線程是否有被中斷的标志。如果目前節點因為某些原因沒有成功擷取到鎖,卻要結束該方法了,那麼調用『cancelAcquire(node)』方法将目前節點從等待隊列中移除。因為方法結束了,說明目前節點不會被操作再去嘗試擷取鎖了,那麼就不應該作為一個有效節點放在等待隊列中,應該被辨別為無效的節點後從隊列中移除。

『AbstractQueuedSynchronizer#setHead』

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
                

将node設定為隊列的頭節點,當它出隊時。該方法隻能被擷取方法調用。僅将無用字段清空(即,置為null)以便于GC并廢除不必要的通知和遞歸。

『AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire』

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
                

檢查并修改一個節點的狀态,當該節點擷取鎖失敗時。傳回true如果線程需要阻塞。這是主要的信号通知控制在所有的擷取鎖循環中。要求’pred’ == ‘node.prev’

① 如果pred.waitStatus == Node.SIGNAL。則說明node的前驅節點已經被要求去通知釋放它的後繼節點,是以node可以安全的被挂起(park)。然後,退出方法,傳回true。

② 如果pred.waitStatus > 0。則說明node的前驅節點被取消了。那麼跳過這個前驅節點并重新标志一個有效的前驅節點(即,waitStatus <= 0 的節點可作為有效的前驅節點),然後,退出方法,傳回false。

③ 其他情況下,即pred.waitStatus為’0’或’PROPAGATE’。表示我們需要一個通知信号(即,目前的node需要喚醒的通知),但是目前還不能挂起node。調用『compareAndSetWaitStatus(pred, ws, Node.SIGNAL)』方法通過CAS的方式來修改前驅節點的waitStatus為“SIGNAL”。退出方法,傳回false。

我們需要一個通知信号,主要是因為目前線程要被挂起了(park)。而如果waitStatus已經是’SIGNAL’的話就無需修改,直接挂起就好,而如果waitStatus是’CANCELLED’的話,說明prev已經被取消了,是個無效節點了,那麼無需修改這個無效節點的waitStatus,而是需要先找到一個有效的prev。是以,剩下的情況就隻有當waitStatus為’0’和’PROPAGAET’了(注意,waitStatus為’CONDITION’是節點不在等待隊列中,是以當下情況waitStatus不可能為’CONDITION’),這是我們需要将prev的waitStatus使用CAS的方式修改為’SIGNAL’,而且隻有修改成功的情況下,目前的線程才能安全被挂起。

還值得注意的時,是以該方法的CAS操作都是沒有自旋的,是以當它操作完CAS後都會傳回false,在外層的方法中會使用自旋,當發現傳回的是false時,會再次調用該方法,以檢查保證有目前node有一個有效的prev,并且其waitStatus為’SIGNAL’,在此情況下目前的線程才會被挂起(park)。

unlock
public void unlock() {
    sync.release(1);
}
                

嘗試去釋放這個鎖。

如果目前線程是持有這個鎖的線程,那麼将持有次數減少1。如果釋放後目前的鎖被持有的次數為0,那麼鎖被釋放。如果目前線程不是持有鎖的線程,那麼抛出“IllegalMonitorStateException”異常。

『AbstractQueuedSynchronizer#release』

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
                

排他模式下的釋放。該方法的實作通過解除一個或多個線程的的阻塞,如果『tryRelase』方法傳回true的話。

該方法能夠被用于實作『Lock#unlock』。

① 調用『tryRelease(arg)』方法來嘗試設定狀态值來反映一個排他模式下的釋放。如果操作成功,則進入步驟[2];否則,如果操作失敗,則方法結束,傳回false。

② 在成功釋放給定的狀态值後,擷取等待隊列的頭節點。如果頭節點不為null并且頭節點的waitStatus!=0(頭節點的waitStatus要麼是’0’,要麼是’SIGNAL’),那麼執行『unparkSuccessor(h)』來喚醒頭節點的後繼節點。(節點被喚醒後,會繼續acquireQueued方法中流程)

③ 隻要『tryRelease(arg)』釋放操作成功,無論是否需要喚醒頭結點的後繼節點,方法結束都會傳回true。

『tryRelease』

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
                

嘗試通過設定狀态值來反映一個在排他模式下的釋放操作。

①『Thread.currentThread() != getExclusiveOwnerThread()』如果執行釋放操作的線程不是持有鎖的線程,那麼直接抛出“IllegalMonitorStateException”異常,方法結束;否則

② 如果執行目前釋放操作的線程是持有鎖的線程,那麼

a)計算新state的值,即目前釋放操作後state的值,如果state為0,則說明目前線程完成釋放了對該鎖的持有,那麼将鎖的持有者重置為null(即,『setExclusiveOwnerThread(null)』)。然後通過『setState(c);』将AQS的state值設定為這個新的state值(即,0),結束方法,傳回true,表示該鎖現在沒有線程持有,可以被重新擷取。

b)如果新state的值不為0(即,大于0),則說明目前的線程并未完成釋放該鎖(因為reentrantLock是一個可重入的鎖,是以一個線程可以多次擷取這鎖,而state值就表示這一線程擷取鎖的次數),那麼通過『setState(c);』将AQS的state值設定為這個新的state值,結束方法,傳回false。

可見對于『tryRelease』方法,釋放鎖操作失敗是通過抛出“IllegalMonitorStateException”異常來表示的。該方法無論傳回‘true’還是‘false’都表示本次的釋放操作完成了。傳回‘true’表示的是鎖已經被目前線程完全釋放了,其他線程可以繼續争奪這個鎖了,在完全釋放鎖的時候也會将鎖中持有者字段重新置null;傳回‘false’表示的是目前釋放操作完成後,該線程還繼續持有這該鎖,此時其他線程是無法擷取到這個鎖的。

同時,我們可以知道,釋放操作隻能有持有鎖的線程來完成,是以對于AQS state字段(一個volatile字段)的修改,不需要使用CAS來完成,隻需要直接設定修改就好。

『AbstractQueuedSynchronizer#unparkSuccessor』

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
                 
<span class="token comment">/*
 * Thread to unpark is held in successor, which is normally
 * just the next node.  But if cancelled or apparently null,
 * traverse backwards from tail to find the actual
 * non-cancelled successor.
 */</span>
<span class="token class-name">Node</span> s <span class="token operator">=</span> node<span class="token punctuation">.</span>next<span class="token punctuation">;</span>
<span class="token keyword">if</span> <span class="token punctuation">(</span>s <span class="token operator">==</span> <span class="token keyword">null</span> <span class="token operator">||</span> s<span class="token punctuation">.</span>waitStatus <span class="token operator">&gt;</span> <span class="token number">0</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
    s <span class="token operator">=</span> <span class="token keyword">null</span><span class="token punctuation">;</span>
    <span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token class-name">Node</span> t <span class="token operator">=</span> tail<span class="token punctuation">;</span> t <span class="token operator">!=</span> <span class="token keyword">null</span> <span class="token operator">&amp;&amp;</span> t <span class="token operator">!=</span> node<span class="token punctuation">;</span> t <span class="token operator">=</span> t<span class="token punctuation">.</span>prev<span class="token punctuation">)</span>
        <span class="token keyword">if</span> <span class="token punctuation">(</span>t<span class="token punctuation">.</span>waitStatus <span class="token operator">&lt;=</span> <span class="token number">0</span><span class="token punctuation">)</span>
            s <span class="token operator">=</span> t<span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token keyword">if</span> <span class="token punctuation">(</span>s <span class="token operator">!=</span> <span class="token keyword">null</span><span class="token punctuation">)</span>
    <span class="token class-name">LockSupport</span><span class="token punctuation">.</span><span class="token function">unpark</span><span class="token punctuation">(</span>s<span class="token punctuation">.</span>thread<span class="token punctuation">)</span><span class="token punctuation">;</span>
                

}

喚醒後繼節點,如果存在的話

① 如果狀态值是負數,則在預期發信号通知時清除這個負數狀态值。如果狀态被等待的線程修改了或者清除負數狀态值失敗是允許。

② 後繼節點的線程被喚醒,後繼節點通常就是下一個節點。但是如果下一個節點被取消了或者下一個節點為null,則從隊列尾(tail)往前周遊去找真實的未取消的後繼節點。

『(s == null || s.waitStatus > 0)』:說明下一個節點為null或被取消了(waitStatus允許的狀态值中,隻有’CANCELLED’是>0的)。那麼,就從隊列尾(tail)開始向前周遊,擷取第一個非空且未被取消的節點。如果存在這樣的一個後繼節點的話(即,“s != null”),則執行『LockSupport.unpark(s.thread);』操作來喚醒這個節點的線程,此時等待隊列中第一個等待的線程就會被重新啟動,流程會回到『acquireQueued』方法,該線程會重新重試擷取該鎖,如果成功acquireQueued方法傳回,否則線程會再次被挂起,等待下次喚醒後再去再去競争擷取鎖。

Q:關于node的waitStatus為’CANCELLED’的情況?

A:關于node的waitStatus為’CANCELLED’的情況:比如,當這個node被中斷了,或者設定的逾時時間到了,那麼說明這個線程擷取鎖失敗,那麼此時就應該将其設定為cancelled,因為如果該線程還需要擷取鎖的話,會重新調用擷取鎖的方法,而擷取鎖的方法就是建立一個新的node的。是以,那麼線程擷取鎖失敗的時候就會将這個node的waitStatus設定為’CANCELLED’,一個被取消的線程絕不會擷取鎖成功,一個線程隻能被它自己取消,不能被其他線程取消。

Q:關于node為null的情況?

A:關于node為null的情況:比如,一個入隊操作(enq)不會被配置設定到前驅節點的next字段,直到tail成功指向目前節點之後(通過CAS來将tail指向目前節點。『enq』方法實作中,會先将node.prev = oldTailNode;在需要在CAS成功之後,即tail = node之後,再将oldTailNode.next = node;),是以當看到next字段為null時并不意味着目前節點是隊列的尾部了。無論如何,如果一個next字段顯示為null,我們能夠從隊列尾向前掃描進行複核。

Q:對于ReentrantLock無論是公平鎖還是非公平鎖,在入隊時waitStatus都是什麼??

能确定的是從條件等待隊列轉移到鎖的同步隊列的時候,節點的waitStatus是’0’。

A:無論是公平鎖還是非公平鎖,在建構一個node的時候,waitStatus都是預設值’0’。然後在将node入隊到鎖的等待隊列中後就會執行『acquireQueued』來等待擷取鎖,而該方法會修改目前節點的前驅節點的waitStatus(即,『shouldParkAfterFailedAcquire(p, node)』方法)。在目前節點無法擷取鎖的時候需要被挂起前會将其前驅節點的waitStatus設定為’Node.SIGNAL’。這樣在釋放操作中(『release』),如果釋放後發現鎖的state為’0’,則說明鎖目前可以被其他線程擷取了,那麼就會擷取鎖的等待隊列的head節點,如果head節點的waitStatus!=0(即,head的waitStatus為’Node.SIGNAL’或’Node.PROPAGATE’,其中’Node.PROPAGATE’是共享模式下head節點的waitStatus可能的值,在排他模式下,head節點的waitStatus是’Node.SIGNAL’或’0’),那麼說明head節點後面有等待喚醒擷取鎖的線程,那麼調用『unparkSuccessor』方法來喚醒head節點的後繼節點。

在排他鎖模式下,head節點的waitStatus不是在該節點被設定為head節點的時候修改的。而是如果有節點入隊到等待隊列中,并且此時該節點無法擷取鎖,那麼會将其前驅節點的waitStatus設定為’Node.SIGNAL’後,該節點對應的線程就被挂起了。是以也就是說,如果head節點後還有節點等待擷取鎖,那麼此時head節點的waitStatus自然會使’Node.SIGNAL’,這是在head節點的後繼節點入隊後等待擷取鎖的過程中設定的。而将一個節點設定為head節點,僅是将該節點指派給head節點,并将thread和prev屬性會被置null。

ConditionObject ———— 條件對象

條件對象用來管理那些已經獲得了一個鎖,但是卻不能做有用工作的線程。

Condition實作是AbstractQueuedSynchronizer作為一個Lock實作的基礎。

該類的方法文檔描述了機制,而不是從鎖和條件(Condition)使用者的觀點來指定行為規範。該類所暴露的版本通常需要伴随着依賴于描述相關AbstractQueuedSynchronizer的條件語義的文檔。

這個類是可序列化的,但是所有字段都是transient的,是以反序列化的conditions沒有等待者。

注意,關于在ConditionObject中的描述,若無特殊說明“等待隊列”均指“條件等待隊列”,同鎖的等待隊列不同!

條件等待隊列中有效節點的waitStatus隻能是“Node.CONDITION”,這說明,如果發現條件等待隊列中的節點waitStatus!=“Node.CONDITION”,則說明這個節點被取消等待條件了,那麼應該将其出條件等待隊列中移除。

// 等待隊列的頭節點
private transient Node firstWaiter;
                

// 等待隊列的尾節點

private transient Node lastWaiter;

  • 條件對象初始化
Condition condition = lock.newCondition()
                

『newCondition』

public Condition newCondition() {
    return sync.newCondition();
}
                

傳回一個用于這個Lock執行個體的Condition執行個體。

傳回的Condition執行個體與内置的螢幕鎖一起使用時,支援同Object監控器方法(『Object#wait()』、『Object#notify』、『Object#notifyAll』)相同的用法。

如果這個鎖沒有被持有的話任何Condition等待(『Condition#await()』)或者通知(『Condition#signal』)方法被調用,那麼一個“IllegalMonitorStateException”異常将會抛出。(也就說是說,Condition是用在鎖已經被持有的情況下)

當一個condition的等待方法被調用(『Condition#await()』),鎖會被釋放,并且在這個方法傳回之前,鎖會被重新擷取并且鎖的持有次數會重新存儲為這個方法被調用到時候的值。

如果一個線程在等待期間被中斷了,那麼等待将會結束,一個“InterruptedException”異常将會抛出,并且線程的中斷狀态将被清除。

等待線程被以先進先出(FIFO)的順序被通知。

等待獲得鎖的線程和調用await方法的線程存在本質上的不同。一旦一個線程調用await方法,它進入該條件的等待集。當鎖可用時,該線程不能馬上解除阻塞。相反,它處于阻塞狀态,直到另一個線程調用同一條件上的signalAll方法時為止。這一調用重新激活因為這一條件而等待的所有線程。當這些線程從等待集當中移出時,它們再次成為可運作的,排程器将再次激活它們。同時,它們将試圖重新進入該對象。一旦鎖成為可用的,它們中的某個将從await調用傳回,獲得該鎖并從被阻塞的地方繼續執行。

  • await

    『AbstractQueuedSynchronizer#await』

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
                

實作可中斷的條件等待。

1.如果目前線程被中斷了,則抛出“InterruptedException”異常。

2.儲存通過『getState』傳回的鎖的狀态(state)

3.調用『release』方法,使用儲存的state作為參數,如果該方法失敗則抛出“IllegalMonitorStateException”異常

4.線程阻塞直到收到喚醒通知或者線程被中斷

5.通過調用使用儲存狀态為參數的指定版本的『acquire』方法來重新擷取鎖。

6.如果在步驟4中線程在堵塞的時候被中斷了,那麼抛出“InterruptedException”異常。

① 首先一進入該方法會先判斷目前線程是否被标志為了中斷狀态,如果是則抛出“InterruptedException”異常,并且中斷狀态被清除。

② 調用『addConditionWaiter』添加一個新的等待節點到條件等待隊列中。

③ 調用『fullyRelease(node)』使用鎖目前的狀态值執行釋放,并傳回這個狀态值。(即,該方法調用完後,并執行成功的話,那麼此時其他線程可以去擷取這個鎖了,可見await方法會使目前線程放棄對鎖的持有。同時傳回鎖在釋放前的狀态值。)

④ 自旋判斷建立的等待節點是否在所的同步隊列中了,如果沒有(則說明節點還未被信号通知,以從條件等待隊列中轉移到鎖的同步隊列中),則執行『LockSupport.park(this);』挂起目前線程,線程會一直被挂起直到被信号通知喚醒(『signal()』或『signalAll()』方法會将節點從條件等待隊列轉移到鎖的同步隊列中,并根據加入到同步隊列後得到的前驅節點的waitStatus,可能會去喚醒目前線程;或者當鎖的同步等待隊列中的線程依次擷取鎖并釋放後,直到輪到目前線程成為同步隊列中第一個等待擷取鎖的線程時,目前線程或被喚醒)。接着判斷線程是否發生中斷,如果發送中斷,則退出自旋;否則繼續自旋重新執行本步驟的流程,直至新建立的等待節點被轉移到鎖的同步隊列中。

⑤ 執行『acquireQueued(node, savedState)』方法在排他模式下從等待隊列中的線程擷取鎖,并且在擷取鎖後将鎖的狀态值設定為給定’savedState’(由于’savedState’就是上面因條件等待而釋放鎖前該線程擷取鎖的次數,而在該線程重新擷取鎖,繼續await之後的流程時,保持了該線程在await之前持有鎖的狀态)。并且該方法會在擷取鎖的情況下才會傳回:

a)若在等待擷取鎖的過程中,目前線程被辨別為了中斷,則在方法傳回的時候傳回true;接着判斷interruptMode是否等于“THROW_IE”,如果為true,則說明節點的等待在得到喚醒通知之前就被取消了,此時interruptMode為“THROW_IE”;否則interruptMode!=THROW_IE,則說明節點的等待在得到喚醒通知之後才被取消了,那麼設定interruptMode為“REINTERRUPT”,繼續步驟[6]

b)若在等待擷取鎖的過程中,目前線程未被辨別為中斷,則繼續步驟[6]

(這裡一個正常的未被中斷的流程就是,await的節點對應的線程會在步驟[4]被挂起,然後在某一個時刻因為signalAll()方法調用,該節點被轉移到了鎖的等待隊列中。然後當該線程為鎖的等待隊列中第一個等待擷取鎖的線程時,會被它的前驅節點喚醒,此時節點被喚醒,判斷得到已經在等待隊列中了,那麼結束步驟[4]的自旋,進入的步驟[5],調用『acquireQueued(node, savedState)』嘗試擷取鎖,此時節點已經具有擷取鎖的權限了,如果成功擷取鎖流程繼續,否則節點會被再次挂起,acquireQueued方法會阻塞直到目前線程擷取鎖的情況下傳回。)

⑥ 如果node節點的nextWaiter非null,那麼執行『unlinkCancelledWaiters();』來清除等待隊列中被取消的節點。

因為,如果node節點是通過signal/signalAll信号通知而從條件等待隊列轉移到鎖的同步隊列的話,那麼node的nextWaiter是為null(在signal/signalAll方法中會将該字段置為null);否則如果是因為中斷而将節點從條件等待隊列轉移到鎖的同步隊列的話,此時nextWaiter是不會被重置的,它依舊指向該節點在條件等待隊列中的下一個節點。

⑦ 如果中斷模式标志不為’0’(即,“interruptMode != 0”),則根據給定的中斷模式(interruptMode)在等待結束後報告中斷(『reportInterruptAfterWait(interruptMode)』)

是以,從『await()』方法中,我們可以得知下面幾點:

  1. 建立一個條件等待節點,并加入條件等待隊列尾部。
  2. 徹底釋放目前線程所持有鎖(因為,首先隻有在持有鎖的情況下才可以執行await操作,再者ReentrantLock是一個可重入的鎖,是以同一個線程可以多次擷取鎖),這樣鎖就可以被其他線程擷取。但會記錄這個線程在徹底釋放鎖之前持有該鎖的次數(即,鎖的state值)
  3. 在該線程再次擷取該鎖時,會将鎖的state設定為釋放之前的值。即,從await()條件等待傳回的時候,目前線程對鎖持有的狀态同await()等待條件之前是一緻的。
  4. 節點從條件等待隊列中轉移到鎖的同步隊列是兩種情況:

    a)收到了信号通知,即signal/signalAll

    b)在未收到信号通知之前,檢測到了目前線程被中斷的标志。

  5. 在目前線程重新擷取到鎖,準備從await方法傳回的時候,await方法的傳回也分兩種情況:

    a)在條件等待中的節點是通過signal/signalAll信号通知轉移到鎖的同步隊列的,然後再在同步隊列中根據FIFO的順序來重新擷取到了該鎖。那麼此時await方法正常傳回。(在信号通知之後線程可能被标志位中斷,但這不影響方法的正常傳回)

    b)在條件等待中節點是因為目前線程被标志為了中斷而将其轉移到了鎖的同步隊列中,這樣在目前線程再次重新擷取鎖時,方法會異常傳回,即抛出“InterruptedException”異常。

    接下來對,『await』中的源碼細節進一步展開

    『AbstractQueuedSynchronizer#addConditionWaiter』

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
                

添加一個新的等待者到等待隊列中。

①『Node t = lastWaiter;』:擷取等待隊列的中的最後一個節點,

②『t != null && t.waitStatus != Node.CONDITION』如果這個節點被取消了,那麼調用『unlinkCancelledWaiters();』方法将等待隊列中被取消的節點移除。并重新擷取等待隊列中的最後一個節點(『t = lastWaiter』)

③ 為目前線程建立一個waitStatus為“Node.CONDITION”的節點。

④ 将新建立好的節點加入到等待隊列的尾部:

a)如目前等待隊列為空(即,上面擷取的t為null,也就是說,當等待隊列尾指針為null時,則說明此時等待隊列為空)那麼需要先初始化firstWaiter,将其指向這個新建立的節點。然後将lastWaiter也指向這個新建立的節點。此時等待隊列中隻有一個節點,firstWaiter和lastWaiter都指向這個節點。

b)将等待隊列中最後一個節點的next屬性指向目前這個新建立的節點,然後将lastWaiter指向目前這個新建立的節點。

⑤ 傳回新建立的等待節點。

『AbstractQueuedSynchronizer#fullyRelease』

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
                

使用目前的狀态值執行釋放;傳回儲存的狀态值。

若操作失敗,則取消節點,并抛出異常。

①『int savedState = getState();』擷取目前鎖的狀态值

② 使用擷取的狀态值執行釋放操作『release(savedState)』,如果操作成功,則方法結束,傳回釋放使用的儲存的狀态值;如果操作失敗,則抛出“IllegalMonitorStateException”異常,并取消node節點,即,将節點node的waitStatus設定為“Node.CANCELLED”。

『AbstractQueuedSynchronizer#isOnSyncQueue』

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}
                

傳回true,如果一個節點總是初始化于條件隊列中,并且目前在同步隊列中等待擷取鎖。

① 如果node的waitStatus為“Node.CONDITION”或者node的prev為null,則說明node節點目前還沒有入隊同步隊列,方法結束,傳回false;否則步驟[2]

② 接着判斷『if (node.next != null)』,如果為true,則說明node已經入隊完畢,則方法結束,傳回true。否則步驟[3]

③ 調用『findNodeFromTail(node)』從同步隊列尾開始尋找節點。此時,node.prev非null,但是由于通過CAS将節點入隊的操作可能失敗導緻目前節點還未在同步隊列中(即,節點入隊操作還未完成)。是以我們需要從同步隊列尾部開始向前周遊以明确該節點是否在同步隊列中。在這種方法的調用中,節點總是靠近尾部,除非CAS失敗(不太可能),否則節點将在同步隊列尾部附近,是以我們幾乎不會經曆很多周遊。

『AbstractQueuedSynchronizer#findNodeFromTail』

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}
                

從同步隊列尾向前查詢節點,如果節點在同步隊列中,則傳回true。

僅在『isOnSyncQueue』方法内調用該方法。

從鎖的等待隊列尾部開始向前周遊,如果找到node節點則傳回true;否則周遊完整個等待隊列也就沒法找到node節點,則傳回false。

『AbstractQueuedSynchronizer#checkInterruptWhileWaiting』

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
                

用于檢測中斷,如果中斷在信号通知之前發生則傳回“THROW_IE”,若中斷在信号通知之後發生則傳回“REINTERRUPT”,或者如果沒有被中斷則傳回“0”。

『AbstractQueuedSynchronizer#transferAfterCancelledWait』

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
                

如果需要的話,在取消等待後,将節點轉移到同步隊列。如果線程在被信号通知之前取消等待了則傳回true。

① 通過CAS的方式将節點的狀态從“Node.CONDITION”修改為“0”,如果成功,則說明節點此時還沒有收到信号通知,此時将節點的waitStatus從“Node.CONDITION”修改為“0”就是在被信号通知前取消了節點對條件的等待,接着調用『enq(node)』将節點入隊到鎖的等待隊列中,并結束方法,傳回true。

② CAS操作失敗,則說明該等待條件的節點被其他線程信号通知了(一般是signalAll),那麼自旋調用『isOnSyncQueue(node)』以確定節點入隊(鎖的等待隊列)完成後退出自旋(因為取消等待條件期間一個未完成的轉換是罕見且瞬間的時期,是以使用自旋即可)。然後方法結束,傳回false。

也就是說,首先該方法會確定node從條件等待隊列轉移到鎖的同步隊列中。node是因為該方法的執行而從條件等待隊列轉移到鎖的同步隊列的話,則傳回true;否則如果node是因為signal/signalAll信号通知而從條件等待隊列轉移到鎖的同步隊列的話,則傳回false。

『AbstractQueuedSynchronizer#enq』

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
                

使用自旋鎖的方式(自旋+CAS)插入節點到等待隊列,如果等待隊列為空則初始化隊列。

初始化隊列:建立一個空節點(即,new Node()),将head和tail都指向這個節點。

然後才是将我們待插入的節點插入,即:emptyNode -> newNode. head指向emptyNode,tail指向newNode。

『AbstractQueuedSynchronizer#unlinkCancelledWaiters』

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
                

從等待隊列中解除被取消等待節點的連接配接。該方法僅在持有鎖的時候調用。這個方法調用發生在取消發生在等待條件期間,并根據一個新的等待節點插入時lastWaiter看起來已經被取消了。這個方法需要去避免垃圾的滞留在沒有信号通知的時候。是以即便它可能需要一個完全周遊,這僅會在逾時和取消發生在缺少通知的情況下發生。它會周遊所有的節點而非停止在一個指定的目标,以便在取消風暴期間不需要多次重新周遊就可以将所有的垃圾節點解除連結。

該方法會從firstWaiter開始周遊整個等待隊列,将被取消(即,waitStatus != Node.CONDITION)的節點從等待隊列中移除。

①『Node t = firstWaiter;』:擷取等待隊列的頭結點。

② 從頭節點開始周遊等待隊列。

③『Node next = t.nextWaiter;』擷取目前周遊節點的下一個節點。

④ 如果目前節點被取消了(即,『t.waitStatus != Node.CONDITION』),那麼将目前節點的next字段置null(便于垃圾回收)。然後判斷『trail == null』,如果為true,則說明目前是頭節點被取消了,那麼設定『firstWaiter=next』,即目前節點的下一個節點。此時,next節點可能是一個有效節點,也可能是一個被取消的節點(如果是被取消的節點,會在下一次循環的時候再次重新設定firstWaiter),也可能是一個null(如果為null,接下來就會退出循環,說明等待隊列為空了);如果『trail == null』為false,則說明此周遊到的被取消的節點不是頭節點,并且trail指向了周遊到目前為止等待隊列中最後一個有效的等待節點,那麼執行『trail.nextWaiter = next;』以将目前正在被周遊的節點從等待隊列中解除連接配接。接着判斷『next == null』,若為true,則說明目前周遊的被取消的節點是等待隊列的最後一個節點,那麼執行『lastWaiter = trail;』将lastWaiter指向最後一個有效的等待節點。

⑤ 如果目前節點沒有被取消(即,『t.waitStatus == Node.CONDITION』),那麼将trail置為t,這說明了trail指向了在周遊等待隊列過程中的最後一個有效的等待節點。

⑥ 将t置為next,即目前周遊節點的下一個節點。繼續步驟[3],直至整個等待隊列節點都周遊完(即,next為null)。

signal/signalAll
  • ** signal**
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
                 

将目前的條件等待隊列中将等待時間最長的線程移動到鎖的等待隊列中,如果存在這麼一個線程的話。

① 判斷執行該方法的目前線程是否是持有排他鎖的線程,如果不是則抛出“IllegalMonitorStateException”異常。

② 當執行該方法的線程是持有排他鎖的線程時,擷取條件等待隊列中的第一個等待節點,若這個節點不為null,則執行『doSignal(first)』來信号通知這個節點。

注意,因為條件等待節點是按照FIFO的順序操作節點的,也就是新的等待節點總是會添加對隊列尾部,是以隊列頭節點就是等待最長時間的節點。

『AbstractQueuedSynchronizer#doSignal』

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
                

删除并轉移節點,直到命中一個未被取消的節點或者節點為null(節點為null,說明等待隊列中已經沒有一個有效的節點了,即等待隊列要麼為空,要麼等待隊列中的節點都是被取消的節點)。

① 根據給定的first節點,為起始周遊直至擷取第一個有效等待節點,并信号通知該節點。

② 将目前節點的下一個等待節點(nextWaiter)設定為firstWaiter,然後判斷firstWaiter是否為null,如果為null則說明目前節點已經是條件等待隊列中的最後一個節點了,那麼将lastWaiter也置為null。

③ 将目前周遊節點的nextWaiter置為null(以便于目前節點在方法結束後被垃圾收集器回收)

④ 執行『transferForSignal』将節點從條件等待隊列轉移到同步隊列隊列中,如果操作成功,則目前循環結束,方法傳回;如果操作失敗,那麼繼續從頭節點開始循環步驟[2],直到成功轉移一個節點或者條件等待隊列為空為止。

  • signalAll
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
                

将條件等待隊列中的所有線程轉移到鎖的等待隊列中。

① 判斷執行該方法的目前線程是否是持有排他鎖的線程,如果不是則抛出“IllegalMonitorStateException”異常。

② 當執行該方法的線程是持有排他鎖的線程時,擷取條件等待隊列中的第一個等待節點,若這個節點不為null,則執行『doSignalAll(first)』來信号通知所有的節點。

『AbstractQueuedSynchronizer#doSignalAll』

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
                

删除并轉移所有的節點。

① 将lastWaiter和firstWaiter置為null

② 從給定的節點為起始,開始周遊節點,調用『transferForSignal(first);』來将節點從條件等待隊列中轉移到鎖的等待隊列中。

『isHeldExclusively』

protected final boolean isHeldExclusively() {
    // While we must in general read state before owner,
    // we don't need to do so to check if current thread is owner
    return getExclusiveOwnerThread() == Thread.currentThread();
}
/code>                

判斷執行該方法的目前線程是否是持有排他鎖的線程。

如下情況,傳回’true’:

a)執行該方法的線程就是持有排他鎖的線程。

如下情況,傳回’false’:

a)執行該方法的線程不是持有排他鎖的線程。

b)目前排他鎖沒有被任何線程所持有。

『AbstractQueuedSynchronizer#transferForSignal』

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
                 
<span class="token comment">/*
 * Splice onto queue and try to set waitStatus of predecessor to
 * indicate that thread is (probably) waiting. If cancelled or
 * attempt to set waitStatus fails, wake up to resync (in which
 * case the waitStatus can be transiently and harmlessly wrong).
 */</span>
<span class="token class-name">Node</span> p <span class="token operator">=</span> <span class="token function">enq</span><span class="token punctuation">(</span>node<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">int</span> ws <span class="token operator">=</span> p<span class="token punctuation">.</span>waitStatus<span class="token punctuation">;</span>
<span class="token keyword">if</span> <span class="token punctuation">(</span>ws <span class="token operator">&gt;</span> <span class="token number">0</span> <span class="token operator">||</span> <span class="token operator">!</span><span class="token function">compareAndSetWaitStatus</span><span class="token punctuation">(</span>p<span class="token punctuation">,</span> ws<span class="token punctuation">,</span> <span class="token class-name">Node</span><span class="token punctuation">.</span>SIGNAL<span class="token punctuation">)</span><span class="token punctuation">)</span>
    <span class="token class-name">LockSupport</span><span class="token punctuation">.</span><span class="token function">unpark</span><span class="token punctuation">(</span>node<span class="token punctuation">.</span>thread<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">return</span> <span class="token boolean">true</span><span class="token punctuation">;</span>
                

}

從條件等待隊列轉移一個節點到同步隊列中。如果成功傳回true

傳回:

a)true:成功從條件隊列中轉移一個節點到同步隊列中

b)false:在釋放信号通知之前,該節點被取消了。

① 通過CAS的方式将需要轉移的節點的狀态從“Node.CONDITION”修改為“0”。如果CAS操作失敗,則說明這個節點的已經被取消了。那麼方法結束,傳回false。

② 将修改完狀态後的節點加入到鎖等待隊列中(『enq(node)』),并得到加入到等待隊列後,目前節點的前驅節點。

③ 若前驅節點的"waitStatus > 0”(即,waitStatus為“CANCELLED”)或者通過CAS的方式将前驅節點的waitStatus修改為“SIGNAL”失敗,則調用『LockSupport.unpark(node.thread);』将目前線程喚醒(喚醒後的線程會繼續await中被挂起之後的流程)。

④ 否則,"waitStatus <= 0”并且通過CAS成功将前驅節點的waitStatus修改為了“SIGNAL”,以此來辨別目前線程正在等待擷取鎖。

  • 『signal』vs『signalAll』

    signal解除的是條件等待隊列中第一個有效的節點(即,節點的waitStatus為“CONDITION”),這比解除所有線程的阻塞更加有效,但也存在危險。如果signal的線程發現自己仍然不能運作,那麼它再次被阻塞(await)。如果沒有其他線程再次調用signal,那麼系統就死鎖了。

signal/signalAll方法本質上隻是将條件等待隊列中的節點轉移到鎖的同步隊列中。是以,不能任務signal/signalAll方法調用後就會使得線程擷取鎖,線程什麼時候擷取鎖,就是根據鎖的同步隊列FIFO的順序來決定的,隻有同步隊列中的第一個線程才有權利去争奪擷取鎖。