天天看點

AQS總結

java并發程式設計核心JUC,JUC的核心是AQS,而AQS的核心是Unsafe使用的CAS(compare and swap)。

AQS全稱:AbstractQuenedSynchronizer抽象的隊列式同步器。

AQS定義了一套多線程通路共享資源的同步器架構,許多同步類實作都依賴于它。如:ReentrantLock/Semaphore/CountDownLatch...。

大白話,AQS就是基于CLH隊列,用volatile修飾共享變量state,線程通過CAS去改變狀态符,成功則擷取鎖成功,失敗則進入等待隊列,等待被喚醒。

深入了解,多線程通路共享資源涉及步驟(ReentrantLock為例):

搶鎖

// 嘗試擷取鎖 成功傳回true,失敗傳回false
// 失敗則進行入隊操作
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

        // 公平鎖與非公平鎖實作不同 公平鎖
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 判斷是否已鎖
            if (c == 0) {
                // 是否有隊列有隊列直接傳回false 搶鎖失敗
                // cas搶鎖,搶鎖成功設定目前線程
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 重入鎖
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        // 非公平鎖直接強鎖
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 判斷是否已鎖
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 重入鎖
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
           

入隊

// 入隊前先建立節點
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    // 建立節點
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 尾節點存在 證明隊列存在 直接放入隊列尾部
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 
        enq(node);
        return node;
    }
    // 隊列不存在情況 建立兩個節點,頭結點作為 目前持有鎖的節點
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    // 入隊操作:總結就是我是下一個頭結點就跳出循環 否則一直循環等待我是下一個頭結點
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // 入隊失敗标記
        try {
            boolean interrupted = false;// 中斷标記
            // 自旋
            for (;;) {
                // 目前節點的前節點是頭結點證明直接下一個就能輪到他 進行擷取鎖,點嘗試擷取鎖,擷取成功把自己設定為頭節點,之前的頭結點需要GC回收
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 不是下一個要處理的節點,就通過park()進入等待狀态。直到被喚醒
                // 如果自己可以休息了,就通過park()進入waiting狀态,直到被unpark()。如果不可中斷的情況下被中斷了,那麼會從park()中醒過來,發現拿不到資源,進而繼續進入park()等待。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    // 設定自己等待waiting 如果
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 等待标記
        if (ws == Node.SIGNAL) // 前節點 正處于等待标記(停止休息)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) { // 辨別前節點取消或者放棄擷取鎖
            /*
             * Predecessor was cancelled. Skip over predecessors and indicate retry.
             *  如果前節點放棄了,那就一直往前找,直到找到最近一個正常等待的狀态,并排在它的後邊。
             *  注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鍊,稍後就會被保安大叔趕走了(GC回收)!
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             * 如果前節點正常,那就把前節點的狀态設定成SIGNAL,告訴它拿完号後通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    // 真正處于等待狀态
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 調用park()使線程進入waiting狀态
        return Thread.interrupted();
    }
           
AQS總結
AQS總結
// 釋放鎖
    public void unlock() {
        sync.release(1);
    }
    // 先進行狀态修改再進行頭節點的下一個節點喚醒操作
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    // 鎖狀态設定為 狀态 - 1(重入鎖多次釋放)
    protected final boolean tryRelease(int releases) {
	int c = getState() - releases;
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	if (c == 0) {
		free = true;
		setExclusiveOwnerThread(null);
	}
	setState(c);
	return free;
    }
    // waitStatus頭結點等待設定為0 頭節點的下一個節點unpark 被喚醒。
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
           
// 出隊在 入隊操作時 發現目前節點的頭結點 并能成功搶鎖 則頭結點出隊。p.next = null; // help GC
           
// 鎖釋放頭結點的下一個節點被喚醒。
           

繼續閱讀