天天看點

AQS(AbstractQueuedSynchronizer)解析

   定義

    AQS定義了一套多線程通路共享資源的同步器架構,許多同步類實作都依賴于它,如常用的ReentrantLock/Semaphore/CountDownLatch。

資料結構

  它維護了一個state,代表共享資源,一個等待隊列,多線程調用資源被阻塞時候就會進入此隊列

AQS(AbstractQueuedSynchronizer)解析

   AQS使用了模闆方法模式,自定義的同步器在在實作時隻需要實作共享資源state的擷取與釋放即可,值域具體線程等待隊列的維護,AQS已經在頂層實作好了。

模闆方法主要有:

獨占式擷取

  • accquire
  • acquireInterruptibly
  • tryAcquireNanos

共享式擷取

  • acquireShared
  • acquireSharedInterruptibly
  • tryAcquireSharedNanos

獨占式釋放鎖

  • release

共享式釋放鎖

  • releaseShared

自定義同步器主要是實作以下方法

  • isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。
  • tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
  • tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
  • tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false

源碼解析

先介紹一下它的變量及内部類Node節點。

/** 隊列中的頭節點 */
    private transient volatile Node head;
    /** 隊列中的尾節點 */
    private transient volatile Node tail;
    /** 同步狀态、鎖數量 */
    private volatile int state;

    /** 擷取同步狀态 */
    protected final int getState() {
        return state;
    }

    /** 設定同步狀态 */
    protected final void setState(int newState) {
        state = newState;
    }

    /** 通過CAS設定同步狀态 */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
   
static final class Node {
        static final Node SHARED = new Node();
        /** 表示在獨占模式下等待 */
        static final Node EXCLUSIVE = null;
        /** 表示已經被舍棄 */
        static final int CANCELLED =  1;
        /** 表示後續節點需要被喚醒 */
        static final int SIGNAL    = -1;
        /** 節點處于等待隊列中 */
        static final int CONDITION = -2;
        /**
         * 代表後續結點會傳播喚醒的操作,共享模式下起作用
         */
        static final int PROPAGATE = -3;
        /** 節點狀态 */
        volatile int waitStatus;

        /**上一節點*/
        volatile Node prev;
        /**下一節點*/
        volatile Node next;

        /** node所屬的線程 */
        volatile Thread thread;
        /** 下一個等待的節點 */
        Node nextWaiter;
        /** 是否共享 */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

    }
           

獨占鎖

    我重點介紹的是獨占鎖,因為共享鎖用到的少。

  • 擷取獨占鎖
AQS(AbstractQueuedSynchronizer)解析

擷取獨占鎖流程

// 擷取鎖
    public final void acquire(int arg) {
        // tryAcquire擷取鎖,如果state==0,或者持有鎖的線程再次擷取鎖,那麼就得到鎖并修改state,可重入鎖也是在這裡提現出來的,由子類具體實作,
        // Node.EXCLUSIVE表示建立獨占模式節點
        // 如果擷取不到鎖就将目前線程加入到隊列
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           

沒有擷取到鎖,是以将節點加入到隊列尾部

// 将節點插入等待隊列尾部
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 如果尾節點不為空,則快速将node插入到隊列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS設定目前節點為尾節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 否則,通過自旋方式直到成功
        enq(node);
        return node;
    }

    // 自旋将節點插入等待隊列尾部
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 如果尾節點為空,說明隊列中沒有節點,那麼就通過CAS初始化隊列,并且頭、尾節點指向同一個
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 否則将本節點插入隊列末尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

