天天看點

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

AQS介紹

AQS是同步鎖内實作同步的共同父類,如下UML圖能看出,ReentrantLock等鎖都是基于AQS。

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

下面主要介紹獨占鎖(Exclusive)的實作。

(本文參照網上資料和jdk1.8源碼加上自己了解,可能會有一些論述錯誤的地方,望諒解并指正)

一、背景知識

AQS内部會維護一個雙向連結清單,來存儲争取鎖的線程。

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

(圖檔來自網絡)

Node節點的屬性除了nextWaiter都是volatile的。

第二點,解釋一下節點的兩種狀态,獨占和共享: Exclusive(獨占,隻有一個線程能執行,如ReentrantLock),Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。

AQS存在一個屬性為volatile的state,這個state在不同的實作上可以有不同的含義。

  • ReentrantLock的state表示是否已鎖定,當調用lock時,state就會加一(可重入),unlock就會減一。
  • CountdownLatch的state表示倒計時的個數,每次countDown會減一。

第三點,線程的中斷是個什麼?

Java中斷機制是一種協作機制,也就是說通過中斷并不能直接終止另一個線程,而需要被中斷的線程自己進行中斷。這好比是家裡的父母叮囑在外的子女要注意身體,但子女是否注意身體,怎麼注意身體則完全取決于自己。這并不是使線程進入阻塞狀态。

第四點 waitStatus的狀态

源碼裡面說了很大一段,總結成下面幾個意思。

注意,status表示的都是目前節點後面的節點狀态

共有以下五種狀态:

  • CANCELLED(1):表示目前結點已取消排程。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀态,進入該狀态後的結點将不會再變化。
  • SIGNAL(-1):表示後繼結點在等待目前結點喚醒,也就是說後繼節點被阻塞了。後繼結點入隊時,會将前繼結點的狀态更新為SIGNAL。
  • CONDITION(-2):表示結點等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀态的結點将從等待隊列轉移到同步隊列中,等待擷取同步鎖。
  • PROPAGATE(-3):傳播。共享模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。
  • 0:新結點入隊時的預設狀态。

注意,負值表示結點處于有效等待狀态,而正值表示結點已被取消。是以源碼中很多地方用>0、<0來判斷結點的狀态是否正常。

二、AQS的加鎖方法

2.1、acquire(int arg)

此方法是獨占模式下線程擷取共享資源的頂層入口。如果擷取到資源,線程直接傳回,否則進入等待隊列,直到擷取到資源為止,且整個過程忽略中斷的影響。比如ReentrantLock的lock方法就會調用acquire。

public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           
  • tryAcquire會去擷取鎖,如果成功就傳回true,否則傳回false。
  • addWaiter會把目前線程加入隊列的尾部,并且是Exclusive。
  • acquireQueued用于擷取資源,如果線程被中斷過傳回true,此時就會執行下面的selfInterrupt()。

以ReentrantLock為例:

//FairSync
final void lock() {
            acquire(1);
        }
           

直接調用acquire。

2.2、tryAcquire(int acquires)

AQS并沒有具體的實作tryAcquires,它讓子類自行決定實作的方式。

//aqs并沒有實作
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
           
這裡之是以沒有定義成abstract,是因為獨占模式下隻用實作tryAcquire-tryRelease,而共享模式下隻用實作tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實作另一模式下的接口。說到底,Doug Lea還是站在咱們開發者的角度,盡量減少不必要的工作量。

以ReentrantLock的tryAcquire為例。

//FairSync調用的是父類Sync的tryAcquire
//ReentrantLock,lock傳入參數qcquires=1
protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //ReentrantLock的state表示上鎖的次數
            //0表示未上鎖,如果沒上鎖,cas上鎖,然後把獨占線程設定為目前的
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果目前線程就是那個獨占的,也就是之前上鎖的,現在是重入鎖,state(c)加1(acquires)
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //擷取不到鎖,傳回false
            return false;
        }
           

2.3、addWaiter(Node mode)

