天天看點

AQS源碼解讀(二)——從acquireQueued探索獨占鎖實作原理,如何阻塞?如何喚醒?

天青色等煙雨,而我在等你,微信公衆号搜尋:徐同學呀,持續更新肝貨,快來關注我,和我一起學習吧~

更多JUC源碼解讀系列文章請持續關注JUC源碼解讀文章目錄JDK8 !

文章目錄

    • 一、前言
    • 二、獨占模式擷取鎖
      • 1、acquireQueued做的不隻是阻塞線程
      • 2、shouldParkAfterFailedAcquire通過前驅判斷是否應該阻塞
      • 3、parkAndCheckInterrupt阻塞線程
    • 三、獨占模式可中斷擷取鎖
      • doAcquireInterruptibly阻塞線程并響應中斷
    • 四、獨占模式可逾時擷取鎖
      • doAcquireNanos阻塞線程自動喚醒響應中斷
    • 五、獨占模式釋放鎖
      • unparkSuccessor喚醒後繼線程
    • 六、取消擷取鎖cancelAcquire
    • 七、總結

一、前言

在AQS中,獨占鎖又稱為互斥鎖,其擷取與釋放鎖的過程由兩個模闆方法實作:

acquire

release

,其中兩個函數

tryAcquire

tryRelease

,AQS并沒有給出具體實作,需要子類根據實際情況自行實作,如

ReentrantLock

中實作了公平與非公平的

tryAcquire

一個線程擷取鎖,無非就是對state變量進行CAS修改,修改成功則擷取鎖,修改失敗則進入隊列,而AQS就是負責線程進入同步隊列以後的邏輯,如何出入隊列?如何阻塞?如何喚醒?一切的核心都在AQS裡。

二、獨占模式擷取鎖

tryAcquire

,AQS中沒有給出具體實作,暫且不談,主要看

acquireQueued

tryAcquire

擷取鎖失敗,傳回false,則執行

acquireQueued

,進入同步隊列流程。

addWaiter

是進入隊列的操作,其主要流程是建立節點然後将新節點CAS排到隊列尾部,而

acquireQueued

的職責是線程進入隊列之後的操作,繼續擷取鎖?還是阻塞?

public final void acquire(int arg) {
    //若沒有搶到鎖,則進入同步隊列
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //自己中斷自己(補償機制)
        selfInterrupt();
}
           

1、acquireQueued做的不隻是阻塞線程

acquireQueued

要做的不隻是單純的阻塞線程,還有被喚醒或者自旋擷取鎖後出隊列。

  1. 擷取node節點的前驅節點,判斷其是否是head,是則繼續搶鎖(可能剛入隊列就排在head後面,也有可能自旋後,有其他節點擷取鎖出隊列,而使得node排在head後面),搶鎖成功則出隊換頭。
  2. node的前驅節點不是head或者搶鎖失敗,進入阻塞判斷

    shouldParkAfterFailedAcquire

  3. 判斷應該放心阻塞,調用

    parkAndCheckInterrupt

    阻塞目前線程。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //拿node的前一個節點
            final Node p = node.predecessor();
            //若p是頭節點,,說明自己排在隊列的第一個嘗試搶鎖
            if (p == head && tryAcquire(arg)) {
                //node成為新的head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                //拿到鎖了傳回false
                return interrupted;
            }
            //1.應該阻塞,調用parkAndCheckInterrupt阻塞線程
            //2.不應該阻塞,再給一次搶鎖的機會
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //基本不可能走到這一步,除非是系統級别的異常導緻擷取鎖失敗for循環意外退出,
            cancelAcquire(node);
    }
}
           

2、shouldParkAfterFailedAcquire通過前驅判斷是否應該阻塞

阻塞判斷

shouldParkAfterFailedAcquire

