天天看點

解密Java并發程式設計核心利器:AQS(AbstractQueuedSynchronizer)

作者:隻愛喝胡辣湯

AQS簡介

AQS,AbstractQueuedSynchronizer,抽象隊列同步器,是一個用來建構鎖和同步器的架構,java中常見的鎖如ReentrantLock、Semaphore底層都是基于AQS來實作的。

AQS核心思想

如果被請求的共享資源空閑,則将目前請求資源設定為有效的線程,并且将共享資源設定為鎖定狀态,如果被請求的共享資源被占用,則會将目前請求線程進行阻塞操作,并在共享資源釋放時會嘗試去喚醒。

AQS是用CLH隊列鎖實作的,将在暫時擷取不到鎖的線程加入到隊列中。

CLH(Craig,Landin,and Hagersten)隊列是一個虛拟的雙向隊列(虛拟的雙向隊列即不存在隊列執行個體,僅存在結點之間的關聯關系)。AQS是将每條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node)來實作鎖的配置設定。

共享變量

AQS中使用一個volatile修飾的state作為共享變量

private volatile int state;//共享變量,使用volatile修飾保證線程可見性           

共享方式

AQS定義了兩種資源共享方式

  • 獨占鎖(Exclusive):隻有一個線程可以執行。而獨占鎖又分為公平鎖和非公平鎖
    • 公平鎖:按照線程申請的順序,先到者先拿到鎖
    • 非公平鎖:當多個線程擷取鎖時,無視隊列中的排隊順序,誰先搶到就是誰的
  • 共享鎖(Share):多個線程可同時執行,如Semaphore/CountDownLatch

AQS資料結構

AbstractQueuedSynchronizer類底層的資料結構是使用CLH(Craig,Landin,and Hagersten)隊列是一個虛拟的雙向隊列(虛拟的雙向隊列即不存在隊列執行個體,僅存在結點之間的關聯關系)。AQS是将每條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node)來實作鎖的配置設定。其中Sync queue,即同步隊列,是雙向連結清單,包括head結點和tail結點,head結點主要用作後續的排程。而Condition queue不是必須的,其是一個單向連結清單,隻有當使用Condition時,才會存在此單向連結清單。并且可能會有多個Condition queue。

解密Java并發程式設計核心利器:AQS(AbstractQueuedSynchronizer)

AQS源碼分析

AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer抽象類

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable            

AbstractOwnableSynchronizer

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    
    // 版本序列号
    private static final long serialVersionUID = 3737899427754241961L;
    // 構造方法
    protected AbstractOwnableSynchronizer() { }
    // 獨占模式下的線程
    private transient Thread exclusiveOwnerThread;
    
    // 設定獨占線程 
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    
    // 擷取獨占線程 
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}           

AbstractOwnableSynchronizer中可以設定獨占線程,并且可以擷取獨占線程。

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer類有兩個内部類,Node和ConditionObject類

AbstractQueuedSynchronizer -- Node

static final class Node {
    // 模式,分為共享與獨占
    // 共享模式
    static final Node SHARED = new Node();
    // 獨占模式
    static final Node EXCLUSIVE = null;        
    // 結點狀态
    // CANCELLED,值為1,表示目前的線程被取消
    // SIGNAL,值為-1,表示目前節點的後繼節點包含的線程需要運作,也就是unpark
    // CONDITION,值為-2,表示目前節點在等待condition,也就是在condition隊列中
    // PROPAGATE,值為-3,表示目前場景下後續的acquireShared能夠得以執行
    // 值為0,表示目前節點在sync隊列中,等待着擷取鎖
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;        

    // 結點狀态
    volatile int waitStatus;        
    // 前驅結點
    volatile Node prev;    
    // 後繼結點
    volatile Node next;        
    // 結點所對應的線程
    volatile Thread thread;        
    // 下一個等待者
    Node nextWaiter;
    
    // 結點是否在共享模式下等待
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    // 擷取前驅結點,若前驅結點為空,抛出異常
    final Node predecessor() throws NullPointerException {
        // 儲存前驅結點
        Node p = prev; 
        if (p == null) // 前驅結點為空,抛出異常
            throw new NullPointerException();
        else // 前驅結點不為空,傳回
            return p;
    }
    
    // 無參構造方法
    Node() {    // Used to establish initial head or SHARED marker
    }
    
