天天看點

AbstractQueuedSynchronizer中條件(Condition)等待(await)、通知喚醒(signal)實作的源碼分析1. 概述2. 分析

1. 概述

在AbstractQueuedSynchronizer中,有兩個FIFO隊列,一個是同步隊列,用來排隊申請同步狀态,還有一個是條件等待隊列,當調用了await()系列的方法後,就會在等待隊列尾部插入一個節點,通知喚醒的時候會把這個節點從等待隊列轉移到同步隊列。

本文主要描述條件等待隊列以及等待、通知機制的實作,關于同步隊列的相關操作和實作分析,可以在這篇部落格了解。

2. 分析

ReentrantLock可以實作條件等待,我們可以先調用newCondition()方法生成一個條件對象ConditionObject,然後調用ConditionObject的await()方法即可實作條件等待。

一路跟蹤源碼到AbstractQueuedSynchronizer内部類ConditionObject的await()方法。

/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //在等待隊列的尾部添加一個節點
            Node node = addConditionWaiter();
            //釋放同步隊列中目前節點持有的同步狀态,在這裡會将同步隊列中對應的節點移除
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                //如果不在同步隊列裡,阻塞目前線程,等待喚醒
                LockSupport.park(this);
                //線程被喚醒,是否把這個節點添加到同步隊列中,檢查目前線程的interrupt中斷标記
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //在同步隊列中進行自旋等待同步狀态
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //清除無效的等待節點
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            //進行中斷标記
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }           

首先進行的是addConditionWaiter()方法。

/**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }           

這個方法中,先取到最後一個等待節點t,如果t不為null或者t的狀态不為CONDITION,觸發一次“删除取消等待的節點”操作,也就是unlinkCancelledWaiters()方法,隻要節點的狀态不為CONDITION,該節點就可以被認為取消等待了(等待結束)。删除結束,就新建立一個CONDITION狀态的節點加入到隊列末尾。注意條件等待隊列和同步隊列中的不同,條件等待隊列中的節點隻知道它的下一個節點,并不知道它的上一個節點。

到這裡addConditionWaiter()方法結束,此時已經增加了一個節點到條件等待隊列,并且将新增加的節點傳回。再回到await()方法,接下來需要同步隊列中目前持有同步狀态的節點(頭部節點,對應了目前線程)進行資源的釋放,也就是fullyRelease()方法,進入fullyRelease(),其實就是調用了release()方法,并傳回了釋放前的加鎖的次數。release()方法解析參見這篇部落格。

再傳回到await()方法,接下來是一個while循環,判斷這個節點是否在同步隊列上,如果不在,就進入循環體,通過LockSupport.park()阻塞線程,當線程被喚醒,執行while中的if判斷。

/**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

    /**
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if thread was cancelled before being signalled.
     *
     * @param node the node
     * @return true if cancelled before the node was signalled
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }           

如果目前線程被中斷,則調用transferAfterCancelledWait決定外層await()方法是應該抛出InterruptedException異常還是重新調用中斷方法。transferAfterCancelledWait()方法中,如果CAS設定成功,說明中斷之前沒有調用signal()方法(因為signal方法會對節點狀态進行CAS設定,從CONDITION到0),将目前節點添加到同步隊列;否則,循環檢查目前節點是否在同步隊列中,如果不在,就把CPU讓出來,直到被放到同步隊列上時,中止循環,傳回false。

然後回到await()方法,調用acquireQueued(),acquireQueued()中進行自旋等待同步資源(這篇部落格),讓被喚醒的節點進行排隊,并進行中斷标記,如果有下一個條件等待者,這裡會觸發一次“取消等待節點清理”操作。再最後根據中斷标記決定是抛出中斷異常,還是僅僅調用中斷方法。

接着在來看signal()方法,signal()方法喚醒等待的線程後,while中LockSupport.park(this);之後的代碼才可以執行。

/**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }           

從signal()方法跟蹤到doSignal()方法,這裡主要進行的操作是,将等待隊列中的第一個等待者移動到同步隊列,當成一個全新的尾節點來進行處理,具體流程也是增加到尾節點、然後進行自旋擷取(這篇部落格),并且這裡将firstWaiter指向原本firstWaiter的下一個等待者,使原本的firstWaiter成為了一個垃圾對象(可被回收)。

signalAll方法也很好了解,直接将所有的等待者喚醒。

繼續閱讀