,什麼情況下應該阻塞線程?什麼情況下應該再給一次搶鎖的機會?3種情況:

  1. 判斷node的前驅節點

    waitStatus=SIGNAL

    (這裡和CLH鎖自旋檢測前驅狀态一樣),node的線程就放心阻塞,因為會在下次某個線程釋放鎖後,被node前驅喚醒。
  2. node的前驅節點

    waitStatus>0

    waitStatus

    隻有

    CANCELLED>0

    ,是以node前驅節點被取消了,剔除取消節點,給node連結一個正常的前驅,然後再自旋一次。
  3. node的前驅節點

    waitStatus=0

    或者

    waitStatus=PROPAGATE

    waitStatus=PROPAGATE

    是共享鎖傳播的情況暫時不考慮),此時将node前驅節點

    waitStatus

    設定為

    SIGNAL

    ,然後再給一次自旋的機會。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         * node拿鎖失敗,前驅節點的狀态是SIGNAL,node節點可以放心的阻塞,
         * 因為下次會被喚醒
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         * pred節點被取消了,跳過pred
         */
        do {
            //pred = pred.prev;
            //node.pred = pred;
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node; 
        //跳過取消節點,給node找一個正常的前驅,然後再循環一次
    } else {
        /* 0 -3
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
           

3、parkAndCheckInterrupt阻塞線程

shouldParkAfterFailedAcquire

判斷應該阻塞線程,則調用

parkAndCheckInterrupt

,其内部調用

LockSupport.park(this)

阻塞目前線程。

LockSupport

UNSAFE

中的

park

unpark

進行了封裝,其能精準阻塞一個線程,也能精準喚醒一個線程(不同于

wait

notify

)。阻塞喚醒會導緻線程進行上下文切換。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
           

三、獨占模式可中斷擷取鎖

可中斷擷取鎖

acquireInterruptibly

和不可中斷擷取鎖

acquire

邏輯類似,差別在于

acquireInterruptibly

可響應

InterruptedException

中斷異常(外部調用

Thread#interrupt

,就可能會導緻中斷)。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        //有被中斷  抛異常
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
           

doAcquireInterruptibly阻塞線程并響應中斷

doAcquireInterruptibly

進入隊列以後的邏輯也與

acquireQueued

差不多,差別也是在于

doAcquireInterruptibly

可響應

InterruptedException

中斷異常。

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    //建立node,入隊列
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            //判斷前驅是否是head
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            //1.應該阻塞,調用parkAndCheckInterrupt阻塞線程
            //2.不應該阻塞,再給一次搶鎖的機會
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //遇到中斷抛了異常InterruptedException
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            //沒有擷取鎖,被中斷,取消節點
            cancelAcquire(node);
    }
}
           

四、獨占模式可逾時擷取鎖

可逾時擷取鎖,不僅可以響應中斷,還可以将線程阻塞一段時間,自動喚醒。

tryAcquireNanos

可傳入一個納秒機關的時間

nanosTimeout

,可逾時的邏輯在

doAcquireNanos

中。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        //響應中斷
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
           

doAcquireNanos阻塞線程自動喚醒響應中斷

doAcquireNanos

acquireQueued

邏輯類似,但是也可以響應中斷,同時還可以讓線程阻塞一段時間自動喚醒,如果逾時了還沒擷取鎖則傳回false。

doAcquireNanos

還有一個非常不同之處,就是即使

shouldParkAfterFailedAcquire

判斷應該阻塞了,也有可能不阻塞,還會再自旋一段時間,這個自旋的時長有一個門檻值

spinForTimeoutThreshold = 1000L

,1000納秒,自旋了1000納秒後還沒有擷取鎖,且此時也判斷應該阻塞了,就讓線程休眠一段時間。

線程喚醒,有可能是自動喚醒,有可能是被其他釋放鎖的線程喚醒,喚醒後又被中斷過則抛出異常

InterruptedException

,如果沒有中斷,則繼續循環剛才的流程(判斷前驅是否是head,判斷是否逾時,判斷是否應該阻塞)。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        //已經逾時直接傳回false,擷取鎖失敗
        return false;
    //計算deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    //入隊列
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            //判斷前驅是否是head
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            //逾時傳回false,擷取鎖失敗
            if (nanosTimeout <= 0L)
                return false;
            //1.應該阻塞,調用parkAndCheckInterrupt阻塞線程
            //2.不應該阻塞,再給一次搶鎖的機會
            //3.自旋1000納秒,還沒有擷取鎖就休眠一段時間。1毫秒=1*1000*1000納秒
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                //阻塞一段時間
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                //響應中斷
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

五、獨占模式釋放鎖

釋放鎖的流程很簡單,

tryRelease

需要子類實作,暫時不考慮,當

tryRelease

釋放鎖成功後喚醒後繼節點。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //head不為空,head不是初始化,釋放鎖成功後喚醒後繼節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           

unparkSuccessor喚醒後繼線程

喚醒後繼的條件是

h != null && h.waitStatus != 0

,head不為null且head的狀态不是初始狀态,則喚醒後繼。在獨占模式下

h.waitStatus

可能等于0,-1:

  • h.waitStatus=0

    ,線程釋放鎖,同步隊列中的節點可能剛入隊列,還沒有阻塞,是以無需喚醒隊列。
  • h.waitStatus=-1

    ,head後繼應該正常喚醒。

unparkSuccessor

喚醒後繼需要做如下兩步:

  • h.waitStatus < 0

    node.waitStatus

    重置為0。
  • 喚醒後繼節點,若後繼節點為空或者被取消,則從tail向前找一個距離head最近的正常的節點喚醒。
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    //喚醒後繼節點的線程,若為空,從tail往後周遊找一個距離head最近的正常的節點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                //這裡找到的正常節點,并沒有傳回,而是繼續往前找
                s = t;
    }
    if (s != null)
        //喚醒線程
        LockSupport.unpark(s.thread);
}
           

node後繼是null或者取消狀态,為什麼要從tail向前找,而不是直接從head向後找到第一個正常的節點就可以傳回了?目的在于為了照顧剛入隊列的節點。看入隊列操作

addWaiter

節點入隊不是一個原子操作, 雖然用了

compareAndSetTail

操作保證了目前節點被設定成尾節點,但是隻能保證,此時step1和step2是執行完成的,有可能在step3還沒有來的及執行到的時候,有其他線程調用了

unparkSuccessor

方法,此時

pred.next

的值還沒有被設定成node,是以從head往後周遊可能周遊不到尾節點,但是因為尾節點此時已經設定完成,

node.prev = pred

也被執行過了,是以如果從tail往前周遊,新加的尾節點就可以周遊到了,并且可以通過它一直往前找。

總結來說,之是以從tail往前周遊,是因為在多線程并發條件下,如果一個節點的next屬性為null, 并不能保證它就是尾節點(可能是因為新加的尾節點還沒來得及執行

pred.next = node

), 但是一個節點如果能入隊, 則它的prev屬性一定是有值的,是以反向查找一定是最精确的。(這裡需要感謝這篇文章《逐行分析AQS源碼(2)——獨占鎖的釋放》,之前喚醒節點從tail往前找我了解錯了,看了這位部落客分析,才更了解了源碼的邏輯。)

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        //設定node節點的上一個節點是tail
        node.prev = pred; //step1
        //cas設定tail指針指向node
        if (compareAndSetTail(pred, node)) {//step2
            pred.next = node; //step3
            //mode進入尾部成功,傳回
            return node;
        }
    }
    enq(node);
    return node;
}
           

六、取消擷取鎖cancelAcquire

什麼時候可以取消擷取鎖呢?AQS并沒有對外開放取消節點的操作權限,而是在可中斷擷取鎖的時候,中斷導緻擷取失敗,則會被取消擷取鎖。

取消擷取鎖的條件是雙向隊列,一個節點可知前驅和後繼,如單向的

CLH

隊列鎖就不能有取消的動作。而且取消這個動作還是相對複雜的,但是目的很簡單,就是剔除隊列中的取消節點。

  • 首先會将node的thread設定為空。
  • 其次檢查node前驅是否是取消狀态,是則循環跳過,一直為node找一個正常的前驅。
  • 接着

    node.waitStatus

    設定為

    CANCELLED

  • 判斷node是否在尾部,是則tail指針前移到node前驅上,node前驅成為新的tail,其next指針(

    predNext

    )設定為null。
  • 若node不是在尾部,判斷其前驅是否是head以及是否是正常節點。node前驅不是head且正常節點,則将node後繼連結到node前驅next指針(

    predNext

    )上(

    Node next = node.next;compareAndSetNext(pred, predNext, next);

    ),進而使node被剔除。
  • 若node不是在尾部,且node前驅是head,則喚醒node的後繼。node前驅不是head,但是不正常節點(剛好被取消的),則也喚醒node的後繼,這時的喚醒不是為了讓node後繼擷取鎖,而是為node的後繼連結一個正常的前驅(node後繼自旋判斷阻塞時

    shouldParkAfterFailedAcquire

    ,會連結一個正常的前驅)。

若node的前驅是取消狀态,在跳過取消節點,找到一個正确前驅連結給node,此時新找到的前驅

pred.next

不會指向node,是以

pred.next

就不是node。類似如下的三角關系。

AQS源碼解讀(二)——從acquireQueued探索獨占鎖實作原理,如何阻塞?如何喚醒?
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        //pred = pred.prev;
        //node.prev = pred;
        node.prev = pred = pred.prev;
    //如果node的前驅也是取消節點,則pred.next就不是node
    Node predNext = pred.next;
    
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    //如果node在尾部,tail前移
    if (node == tail && compareAndSetTail(node, pred)) {
        //node設定為null
        compareAndSetNext(pred, predNext, null);
    } else {
        //node不在尾部
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        //前繼節點是個正常阻塞節點
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                //node後繼成為pred的後繼
                compareAndSetNext(pred, predNext, next);
        } else {
            // 如果node的前驅是個head,則喚醒node後繼,
            //node前繼節點不是一個正常的節點,喚醒後繼節點
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
           

七、總結

  • AQS實作的鎖終歸還是個自旋鎖(

    for (;;)

    ),雖然不是無限自旋,也是給了一定的自旋次數,然後再阻塞。
  • AQS之是以是CLH的變種就在于判斷阻塞時

    shouldParkAfterFailedAcquire

    ,自旋檢查前驅節點的狀态。
  • 取消擷取鎖必須是雙向隊列,且隻在可中斷擷取鎖時,中斷導緻擷取鎖失敗後取消節點。取消過程中有可能喚醒取消節點的後繼,但不一定是讓其擷取鎖,而是讓其連結一個正常的前驅。

PS: 如若文章中有錯誤了解,歡迎批評指正,同時非常期待你的評論、點贊和收藏。我是徐同學,願與你共同進步!

AQS源碼解讀(二)——從acquireQueued探索獨占鎖實作原理,如何阻塞?如何喚醒?

繼續閱讀