天天看點

隊列同步器(AQS)源碼分析

掃描下方二維碼或者微信搜尋公衆号

菜鳥飛呀飛

,即可關注微信公衆号,閱讀更多

Spring源碼分析

Java并發程式設計

文章。
隊列同步器(AQS)源碼分析

文章目錄

    • 擷取鎖:acquire()
      • 第一步:tryAcquire()
      • 第二步:addWaiter()
      • 第三步:acquireQueued()
      • 第四步:selfInterrupt()
    • 釋放鎖:release()
    • 總結
在上一篇文章中分析了隊列同步器(AQS)的設計原理,這一篇文章将結合具體的源代碼來分析AQS。如果了解了上一篇文章中介紹的同步隊列的資料結構,那麼源碼看起來相對比較好了解。讀過Spring源碼的朋友應該能感受到Spring中方法和變量的命名是非正常範的,見名知意。與Spring相比,JDK中某些類的源碼,命名則不是那麼規範,裡面存在各種簡寫,單個字母命名變量,有時候看起來十分燒腦。其中AQS中就存在這種情況,是以如果不了解AQS的設計原理,直接看源碼,就會顯得比較困難。
  • AQS中擷取鎖和釋放鎖的方法很多,今天以acquire()和release()為例來進行源碼分析,其他方法的邏輯與這兩個方法邏輯基本一樣。
public static void main(String[] args) {
    // 公平鎖
    ReentrantLock lock = new ReentrantLock(true);
    try{
        // 加鎖
        lock.lock();
        System.out.println("Hello World");
    }finally {
        // 釋放鎖
        lock.unlock();
    }
}
           
  • 以上面demo為例,先建立了一個公平鎖,然後調用lock()方法來加鎖,最後調用unlock()方法釋放鎖。在lock()方法中會調用到Sync的lock()方法,由于我們建立的是公平鎖,是以這個時候調用的是FailSync的lock()方法。
static final class FairSync extends Sync {

    final void lock() {
        // 調用acquire()擷取同步狀态
        acquire(1);
    }
}
           

擷取鎖:acquire()

  • 在FailSync的lock()方法中就會調用到AQS的模闆方法:

    acquire()

    。最終的加鎖邏輯就是在acquire()方法中實作的。

    acquire()

    的源碼如下:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 調用selfInterrupt()方法重新設定中斷辨別
        selfInterrupt();
}
           
  • 這個方法就3行代碼,但是邏輯相當複雜。可以拆分為四段邏輯:第一步:調用tryAcquire()方法;第二步:執行addWaiter()方法;第三步:執行acquireQueued()方法;第四步:執行selfInterrupt()方法。下面将圍繞這4步展開分析。我在

    acquire()

    這個方法上簡單寫了些注釋,也可以直接參考。
public final void acquire(int arg) {
    /*
     * acquire()的作用是擷取同步狀态,這裡同步狀态的含義等價于鎖
     * tryAcquire()方法是嘗試擷取同步狀态,如果該方法傳回true,表示擷取同步狀态成功;傳回false表示擷取同步狀态失敗
     * 第1種情況:當tryAcquire()傳回true時,acquire()方法會直接結束。
     *           因為此時!tryAcquire(arg) 就等于false,而&&判斷具有短路作用,當因為&&判斷前面的條件判斷為false,&&後面的判斷就不會進行了,是以此時就不會執行後面的if語句,此時方法就直接傳回了。
     *           這種情況表示線程擷取鎖成功了
     * 第2中情況:當tryAcquire()傳回false時,表示線程沒有擷取到鎖,
     *           這個時候就需要将線程加入到同步隊列中了。此時 !tryAcquire() == true,是以會進行&&後面的判斷,即acquireQueued()方法的判斷,在進行acquireQueued()方法判斷之前會先執行addWaiter()方法。
     *
     *           addWaiter()方法傳回的是目前線程代表的節點,這個方法的作用是将目前線程放入到同步隊列中。
     *           然後再調用acquireQueued()方法,在這個方法中,會先判斷目前線程代表的節點是不是第二個節點,如果是就會嘗試擷取鎖,如果擷取不到鎖,線程就會被阻塞;如果擷取到鎖,就會傳回。
     *           如果目前線程代表的節點不是第二個節點,那麼就會直接阻塞,隻有當擷取到鎖後,acquireQueued()方法才會傳回
     *
     *           acquireQueued()方法如果傳回的是true,表示線程是被中斷後醒來的,此時if的條件判斷成功,就會執行selfInterrupt()方法,該方法的作用就是将目前線程的中斷辨別位設定為中斷狀态。
     *           如果acquireQueued()方法傳回的是false,表示線程不是被中斷後醒來的,是正常喚醒,此時if的條件判斷不會成功。acquire()方法執行結束
     *
     * 總結:隻有當線程擷取到鎖時,acquire()方法才會結束;如果線程沒有擷取到鎖,那麼它就會一直阻塞在acquireQueued()方法中,那麼acquire()方法就一直不結束。
     *
     */

    // tryAcquire()方法是AQS中定義的一個方法,它需要同步元件的具體實作類來重寫該方法。是以在公平鎖的同步元件FairSync和非公平鎖的同步組NonfairSync中,tryAcquire()方法的實作代碼邏輯是不一樣的。
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 調用selfInterrupt()方法重新設定中斷辨別
        selfInterrupt();
}
           