private Node addWaiter(Node mode) {
    //mode模式有獨占和共享兩種
    //Node.EXCLUSIVE for exclusive, Node.SHARED for shared
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 先試試最快速的直接插到隊尾,如果失敗,走下面的enq
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
           

2.4、enq(final Node node)

能看出是CAS自旋加入隊尾。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize,隊列為空,還沒初始化,需要初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
                // 初始化之後還會再進行一遍循環
                //這個連結清單的建立方式是先建立了一個僞頭節點,然後從下一個節點開始才是真正的node
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

2.5、acquireQueued(final Node node, int arg)

tryAcquire失敗,addWaiter加入隊列。那麼下一步就應該是讓他休息,等待别人喚醒他。

注意,正常情況下,該方法會一直阻塞,直到拿到鎖才會傳回(parkAndCheckInterrupt方法就是幹這個的),但是如果抛出異常,就會移除節點,繼續上抛異常。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//标記是否成功拿到資源
        try {
            boolean interrupted = false;//标記是否被中斷過
            //注意自旋
            for (;;) {
                final Node p = node.predecessor();//拿到前驅節點
                if (p == head && tryAcquire(arg)) {
                    // 如果前一個節點是head,head是僞頭結點,那麼目前節點就是真正地第一個節點
                    // 再tryAcquire拿到資源,才能進入這裡
                    setHead(node);
                    p.next = null; // help GC
                    // setHead把prev斷了,現在要讓head的next斷掉,友善gc
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //沒得拿到資源,那應該讓他wait了
                    // 如果被中斷過,要修改狀态
                    interrupted = true;
            }
        } finally {
            if (failed)
            // 如果等待過程中沒有成功擷取資源(如timeout,或者可中斷的情況下被中斷了),那麼取消結點在隊列中的等待。
                cancelAcquire(node);
        }
    }
           

先看shouldParkAfterFailedAcquire和parkAndCheckInterrupt。在這之前,建議了解中斷什麼意思,開篇就提到了。(本人到這裡時候已經被中斷整蒙了,是以去查了一下,寫在了開頭)

2.5.1、 shouldParkAfterFailedAcquire(Node,Node)

檢查是否可以進入waiting狀态。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // pred前驅節點和node目前節點
    // 目前節點的狀态儲存在前驅節點當中
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
             // 前驅節點已經是signal,也就是前驅節點釋放時會通知自己,可以安全的park
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
             // 大于0隻有一個狀态,為cancelled。
             //前驅節點取消了,那就順着鍊向前找到一個非cancelled的狀态(正常等待),然後排在他後面
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             //如果前驅正常,那就把前驅的狀态設定成SIGNAL,因為前驅節點可能是new狀态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           

總之,前一個節點必須是signal,自己才能去休息。

2.5.2、 parkAndCheckInterrupt()

檢查過可以休息了,現在真正要去休息了。此方法會讓線程真正的waiting。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
           

park會進入waiting狀态。此時,隻有unpark和interrupt可以喚醒他。

2.5.3、小結

acquiredQueued()分為下面幾步:

  • 節點進入隊尾,找到一個安全休息點
  • 調用park進入waiting,隻有unpark和interrupt能喚醒
  • 被喚醒後,看自己是不是有資格能拿到号。如果拿到,head指向目前結點,并傳回從入隊到拿到号的整個過程中是否被中斷過;如果沒拿到,繼續流程1

2.6.總結

再回到acquire()

public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           
  • 先調用tryAcquire嘗試擷取,成功就直接傳回了
  • 如果擷取失敗,需要加入隊列,調用addWaiter
  • acquireQueued讓線程休息,輪到自己擷取,并且擷取到資源後才傳回,如果被中斷過,傳回true,否則是false
  • 線程在等待過程中被中斷過,不響應,隻有在擷取資源後,才進行selfInterrupt,把中斷補上

下面是流程圖

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

(圖源網絡)

三、解鎖

3.1、 release(int arg)

與acquire對應,release是解鎖時候會調用的。以ReentrantLock為例:

public void unlock() {
        sync.release(1);
    }
           

下面是release源碼:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            //tryRelease需要子類實作
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

用tryRelease釋放鎖,tryRelease需要實作AQS的類自己重寫。

3.2、 tryRelease(int arg)

AQS的tryRelease同tryAcquire一樣都隻是抛出異常。下面以ReentrantLock的具體實作為例。

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
           

ReentrantLock的state是加鎖的層數,傳入參數releases是1,是以第一行把鎖的層數減一。如果c為0,說明沒有鎖了,傳回true,并且設定目前的獨占線程是null。

3.3、 unparkSuccessor(Node)

喚醒目前節點的後繼節點。

private void unparkSuccessor(Node node) {
    //node是目前節點
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)//更新head狀态為0,head,狀态為0說明下一個可以被喚醒了
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;//下一個要喚醒的節點
        if (s == null || s.waitStatus > 0) {
            //如果為空或者被取消了
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)//從後向前找
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒
    }
           

unpark會喚醒等待隊列中最靠前的,未被取消的線程。

3.4、 繼續acquireQueued

另一個線程被喚醒之後,會從acquireQueued的阻塞位置恢複。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // 2.在這裡tryAcquire能夠擷取到鎖了,是以進來
                    setHead(node);
                    // 3.把目前節點設定為頭結點
                    p.next = null; // help GC
                    failed = false;
                    // 4.傳回
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
                // 1.從這裡恢複,然後繼續循環
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           

四、示例

假設ABC三個線程競争,以ReentrantLock的公平鎖為例,下面是隊列的變化關系:

step1

假設最開始A線程先擷取鎖,沒有發生競争,不需要等待,是以隊列此時是空的。

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

step2

現在B線程來擷取鎖,發現state是1,即已經加鎖了。AQS會先建立一個僞頭節點,然後把B進入隊列。

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

step3

最後C線程擷取,同樣需要排隊。

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

step4

A線程釋放鎖,會調用unparkSuccessor()把頭結點的waitStatus置0,表示後續節點可以為喚醒了。

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

step5

B開始執行。首先tryAcquire能擷取到鎖,setHead(B),并把B的waitStatus設為-1,表示下一個被阻塞。此時隊列狀态如下:

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

step6

B釋放鎖,unparkSuccessor()喚醒首節點。

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

step7

C擷取鎖同理。

AQS源碼解析AQS介紹一、背景知識二、AQS的加鎖方法三、解鎖四、示例五、總結

step8

C釋放鎖,什麼也不用做,因為C是最後一個節點,C離開之後,隊列為空。

五、總結

對于獨占鎖來說,自己實作自定義鎖繼承AQS基本隻用實作tryAcquire和tryRelease兩個方法(還有isHeldExclusively(),用condition時候才必須重寫)。這兩個方法内容是加鎖和删除鎖時候的邏輯部分,交由程式員自己編寫。其他的對隊列的方法,AQS都已經實作完了,可以直接拿來用。并且,AQS實作的隊列是一種雙向隊列,目前節點的狀态由前一節點保持,而且還利用一個僞頭節點。

本篇文章為記錄自己學習AQS的過程,如有不對,還請指正

參考文章:

https://segmentfault.com/a/1190000015804888

https://www.cnblogs.com/waterystone/p/4920797.html

https://blog.csdn.net/sscout/article/details/102616722