    // 構造方法
        Node(Thread thread, Node mode) {    // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    // 構造方法
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
           

AbstractQueuedSynchronizer -- ConditionObject

// 内部類
public class ConditionObject implements Condition, java.io.Serializable {
    // 版本号
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    // condition隊列的頭節點
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    // condition隊列的尾結點
    private transient Node lastWaiter;

    /**
        * Creates a new {@code ConditionObject} instance.
        */
    // 構造方法
    public ConditionObject() { }

    // Internal methods

    /**
        * Adds a new waiter to wait queue.
        * @return its new wait node
        */
    // 添加新的waiter到wait隊列
    private Node addConditionWaiter() {
        // 儲存尾結點
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) { // 尾結點不為空,并且尾結點的狀态不為CONDITION
            // 清除狀态為CONDITION的結點
            unlinkCancelledWaiters(); 
            // 将最後一個結點重新指派給t
            t = lastWaiter;
        }
        // 建立一個結點
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null) // 尾結點為空
            // 設定condition隊列的頭節點
            firstWaiter = node;
        else // 尾結點不為空
            // 設定為節點的nextWaiter域為node結點
            t.nextWaiter = node;
        // 更新condition隊列的尾結點
        lastWaiter = node;
        return node;
    }

    /**
        * Removes and transfers nodes until hit non-cancelled one or
        * null. Split out from signal in part to encourage compilers
        * to inline the case of no waiters.
        * @param first (non-null) the first node on condition queue
        */
    private void doSignal(Node first) {
        // 循環
        do {
            if ( (firstWaiter = first.nextWaiter) == null) // 該節點的nextWaiter為空
                // 設定尾結點為空
                lastWaiter = null;
            // 設定first結點的nextWaiter域
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null); // 将結點從condition隊列轉移到sync隊列失敗并且condition隊列中的頭節點不為空,一直循環
    }

    /**
        * Removes and transfers all nodes.
        * @param first (non-null) the first node on condition queue
        */
    private void doSignalAll(Node first) {
        // condition隊列的頭節點尾結點都設定為空
        lastWaiter = firstWaiter = null;
        // 循環
        do {
            // 擷取first結點的nextWaiter域結點
            Node next = first.nextWaiter;
            // 設定first結點的nextWaiter域為空
            first.nextWaiter = null;
            // 将first結點從condition隊列轉移到sync隊列
            transferForSignal(first);
            // 重新設定first
            first = next;
        } while (first != null);
    }