第一步:tryAcquire()

  • tryAcquire()是由AQS的子類重寫的方法,是以代碼邏輯和具體的鎖的實作有關系,由于本次Demo中使用的是公平鎖,是以它最終會調用FairSync類的tryAcquire()方法。tryAcquire()方法的作用就是擷取同步狀态(也就是擷取鎖),如果目前線程成功擷取到鎖,那麼就會将AQS中的同步狀态state加1,然後傳回true,如果沒有擷取到鎖,将會傳回false。FairSync類上tryAcquire()方法的源碼如下:
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 擷取同步狀态state的值(在AQS中,state就相當于鎖,如果線程能成功修改state的值,那麼就表示該線程擷取到了鎖)
        int c = getState();
        if (c == 0) {
            // 如果c等于0,表示還沒有任何線程擷取到鎖

            /**
             * 此時可能存在多個線程同時執行到這兒,均滿足c==0這個條件。
             * 在if條件中,會先調用hasQueuedPredecessors()方法來判斷隊列中是否已經有線程在排隊,該方法傳回true表示有線程在排隊,傳回false表示沒有線程在排隊
             * 第1種情況:hasQueuedPredecessors()傳回true,表示有線程排隊,
             *           此時 !hasQueuedPredecessors() == false,由于&& 運算符的短路作用,if的條件判斷為false,那麼就不會進入到if語句中,tryAcquire()方法就會傳回false
             *
             * 第2種情況:hasQueuedPredecessors()傳回false,表示沒有線程排隊
             *           此時 !hasQueuedPredecessors() == true, 那麼就會進行&&後面的判斷,就會調用compareAndSetState()方法去進行修改state字段的值
             *           compareAndSetState()方法是一個CAS方法,它會對state字段進行修改,它的傳回值結果又需要分兩種情況
             *           第 i 種情況:對state字段進行CAS修改成功,就會傳回true,此時if的條件判斷就為true了,就會進入到if語句中,同時也表示目前線程擷取到了鎖。那麼最終tryAcquire()方法會傳回true
             *           第 ii 種情況:如果對state字段進行CAS修改失敗,說明在這一瞬間,已經有其他線程擷取到了鎖,那麼if的條件判斷就為false了,就不會進入到if語句塊中,最終tryAcquire()方法會傳回false。
             */
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // 目前線程成功修改了state字段的值,那麼就表示目前線程擷取到了鎖,那麼就将AQS中鎖的擁有者設定為目前線程,然後傳回true。
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果c等于0,則表示已經有線程擷取到了鎖,那麼這個時候,就需要判斷擷取到鎖的線程是不是目前線程
        else if (current == getExclusiveOwnerThread()) {
            // 如果是目前線程,那麼就将state的值加1,這就是鎖的重入
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            // 因為此時肯定隻有一個線程擷取到了鎖,隻有擷取到鎖的線程才會執行到這行代碼,是以可以直接調用setState(nextc)方法來修改state的值,這兒不會存線上程安全的問題。
            setState(nextc);
            // 然後傳回true,表示目前線程擷取到了鎖
            return true;
        }
        // 如果state不等于0,且目前線程也等于已經擷取到鎖的線程,那麼就傳回false,表示目前線程沒有擷取到鎖
        return false;
    }
}
           
  • 在tryAcquire()方法中,先判斷了同步狀态state是不是等于0。
  • 1.

    如果等于0,就表示目前還沒有線程持有到鎖,那麼這個時候就會先調用

    hasQueuedPredecessors()

    方法判斷同步隊列中有沒有等待擷取鎖的線程,如果有線程在排隊,那麼目前線程肯定擷取鎖失敗(因為AQS的設計的原則是FIFO,既然前面有人已經在排隊了,你就不能插隊,老老實實去後面排隊去),那麼tryAcquire()方法會傳回false。如果同步隊列中沒有線程排隊,那麼就讓目前線程對state進行CAS操作,如果設定成功,就表示目前擷取到鎖了,傳回true;如果CAS失敗,表示在這一瞬間,鎖被其他線程搶走了,那麼目前線程就擷取鎖失敗,就傳回false。
  • 2.

    如果不等于0,就表示已經有線程擷取到鎖了,那麼此時就會去判斷目前線程是不是等于已經持有鎖的線程(

    getExclusiveOwnerThread()方法的作用就是傳回已經持有鎖的線程

    ),如果相等,就讓state加1,這就是所謂的重入鎖,重入鎖就是允許同一個線程多次擷取到鎖。
  • 3.

    state既不等于0,目前線程也不等于持有鎖的線程,那麼就傳回false,表示目前線程擷取到鎖失敗。
  • 就這樣,通過tryAcquire()方法,就實作了線程能否擷取到鎖的功能和邏輯。在不同的鎖的實作中,tryAcquire()方法的邏輯是不一樣的,上面的示例是以公平鎖的實作舉例的。