嘗試去擷取鎖,如果擷取不到,那麼就進入休眠

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            // 中斷标志
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); // 拿到前驅節點
                // 如果前驅節點是頭結點,那麼就嘗試擷取鎖    
                if (p == head && tryAcquire(arg)) { 
                    // 擷取到鎖
                    setHead(node);  // 将頭節點設定為該節點
                    p.next = null; // 将之前的頭結點的next設為null,表示從隊列中脫離
                    failed = false;
                    return interrupted;
                }
                // 沒擷取到鎖,那麼就進入waiting狀态,直到被unpark()
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    // 線程被中斷,那麼就改變狀态位
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


    // 由方法名就可以看出是 擷取鎖失敗後應該進入睡眠狀态
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 擷取前驅節點的狀态
        if (ws == Node.SIGNAL)
            // 如果狀态為-1即下一節點需要被喚醒,那麼傳回true
            return true;
        if (ws > 0) {
            // 如果狀态>0即該節點已經被廢棄,那麼往前周遊,直到找到正常等待的節點,并排在它後面
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果前驅正常,那就把前驅的狀态設定成SIGNAL,告訴它下一節點需要被喚醒
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // 線程進入睡眠
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);    // 進入睡眠,等待被喚醒
        return Thread.interrupted(); //如果被喚醒,檢視自己是不是被中斷的
    }

    /**
     * 如果線程發現線程被中斷,那麼就中斷目前線程
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
           
  • 逾時等待擷取鎖

與擷取鎖邏輯一樣,隻不過多了時間判斷

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
	
	private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
		// 計算截止時間
        final long deadline = System.nanoTime() + nanosTimeout;
		// 将node添加到隊列
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
				// 擷取前驅節點,判斷是否可以擷取鎖
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
				// 計算是否逾時
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
					// 進入有逾時時間的睡眠
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           
  • 釋放鎖

釋放鎖邏輯比較簡單,大緻就是将state-1,喚醒下一個等待線程。

public final boolean release(int arg) {
        // tryRelease()将state -1,如果state變為0,則将擁有鎖的線程exclusiveOwnerThread置為null,由子類具體實作
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 如果頭節點不為空并且狀态不是0,那麼就去喚醒下一個節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

喚醒下一個節點

// 喚醒下一個沒有被廢棄的節點
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        // 如果線程等待狀态<0,那麼就CAS變為0
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 如果下一個節點為空或者已經被廢棄,則從尾部向前擷取到狀為<=0的節點
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 喚醒這個節點的線程,然後去擷取鎖
        if (s != null)
            LockSupport.unpark(s.thread);
    }
           

共享鎖

讀寫鎖中的讀鎖,就是實作了共享鎖。

  • 擷取共享鎖
public final void acquireShared(int arg) {
        // tryAcquireShared嘗試擷取鎖,傳回小于0,則說明擷取鎖失敗,由自類實作
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
           

tryAcquireShared()方法實作可以看 https://blog.csdn.net/zgsxhdzxl/article/details/106071399

沒有擷取到鎖,則

private void doAcquireShared(int arg) {
        // 将節點加入到尾部
        final Node node = addWaiter(Node.SHARED);
        boolean interrupted = false;
        try {
            for (;;) {
                final Node p = node.predecessor();
                // 前驅節點是頭結點,則去擷取鎖
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // 擷取鎖成功
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        } finally {
            if (interrupted)
                selfInterrupt();
        }
    }
           

擷取鎖成功之後,還會去喚醒後續的共享節點setHeadAndPropagate()

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 将node設定為新的首節點
        setHead(node);
        // 判斷後續節點是否需要被喚醒
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 下個節點是共享方式,則喚醒後續為共享方式的節點
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
           

這裡一直沒搞明白,為什麼會喚醒後續的共享節點,因為感覺循環隻會喚醒一個節點。

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 隻有結點狀态為SIGNAL才嘗試喚醒後繼結點
                if (ws == Node.SIGNAL) {
                    // 将waitStatus設定為0
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;    
                    // 喚醒後續的節點       
                    unparkSuccessor(h);
                }
                 // 如果狀态為0則更新狀态為PROPAGATE
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)                   
                break;
        }
    }
           
  • 釋放鎖
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
           

tryReleaseShared可以看 https://blog.csdn.net/zgsxhdzxl/article/details/106071399

繼續閱讀