    /**
        * Unlinks cancelled waiter nodes from condition queue.
        * Called only while holding lock. This is called when
        * cancellation occurred during condition wait, and upon
        * insertion of a new waiter when lastWaiter is seen to have
        * been cancelled. This method is needed to avoid garbage
        * retention in the absence of signals. So even though it may
        * require a full traversal, it comes into play only when
        * timeouts or cancellations occur in the absence of
        * signals. It traverses all nodes rather than stopping at a
        * particular target to unlink all pointers to garbage nodes
        * without requiring many re-traversals during cancellation
        * storms.
        */
    // 從condition隊列中清除狀态為CANCEL的結點
    private void unlinkCancelledWaiters() {
        // 儲存condition隊列頭節點
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) { // t不為空
            // 下一個結點
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) { // t結點的狀态不為CONDTION狀态
                // 設定t節點的nextWaiter域為空
                t.nextWaiter = null;
                if (trail == null) // trail為空
                    // 重新設定condition隊列的頭節點
                    firstWaiter = next;
                else // trail不為空
                    // 設定trail結點的nextWaiter域為next結點
                    trail.nextWaiter = next;
                if (next == null) // next結點為空
                    // 設定condition隊列的尾結點
                    lastWaiter = trail;
            }
            else // t結點的狀态為CONDTION狀态
                // 設定trail結點
                trail = t;
            // 設定t結點
            t = next;
        }
    }

    // public methods

    /**
        * Moves the longest-waiting thread, if one exists, from the
        * wait queue for this condition to the wait queue for the
        * owning lock.
        *
        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
        *         returns {@code false}
        */
    // 喚醒一個等待線程。如果所有的線程都在等待此條件,則選擇其中的一個喚醒。在從 await 傳回之前,該線程必須重新擷取鎖。
    public final void signal() {
        if (!isHeldExclusively()) // 不被目前線程獨占,抛出異常
            throw new IllegalMonitorStateException();
        // 儲存condition隊列頭節點
        Node first = firstWaiter;
        if (first != null) // 頭節點不為空
            // 喚醒一個等待線程
            doSignal(first);
    }

    /**
        * Moves all threads from the wait queue for this condition to
        * the wait queue for the owning lock.
        *
        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
        *         returns {@code false}
        */
    // 喚醒所有等待線程。如果所有的線程都在等待此條件,則喚醒所有線程。在從 await 傳回之前,每個線程都必須重新擷取鎖。
    public final void signalAll() {
        if (!isHeldExclusively()) // 不被目前線程獨占,抛出異常
            throw new IllegalMonitorStateException();
        // 儲存condition隊列頭節點
        Node first = firstWaiter;
        if (first != null) // 頭節點不為空
            // 喚醒所有等待線程
            doSignalAll(first);
    }

    /**
        * Implements uninterruptible condition wait.
        * <ol>
        * <li> Save lock state returned by {@link #getState}.
        * <li> Invoke {@link #release} with saved state as argument,
        *      throwing IllegalMonitorStateException if it fails.
        * <li> Block until signalled.
        * <li> Reacquire by invoking specialized version of
        *      {@link #acquire} with saved state as argument.
        * </ol>
        */
    // 等待,目前線程在接到信号之前一直處于等待狀态,不響應中斷
    public final void awaitUninterruptibly() {
        // 添加一個結點到等待隊列
        Node node = addConditionWaiter();
        // 擷取釋放的狀态
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        while (!isOnSyncQueue(node)) { // 
            // 阻塞目前線程
            LockSupport.park(this);
            if (Thread.interrupted()) // 目前線程被中斷
                // 設定interrupted狀态
                interrupted = true; 
        }
        if (acquireQueued(node, savedState) || interrupted) // 
            selfInterrupt();
    }

    /*
        * For interruptible waits, we need to track whether to throw
        * InterruptedException, if interrupted while blocked on
        * condition, versus reinterrupt current thread, if
        * interrupted while blocked waiting to re-acquire.
        */

    /** Mode meaning to reinterrupt on exit from wait */
    private static final int REINTERRUPT =  1;
    /** Mode meaning to throw InterruptedException on exit from wait */
    private static final int THROW_IE    = -1;

    /**
        * Checks for interrupt, returning THROW_IE if interrupted
        * before signalled, REINTERRUPT if after signalled, or
        * 0 if not interrupted.
        */
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0; 
    }

    /**
        * Throws InterruptedException, reinterrupts current thread, or
        * does nothing, depending on mode.
        */
    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

    /**
        * Implements interruptible condition wait.
        * <ol>
        * <li> If current thread is interrupted, throw InterruptedException.
        * <li> Save lock state returned by {@link #getState}.
        * <li> Invoke {@link #release} with saved state as argument,
        *      throwing IllegalMonitorStateException if it fails.
        * <li> Block until signalled or interrupted.
        * <li> Reacquire by invoking specialized version of
        *      {@link #acquire} with saved state as argument.
        * <li> If interrupted while blocked in step 4, throw InterruptedException.
        * </ol>
        */
    // // 等待,目前線程在接到信号或被中斷之前一直處于等待狀态
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) // 目前線程被中斷,抛出異常
            throw new InterruptedException();
        // 在wait隊列上添加一個結點
        Node node = addConditionWaiter();
        // 
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 阻塞目前線程
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 檢查結點等待時的中斷類型
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    /**
        * Implements timed condition wait.
        * <ol>
        * <li> If current thread is interrupted, throw InterruptedException.
        * <li> Save lock state returned by {@link #getState}.
        * <li> Invoke {@link #release} with saved state as argument,
        *      throwing IllegalMonitorStateException if it fails.
        * <li> Block until signalled, interrupted, or timed out.
        * <li> Reacquire by invoking specialized version of
        *      {@link #acquire} with saved state as argument.
        * <li> If interrupted while blocked in step 4, throw InterruptedException.
        * </ol>
        */
    // 等待,目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态 
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }

    /**
        * Implements absolute timed condition wait.
        * <ol>
        * <li> If current thread is interrupted, throw InterruptedException.
        * <li> Save lock state returned by {@link #getState}.
        * <li> Invoke {@link #release} with saved state as argument,
        *      throwing IllegalMonitorStateException if it fails.
        * <li> Block until signalled, interrupted, or timed out.
        * <li> Reacquire by invoking specialized version of
        *      {@link #acquire} with saved state as argument.
        * <li> If interrupted while blocked in step 4, throw InterruptedException.
        * <li> If timed out while blocked in step 4, return false, else true.
        * </ol>
        */
    // 等待,目前線程在接到信号、被中斷或到達指定最後期限之前一直處于等待狀态
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (System.currentTimeMillis() > abstime) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    /**
        * Implements timed condition wait.
        * <ol>
        * <li> If current thread is interrupted, throw InterruptedException.
        * <li> Save lock state returned by {@link #getState}.
        * <li> Invoke {@link #release} with saved state as argument,
        *      throwing IllegalMonitorStateException if it fails.
        * <li> Block until signalled, interrupted, or timed out.
        * <li> Reacquire by invoking specialized version of
        *      {@link #acquire} with saved state as argument.
        * <li> If interrupted while blocked in step 4, throw InterruptedException.
        * <li> If timed out while blocked in step 4, return false, else true.
        * </ol>
        */
    // 等待,目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态。此方法在行為上等效于: awaitNanos(unit.toNanos(time)) > 0
    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    //  support for instrumentation

    /**
        * Returns true if this condition was created by the given
        * synchronization object.
        *
        * @return {@code true} if owned
        */
    final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
        return sync == AbstractQueuedSynchronizer.this;
    }

    /**
        * Queries whether any threads are waiting on this condition.
        * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
        *
        * @return {@code true} if there are any waiting threads
        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
        *         returns {@code false}
        */
    //  查詢是否有正在等待此條件的任何線程
    protected final boolean hasWaiters() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                return true;
        }
        return false;
    }

    /**
        * Returns an estimate of the number of threads waiting on
        * this condition.
        * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
        *
        * @return the estimated number of waiting threads
        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
        *         returns {@code false}
        */
    // 傳回正在等待此條件的線程數估計值
    protected final int getWaitQueueLength() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int n = 0;
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                ++n;
        }
        return n;
    }

    /**
        * Returns a collection containing those threads that may be
        * waiting on this Condition.
        * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
        *
        * @return the collection of threads
        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
        *         returns {@code false}
        */
    // 傳回包含那些可能正在等待此條件的線程集合
    protected final Collection<Thread> getWaitingThreads() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION) {
                Thread t = w.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }
}
           