第二步:addWaiter()

  addWaiter()方法不一定會執行到,它依賴第一步tryAcquire()的執行結果。如果線程擷取鎖成功,那麼tryAcquire()方法就會傳回true,此時就不會進行後面的第二步、第三步、第四步了。隻有當線程擷取鎖失敗了,tryAcquire()方法會傳回false,這個時候需要将線程加入到同步隊列中,是以需要執行後面的步驟。
  • addWaiter()方法的邏輯相對比較簡單,它的作用就是将目前線程封裝成一個Node,然後加入到同步隊列中。如果同步隊列此時還沒有初始化(也就是AQS的head、tail屬性均為null),那麼它還會進行同步隊列的初始化。源碼如下:
private Node addWaiter(Node mode) {
    // 先根據目前線程,建構一個Node節點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 然後需要把目前線程代表的節點添加到隊列的尾部,此時隊尾可能是null,因為可能這是第一次出現多個線程争搶鎖,隊列還沒有初始化,即頭尾均為null
    Node pred = tail;
    if (pred != null) {
        // 如果隊尾不為null,就直接将目前線程代表的node添加到隊尾,修改node節點中prev的指針指向
        node.prev = pred;
        // 利用CAS操作設定隊尾,如果設定成功,就修改倒數第二個節點的next的指針指向,然後方法return結束
        // 在多個線程并發的情況下,CAS操作可能會失敗,如果失敗,就會進入到後面的邏輯
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果隊尾為null或者CAS設定隊尾失敗,就調用enq()方法将目前線程代表的節點加入到同步隊列當中
    enq(node);
    // 将目前線程代表的節點傳回
    return node;
}
           
  • 在addWaiter()方法中會先判斷尾結點是不是null,如果是null,就表示同步隊列還沒有被初始化,那麼就不會進入到if的邏輯中了,将會調用

    enq(node)

    方法進行同步隊列的初始化和目前線程的入隊操作。如果尾結點不為null,就表示同步隊列已經被初始化了,隻需要将目前線程所代表的節點加入到同步隊列隊尾即可。由于加入到隊尾這個操作存在并發的可能性,是以需要利用CAS操作(即

    compareAndSetTail()

    方法)來保證原子性,如果CAS成功,就将舊的隊尾節點的next指針指向新的隊尾節點,這樣就完成了節點加入到隊尾的操作了,同時addWaiter()方法return結束。如果CAS操作失敗,那麼就會調用

    enq(node)

    方法來實作入隊操作。
  • enq(node)

    方法的源碼如下:
private Node enq(final Node node) {
    // 無限循環
    for (;;) {
        Node t = tail;
        // 如果尾結點為null,說明這是第一次出現多個線程同時争搶鎖,是以同步隊列沒有被初始化,tail和head此時均為null
        if (t == null) { // Must initialize
            // 設定head節點,注意此時是new了一個Node節點,節點的next,prev,thread屬性均為null
            // 這是因為對于頭結點而言,作為隊列的頭部,它的next指向本來就應該是null
            // 而頭結點的thread屬性為空,這是AQS同步器特意為之的
            // 頭結點的prev屬性此時為空,當進行第二次for循環的時候,tail結點就不為空了,是以不會進入到if語句中,而是進入到else語句中,在這裡對頭結點的prev進行了指派
            if (compareAndSetHead(new Node()))
                // 第一次初始化隊列時,頭結點和尾結點是同一個結點
                tail = head;
        } else {
            // 隊尾結點不為null,說明隊列已經進行了初始化,這個時候隻需要将目前線程表示的node結點加入到隊列的尾部

            // 設定目前線程所代表的節點的prev的指針,使其指向尾結點
            node.prev = t;
            // 利用CAS操作設定隊尾結點
            if (compareAndSetTail(t, node)) {
                // 然後修改隊列中倒數第二個節點的next指針,使其指向隊尾結點
                t.next = node;
                // 然後傳回的是隊列中倒數第二個節點(也就是舊隊列中的尾結點)
                return t;
            }
        }
    }
}
           
  • enq(node)

    方法中進行了一個for(;;)的無限循環,在循環中會先判斷隊尾是不是null,是null就表示同步隊列沒有被初始化,就采用CAS操作(即

    compareAndSetHead()

    方法)來初始化隊列,并讓tail = head。注意:在建立頭結點時,頭結點的thread、prev、next屬性均為null,在進行第二次循環時,next屬性會被指派,但是thread、prev屬性始終是null。
  • 當同步隊列初始化完成後,後面進入for循環的時候,就不會進入到if語句塊中了,而是進入到else中,此時也是利用CAS操作(即

    compareAndSetTail()

    方法)将目前線程所代表的Node節點設定為隊尾,然後修改之前隊尾的next指針,維護好隊列節點中的雙向關聯關系,就将舊的隊尾節點傳回。
  • 在執行完enq()方法後,就會回到addWaiter()方法中,然後addWaiter()方法就會将目前線程所代表的Node節點傳回。

第三步:acquireQueued()

  • 在執行完addWaiter()方法後,就會執行acquireQueued()方法。該方法的作用就是讓線程以不間斷的方式擷取鎖,如果擷取不到,就會一直阻塞在這個方法裡面,直到擷取到鎖,才會從該方法裡面傳回。
  • acquireQueued()方法的源碼如下:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 無限for循環
        for (;;) {
            // 擷取目前線程所代表節點的前一個節點
            final Node p = node.predecessor();
            // 如果前一個節點是頭結點(即目前線程是隊列中的第二個節點),那麼就調用tryAcquire()方法讓目前線程去嘗試擷取鎖
            if (p == head && tryAcquire(arg)) {
                // 如果目前線程擷取鎖成功,那麼就将目前線程所代表的節點設定為頭結點(因為AQS的設計原則就是:隊列中占有鎖的線程一定是頭結點)
                setHead(node);
                // 然後将原來的頭節點的next指針設定為null,這樣頭節點就沒有任何引用了,有助于GC回收。
                p.next = null; // help GC
                failed = false;
                // 傳回interrupted所代表的值,表示目前線程是不是通過中斷而醒來的
                return interrupted;
            }
            // 如果前一個節點不是頭結點 或者 前一個節點是頭結點,但是目前節點調用tryAcquire()方法擷取鎖失敗,那麼就會執行到下面的if判斷
            /**
             * 在if判斷中,先調用了shouldParkAfterFailedAcquire()方法。
             * 在第一次調用shouldParkAfterFailedAcquire()時,肯定傳回false(為什麼會傳回false,可以看下shouldParkAfterFailedAcquire()方法的源碼)
             * 由于目前代碼是處于無限for循環當中的,是以當後面出現第二次代碼執行到這兒時,會再次調用shouldParkAfterFailedAcquire()方法,此時這個方法會傳回true。
             * 當shouldParkAfterFailedAcquire()傳回true時,if判斷中就會再調用parkAndCheckInterrupt()方法,該方法會将目前線程進行阻塞,直到這個線程被喚醒或者被中斷。
             * 是以當線程擷取不到鎖時,就會一直阻塞到這兒。直到被其他線程喚醒,才會繼續向下執行,當線程想來後,再次進入到目前代碼的無限for循環中,除非線程擷取到鎖,才會return傳回
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果在try中出現異常,那麼此時failed就會為true,就會取消擷取鎖
        // failed的初始值為true,如果try語句中不出現異常,最終線程擷取到鎖後,就會将failed置為false,那麼下面的if判斷條件就不會成立,也就不執行cancelAcquire()方法了
        // 如果上面的try語句存在異常,那麼就不會将failed設定為false,那麼就會執行cancelAcquire()方法
        if (failed)
            cancelAcquire(node);
    }
}
           
  • 在acquireQueued()方法中也是使用了無限for循環:for(;?。在該方法中會先擷取到目前線程的前一個節點。
    1. 如果前一個節點是頭結點,那麼就會讓目前線程調用tryAcquired()方法嘗試擷取鎖。如果擷取鎖成功,就将目前線程設定為頭結點,這裡設定頭結點時隻需要調用setHead(head)方法即可,因為肯定隻有一個線程擷取到鎖,是以不用考慮并發的問題。然後舊的頭結點的next引用置為null,便于GC。接着acquireQueued()方法直接傳回。如果目前線程調用tryAcquire()方法擷取鎖失敗,就會進入到下面2中的邏輯。
    1. 如果前一個節點不是頭結點或者是頭結點但是擷取鎖失敗,就會進入到後面的邏輯中,即執行到

      if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

  • 在if判斷中,會先執行

    shouldParkAfterFailedAcquire()

    方法,該方法用來判斷目前線程是否應該park,即是否應該将目前線程阻塞。如果傳回true,就表示需要阻塞,那麼就會接着執行parkAndCheckInterrupt()方法,在

    parkAndCheckInterrupt()

    方法中,就會将目前線程阻塞起來,直到目前線程被喚醒并擷取到鎖以後,acquireQueued()方法才會傳回。parkAndCheckInterrupt()方法的源碼如下:
private final boolean parkAndCheckInterrupt() {
    // 将目前線程挂起
    LockSupport.park(this);
    return Thread.interrupted();
}
           
  • shouldParkAfterFailedAcquire()方法是一個比較有意思的方法,在第一次調用它的時候,它會傳回false,由于acquireQueued()方法裡有一個無限for循環,是以會進行第二次調用shouldParkAfterFailedAcquire()方法,這個時候它會傳回true。shouldParkAfterFailedAcquire()方法的源碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 在初始情況下,所有的Node節點的waitStatus均為0,因為在初始化時,waitStatus字段是int類型,我們沒有顯示給它指派,是以它預設是0
    // 後面waitStatus字段會被修改

    // 在第一進入到shouldParkAfterFailedAcquire()時,是以waitStatus是預設值0(因為此時沒有任何地方對它進行修改),是以ws=0
    int ws = pred.waitStatus;

    if (ws == Node.SIGNAL)
        // 如果ws = -1,就傳回true
        // 如果第一次進入shouldParkAfterFailedAcquire()方法時,waitStatus=0,那麼就會進入到後面的else語句中
        // 在else語句中,會将waitStatus設定為-1,那麼當後面第二次進入到shouldParkAfterFailedAcquire()方法時,就會傳回true了
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 如果ws既不等于-1,也不大于0,就會進入到目前else語句中
        // 此時會調用CAS方法将pred節點的waitStatus字段的值改為-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
           
  • 經過shouldParkAfterFailedAcquire()方法後,隊列中所有節點的狀态的示意圖如下。
    隊列同步器(AQS)源碼分析
  • 最終對于acquireQueued()方法而言,隻有線程擷取到了鎖或者被中斷,線程才會從這個方法裡面傳回,否則它會一直阻塞在裡面。

第四步:selfInterrupt()

  • 當線程從acquireQueued()方法處傳回時,傳回值有兩種情況,如果傳回false,表示線程不是被中斷才喚醒的,是以此時在acquire()方法中,if判斷不成立,就不會執行selfInterrupt()方法,而是直接傳回。如果傳回true,則表示線程是被中斷才喚醒的,由于在parkAndCheckInterrupt()方法中調用了Thread.interrupted()方法,這會将線程的中斷辨別重置,是以此時需要傳回true,使acquire()方法中的if判斷成立,然後這樣就會調用selfInterrupt()方法,将線程的中斷辨別重新設定一下。最後acquire()方法傳回。

釋放鎖:release()

  • 釋放鎖的代碼實作相對比較簡單,當持有鎖的線程釋放鎖後,會喚醒同步隊列中下一個

    處于等待狀态

    的節點去嘗試擷取鎖。還是以上面的Demo為例,當調用lock.unlock()時,最終會調用到AQS中的release()方法,release()方法的源碼如下。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 當釋放鎖成功以後,需要喚醒同步隊列中的其他線程
        Node h = head;
        // 當waitStatus!=0時,表示同步隊列中還有其他線程在等待擷取鎖
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           
  • 在release()方法中會先調用AQS子類的tryRelease()方法,在本文的示例中,就是調用ReentrantLock類中Sync的tryRelease()方法,該方法就是讓目前線程釋放鎖。方法源碼如下。
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 判斷目前線程是不是持有鎖的線程,如果不是就抛出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 因為上面一步已經确認了是目前線程持有鎖,是以在修改state時,肯定是線程安全的
    boolean free = false;
    // 因為可能鎖被重入了,重入了幾次就需要釋放幾次鎖,是以這個地方需要判斷,隻有當state=0時才表示完全釋放了鎖。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
           
  • 在tryRelease()方法中,會先檢驗目前線程是不是持有鎖的線程,如果不是持有鎖的線程,就會抛出異常(這一點很容易了解,沒有擷取到鎖,卻去調用釋放鎖的方法,這個時候就告訴你出錯了)。然後再判斷state減去1後是否等于0,如果等于0,就表示鎖被釋放了,最終tryRelease()方法就會傳回true。如果state減去1後不為0,表示鎖還沒有被釋放,最終tryRelease()方法就會傳回false。因為對于重入鎖而言,每擷取一次鎖,state就會加1,擷取了幾次鎖,就應該釋放幾次,這樣當完全釋放完時,state才會等于0。
  • 當tryRelease()傳回true時,在release()方法中,就會進入到if語句塊中。在if語句塊中,會先判斷同步隊列中是否有線程在等待擷取鎖,如果有,就調用

    unparkSuccessor()

    方法來喚醒下一個處于等待狀态的線程;如果沒有,tryRelease()就會直接傳回true。
  • 如何判斷同步隊列中是否有線程在等待擷取鎖的呢?是通過

    h != null && h.waitStatus != 0

    這一個判斷條件來判斷的。
    1. 當頭節點等于null時,這個時候肯定沒有線程在等待擷取鎖,因為同步隊列的初始化是當有線程擷取不到鎖以後,将自己加入到同步隊列的時候才初始化同步隊列的(即給head、tail指派),如果頭結點為null,說明同步隊列沒有被初始化,即沒有線程在等待擷取鎖。
    1. 在acquire()方法中我們提到了shouldParkAfterFailedAcquire()方法,該方法會讓等待擷取鎖的節點的waitStatus的值等于-1,是以當

      h != null && h.waitStatus != 0

      時,可以認為同步隊列中有線程在等待擷取鎖。
  • 如果同步隊列中有線程在等待擷取鎖,那麼此時在release()方法中調用

    unparkSuccessor()

    方法去喚醒下一個等待狀态的節點。unparkSuccessor()方法的源碼如下
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        /**
         * 同步隊列中的節點鎖代表的線程,可能被取消了,此時這個節點的waitStatus=1
         * 是以這個時候利用for循環從同步隊列尾部開始向前周遊,判斷節點是不是被取消了
         * 正常情況下,當頭結點釋放鎖以後,應該喚醒同步隊列中的第二個節點,但是如果第二個節點的線程被取消了,此時就不能喚醒它了,
         * 就應該判斷第三個節點,如果第三個節點也被取消了,再依次往後判斷,直到第一次出現沒有被取消的節點。如果都被取消了,此時s==null,是以不會喚醒任何線程
         */
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
           
  • 在unparkSuccessor()方法中,會找到頭結點後

    第一個waitStatus小于0

    的節點,然後将其喚醒。為什麼是

    waitStatus小于等于0

    ?因為waitStatus=1表示線程是取消狀态,沒資格擷取鎖。最後調用LockSupport.unpark()方法喚醒符合條件的線程,此時線程被喚醒後,就回到上面

    擷取鎖流程中

    的parkAndCheckInterrupt()方法處,接着就執行後面的邏輯了。

總結

  • 本文通過分析擷取鎖acquire()和釋放鎖release()方法的源碼,講解了線程擷取鎖和入隊、出隊的流程。對于AQS中其他擷取鎖和釋放鎖的方法,如可響應中斷的擷取鎖,支援逾時的擷取鎖以及共享式的擷取鎖,其方法的源碼和邏輯和acquire()方法的邏輯很相似,有興趣的朋友可以自己去閱讀下源碼。
  • 在本文中提到了公平鎖、非公平鎖以及重入鎖的概念,關于這些會在下一篇文章ReentrantLock的源碼分析中詳細介紹。
隊列同步器(AQS)源碼分析