AbstractQueuedSynchronizer

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {    
    // 版本号
    private static final long serialVersionUID = 7373984972572414691L;    
    // 頭節點
    private transient volatile Node head;    
    // 尾結點
    private transient volatile Node tail;    
    // 狀态
    private volatile int state;    
    // 自旋時間
    static final long spinForTimeoutThreshold = 1000L;
    
    // Unsafe類執行個體
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // state記憶體偏移位址
    private static final long stateOffset;
    // head記憶體偏移位址
    private static final long headOffset;
    // state記憶體偏移位址
    private static final long tailOffset;
    // tail記憶體偏移位址
    private static final long waitStatusOffset;
    // next記憶體偏移位址
    private static final long nextOffset;
    // 靜态初始化塊
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }
}
           

AQS核心方法

acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
             

一個線程的執行流程

  • 首先調用tryAcquire,此方法的線程會試圖在獨占模式下擷取對象狀态。此方法應該查詢是否允許它在獨占模式下擷取對象狀态,但是這個在AbstractQueuedSynchronizer會抛出一個異常,預設是在子類中去實作
  • 如果tryAcquire失敗,則調用addWaiter方法,将目前線程封裝成為一個Node并放入CLH的隊尾
  • 調用acquireQueued方法,這個方法會不斷的去嘗試擷取資源進行加鎖操作,如果成功傳回true,如果失敗傳回false

addWaiter

// 添加等待者
private Node addWaiter(Node mode) {
    // 新生成一個結點,預設為獨占模式
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 儲存尾結點
    Node pred = tail;
    if (pred != null) { // 尾結點不為空,即已經被初始化
        // 将node結點的prev域連接配接到尾結點
        node.prev = pred; 
        if (compareAndSetTail(pred, node)) { // 比較pred是否為尾結點,是則将尾結點設定為node 
            // 設定尾結點的next域為node
            pred.next = node;
            return node; // 傳回新生成的結點
        }
    }
    enq(node); // 尾結點為空(即還沒有被初始化過),或者是compareAndSetTail操作失敗,則入隊列
    return node;
}
           

acquireQueue

// sync隊列中的結點在獨占且忽略中斷的模式下擷取(資源)
final boolean acquireQueued(final Node node, int arg) {
    // 标志
    boolean failed = true;
    try {
        // 中斷标志
        boolean interrupted = false;
        for (;;) { // 無限循環
            // 擷取node節點的前驅結點
            final Node p = node.predecessor(); 
            if (p == head && tryAcquire(arg)) { // 前驅為頭節點并且成功獲得鎖
                setHead(node); // 設定頭節點
                p.next = null; // help GC
                failed = false; // 設定标志
                return interrupted; 
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

首先擷取目前節點的前驅節點,如果前驅節點是頭節點并且能夠擷取(資源),代表該目前節點能夠占有鎖,設定頭節點為目前節點,傳回。否則,調用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法,首先,我們看shouldParkAfterFailedAcquire方法

shouldParkAfterFailedAcquire

// 當擷取(資源)失敗後,檢查并且更新結點狀态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 擷取前驅結點的狀态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // 狀态為SIGNAL,為-1
        /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
        // 可以進行park操作
        return true; 
    if (ws > 0) { // 表示狀态為CANCELLED,為1
        /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0); // 找到pred結點前面最近的一個狀态不為CANCELLED的結點
        // 指派pred結點的next域
        pred.next = node; 
    } else { // 為PROPAGATE -3 或者是0 表示無狀态,(為CONDITION -2時,表示此節點在condition queue中) 
        /*
            * waitStatus must be 0 or PROPAGATE.  Indicate that we
            * need a signal, but don't park yet.  Caller will need to
            * retry to make sure it cannot acquire before parking.
            */
        // 比較并設定前驅結點的狀态為SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
    }
    // 不能進行park操作
    return false;
}           

隻有當該節點的前驅結點的狀态為SIGNAL時,才可以對該結點所封裝的線程進行park操作。否則,将不能進行park操作。再看parkAndCheckInterrupt方法

parkAndCheckInterrupt

// 進行park操作并且傳回該線程是否被中斷
private final boolean parkAndCheckInterrupt() {
    // 在許可可用之前禁用目前線程,并且設定了blocker
    LockSupport.park(this);
    return Thread.interrupted(); // 目前線程是否已被中斷,并清除中斷标記位
}
           

parkAndCheckInterrupt方法裡的邏輯是首先執行park操作,即禁用目前線程,然後傳回該線程是否已經被中斷

cancelAcquire

// 取消繼續擷取(資源)
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    // node為空,傳回
    if (node == null)
        return;
    // 設定node結點的thread為空
    node.thread = null;

    // Skip cancelled predecessors
    // 儲存node的前驅結點
    Node pred = node.prev;
    while (pred.waitStatus > 0) // 找到node前驅結點中第一個狀态小于0的結點,即不為CANCELLED狀态的結點
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 擷取pred結點的下一個結點
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 設定node結點的狀态為CANCELLED
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) { // node結點為尾結點,則設定尾結點為pred結點
        // 比較并設定pred結點的next節點為null
        compareAndSetNext(pred, predNext, null); 
    } else { // node結點不為尾結點,或者比較設定不成功
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) { // (pred結點不為頭節點,并且pred結點的狀态為SIGNAL)或者 
                                // pred結點狀态小于等于0,并且比較并設定等待狀态為SIGNAL成功,并且pred結點所封裝的線程不為空
            // 儲存結點的後繼
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) // 後繼不為空并且後繼的狀态小于等于0
                compareAndSetNext(pred, predNext, next); // 比較并設定pred.next = next;
        } else {
            unparkSuccessor(node); // 釋放node的前一個結點
        }

        node.next = node; // help GC
    }
}
           

該方法完成的功能就是取消目前線程對資源的擷取,即設定該結點的狀态為CANCELLED

unparkSuccessor

// 釋放後繼結點
private void unparkSuccessor(Node node) {
    /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling.  It is OK if this
        * fails or if status is changed by waiting thread.
        */
    // 擷取node結點的等待狀态
    int ws = node.waitStatus;
    if (ws < 0) // 狀态值小于0,為SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
        // 比較并且設定結點等待狀态,設定為0
        compareAndSetWaitStatus(node, ws, 0);

    /*
        * Thread to unpark is held in successor, which is normally
        * just the next node.  But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
    // 擷取node節點的下一個結點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // 下一個結點為空或者下一個節點的等待狀态大于0,即為CANCELLED
        // s指派為空
        s = null; 
        // 從尾結點開始從後往前開始周遊
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0) // 找到等待狀态小于等于0的結點,找到最前的狀态小于等于0的結點
                // 儲存結點
                s = t;
    }
    if (s != null) // 該結點不為為空,釋放許可
        LockSupport.unpark(s.thread);
}
           

該方法的作用就是為了釋放node節點的後繼結點。

release方法

以獨占鎖為例

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 釋放成功
        // 儲存頭節點
        Node h = head; 
        if (h != null && h.waitStatus != 0) // 頭節點不為空并且頭節點狀态不為0
            unparkSuccessor(h); //釋放頭節點的後繼結點
        return true;
    }
    return false;
}
           

AQS中的設計模式

AQS中的設計是基于模闆方法模式的,如果需要自定義同步器可以通過繼承AbstractQueuedSynchronizer來實作

繼續閱讀