天天看點

淺析抽象隊列同步器(AQS)

淺析抽象隊列同步器(AQS)

目錄

淺析抽象隊列同步器(AQS)

什麼是AQS

AQS的原理

state狀态

AQS的共享資源狀态:獨占式和共享式

添加鎖和釋放鎖

什麼是ReentrantLock

實作

如何使用

底層實作

除非我不想赢,否則沒人能讓我輸。

複習多線程并發包總結

什麼是AQS

​ AQS(AbstractQueuedSynchronizer)是一個抽象隊列同步器,通過維護一個共享資源狀态(volatile int state)和一個FIFO線程等待隊列(底層是雙向連結清單)來實作一個多線程通路共享資源的同步架構。許多同步類的實作都依賴于AQS,例如常用的ReentrantLock,CountDownLatch,Semaphore。

關于ReentrantLock,CountDownLatch,Semaphore的用法可參考:

[常用的三種同步類] https://blog.csdn.net/qq_42107430/article/details/103854488

JDK1.8源碼:

   /**
     * The synchronization state.
     */
    private volatile int state;
​
    static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev; //前驅節點
        volatile Node next; //後繼節點
        volatile Thread thread;//目前線程
        Node nextWaiter; //存儲在condition隊列中的後繼節點
        //是否為共享鎖
        final boolean isShared() { 
            return nextWaiter == SHARED;
        }
​
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
​
        Node() {    // Used to establish initial head or SHARED marker
        }
        //将線程構造成一個Node,添加到等待隊列
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        //這個方法會在Condition隊列使用,後續單獨寫一篇文章分析condition
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
           

AQS的原理

​ AQS為每個共享資源變量設定一個共享資源鎖,線程在需要通路共享資源時首先需要去擷取共享資源縮。

如果擷取成功,便可以在目前線程中使用該共享資源,如果擷取不成功,則将該線程放入線程等待隊列,等待下一次資源排程。

state狀态

​ AQS維護了一個volatile int 類型的變量,用于表示目前的同步狀态。Volatile雖然不能保證操作的原子性,但是可以保證操作的可見性。

state的通路方式有以下三種,均是原子操作

  • getState()
  • setState()
  • compareAndSetState()
/**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }
​
    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }
​
    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
           

AQS的共享資源狀态:獨占式和共享式

​ AQS定義兩種資源共享方式:獨占式(Exclusive)和共享式(Share)。

  • 獨占式:隻有一個線程能執行,如ReentrantLock
  • 共享式:共享,多個線程可同時執行,如Semaphore/CountDownLatch

  AQS隻是一個架構,定義了一個接口,具體的資源擷取、釋放都交由自定義同步器實作。

不同的自定義同步器争用共享資源的方式也不同。自定義同步器在實作時隻需要實作共享資源state的擷取與釋放方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊/喚醒出隊等),AQS已經在頂層實作好了。自定義同步器實作時主要實作以下幾種方法:

  • isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。
  • tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
  • tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
  • tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false。

以ReentrantLock為例,state初始化為0,表示未鎖定狀态。A線程lock()時,會調用tryAcquire()獨占該鎖并将state+1。此後,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會擷取該鎖。當然,釋放鎖之前,A線程自己是可以重複擷取此鎖的(state會累加),這就是可重入的概念。但要注意,擷取多少次就要釋放多麼次,這樣才能保證state是能回到零态的。否則程式運作時會報錯。

再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一緻)。這N個子線程是并行執行的,每個子線程執行完後countDown()一次,state會CAS減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數傳回,繼續執行後續動作。

  一般來說,自定義同步器要麼是獨占方法,要麼是共享方式,他們也隻需實作tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支援自定義同步器同時實作獨占和共享兩種方式,如ReentrantReadWriteLock。

添加鎖和釋放鎖

當出現鎖競争以及釋放鎖的時候,AQS同步隊列中的節點會發生變化。

添加節點

添加節點時會涉及到兩個變化

  • 新的線程封裝成Node節點追加到同步隊列中,設定prev節點以及修改目前節點的前置節點的next節點指向自己
  • 通過CAS講tail重新指向新的尾部節點

移除節點

head節點表示擷取鎖成功的節點,當頭結點在釋放同步狀态時,會喚醒後繼節點,如果後繼節點獲得鎖成功,會把自己設定為頭結點

這個過程也是涉及到兩個變化

  • 修改head節點指向下一個獲得鎖的節點
  • 新的獲得鎖的節點,将prev的指針指向null
了解了AQS是什麼,原理實作後,我們結合ReentrantLock來深入了解AQS是如何實作線程安全的。

什麼是ReentrantLock

Java中除了使用關鍵字synchronized外,還可以使用ReentrantLock實作獨占鎖的功能。而且ReentrantLock相比synchronized而言功能更加豐富,使用起來更為靈活,也更适合複雜的并發場景。

實作

ReentrantLock繼承了Lock接口并實作了在接口中定義的方法。是一個可重入的獨占鎖。通過自定義抽象隊列同步器來實作。

Lock接口JDK源碼

void lock() // 如果鎖可用就獲得鎖,如果鎖不可用就阻塞直到鎖釋放
void lockInterruptibly() // 和 lock()方法相似, 但阻塞的線程可中斷,抛出 java.lang.InterruptedException異常
boolean tryLock() // 非阻塞擷取鎖;嘗試擷取鎖,如果成功傳回true
boolean tryLock(long timeout, TimeUnit timeUnit) //帶有逾時時間的擷取鎖方法
void unlock() // 釋放鎖
           

如何使用

public class ReentrantLockDemo {
    private static int count=0;
    static Lock lock=new ReentrantLock();
    public static void inc(){
        lock.lock();
        try {
            Thread.sleep(1);
            count++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }
           

這段代碼主要做一件事,就是通過一個靜态的

incr()

方法對共享變量

count

做連續遞增,在沒有加同步鎖的情況下多線程通路這個方法一定會存線上程安全問題。是以用到了

ReentrantLock

來實作同步鎖,并且在finally語句塊中顯式釋放鎖。

底層實作

ReentrantLock.lock()

public void lock() {
        sync.lock();
    }
           

可以看到lock()方法底層調用的是sync的lock()方法。

sync是一個靜态内部類,通過繼承AQS并實作了共享資源state的擷取和釋放的方式。

Sync

   abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
​
        /*定義一個抽象方法,由具體的子類去實作*/
        abstract void lock();
​
        /**
         * 實作非公平的tryAcquire擷取資源
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();//擷取目前線程
            int c = getState();//擷取同步狀态
            if (c == 0) {//如果狀态為0 CAS設定acquires
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);//如果設定成功,設定目前線程為排他所有者線程
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//如果目前線程就是排他線程
                int nextc = c + acquires;
                if (nextc < 0) // overflow 溢出報錯
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);//重新設定state
                return true;
            }
            return false;
        }
​
      /*
       * 嘗試釋放資源
       */
        protected final boolean tryRelease(int releases) {
          //計算要更新的同步狀态
            int c = getState() - releases;
          //如果目前線程不是排他線程 報錯
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
          //如果狀态為0,設定獨占排他線程為null,傳回true
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
          //更新同步狀态
            setState(c);
            return free;
        }
​
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
​
        /*擷取目前線程*/
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
​
       /*擷取目前state狀态*/
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
​
      /*判斷是否被鎖定*/
        final boolean isLocked() {
            return getState() != 0;
        }
​
    }
           

Sync又有兩個具體的實作,分别是NofairSync(非公平鎖),FairSync(公平鎖)。

  • 公平鎖 表示所有線程嚴格按照FIFO來擷取鎖
  • 非公平鎖 表示可以存在搶占鎖的功能,也就是說不管目前隊列上是否存在其他線程等待,新線程都有機會搶占鎖

NofairSync

​
    /**
     * Sync object for non-fair locks 非公平鎖
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
​
        /**
         * 執行鎖定
         */
        final void lock() {
          //首先通過CAS設定state,如果成功,設定目前線程為排他線程(非公平的關鍵)
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else//如果失敗再去嘗試獲得鎖 關于acquire的具體講解在下面
                acquire(1);
        }
​
      /*調用父類sync的非公平tryAcquire擷取資源*/
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
           

lock()方法簡單解釋一下

  • 由于這裡是非公平鎖,是以調用lock方法時,先去通過cas去搶占鎖
  • 如果搶占鎖成功,儲存獲得鎖成功的目前線程
  • 搶占鎖失敗,調用acquire來走鎖競争邏輯
compareAndSetState調用的是Unsafe類的compareAndSetState方法進行原子操作
return unsafe.compareAndSetState(this, stateOffset, expect, update);
           

UnsafeUnsafe類是在sun.misc包下,不屬于Java标準。但是很多Java的基礎類庫,包括一些被廣泛使用的高性能開發庫都是基于Unsafe類開發的,比如Netty、Hadoop、Kafka等;Unsafe可認為是Java中留下的後門,提供了一些低層次操作,如直接記憶體通路、線程排程等

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
           

這個是一個native方法, 第一個參數為需要改變的對象,第二個為偏移量(即之前求出來的headOffset的值),第三個參數為期待的值,第四個為更新後的值整個方法的作用是如果目前時刻的值等于預期值var4相等,則更新為新的期望值 var5,如果更新成功,則傳回true,否則傳回false;

FairSync

/**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
​
      //嘗試擷取鎖
        final void lock() {
            acquire(1);
        }
​
        /**
         * 公平版本的tryAcquire 
         * Fair version of tryAcquire. 
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
           

acquire

acquire是AQS中的方法,如果CAS操作未能成功,說明state已經不為0,此時繼續acquire(1)操作,這裡大家思考一下,acquire方法中的1的參數是用來做什麼呢?如果沒猜中,往前面回顧一下state這個概念

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           

這個方法的主要邏輯是

  • 通過tryAcquire嘗試擷取獨占鎖,如果成功傳回true,失敗傳回false
  • 如果tryAcquire失敗,則會通過addWaiter方法将目前線程封裝成Node添加到AQS隊列尾部
  • acquireQueued,将Node作為參數,通過自旋去嘗試擷取鎖。

addWaiter

private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE
        //将目前線程封裝成Node,并且mode為獨占鎖
        Node node = new Node(Thread.currentThread(), mode); 
        // Try the fast path of enq; backup to full enq on failure
        // tail是AQS的中表示同步隊列隊尾的屬性,剛開始為null,是以進行enq(node)方法
        Node pred = tail;
        if (pred != null) { //tail不為空的情況,說明隊列中存在節點資料
            node.prev = pred;  //講目前線程的Node的prev節點指向tail
            if (compareAndSetTail(pred, node)) {//通過cas講node添加到AQS隊列
                pred.next = node;//cas成功,把舊的tail的next指針指向新的tail
                return node;
            }
        }
        enq(node); //tail=null,将node添加到同步隊列中
        return node;
    }
           

ReentrantLock.unlock()

public void unlock() {
        sync.release(1);
    }
           

release

1 釋放鎖 ;2 喚醒park的線程

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           

tryRelease

這個動作可以認為就是一個設定鎖狀态的操作,而且是将狀态減掉傳入的參數值(參數是1),如果結果狀态為0,就将排它鎖的Owner設定為null,以使得其它的線程有機會進行執行。在排它鎖中,加鎖的時候狀态會增加1(當然可以自己修改這個值),在解鎖的時候減掉1,同一個鎖,在可以重入後,可能會被疊加為2、3、4這些值,隻有unlock()的次數與lock()的次數對應才會将Owner線程設定為空,而且也隻有這種情況下才會傳回true。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 這裡是将鎖的數量減1
    if (Thread.currentThread() != getExclusiveOwnerThread())// 如果釋放的線程和擷取鎖的線程不是同一個,抛出非法螢幕狀态異常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { 
// 由于重入的關系,不是每次釋放鎖c都等于0,
    // 直到最後一次釋放鎖時,才會把目前線程釋放
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
           

unparkSuccessor

在方法unparkSuccessor(Node)中,就意味着真正要釋放鎖了,它傳入的是head節點(head節點是占用鎖的節點),目前線程被釋放之後,需要喚醒下一個節點的線程

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {//判斷後繼節點是否為空或者是否是取消狀态,
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0) //然後從隊列尾部向前周遊找到最前面的一個waitStatus小于0的節點, 至于為什麼從尾部開始向前周遊,因為在doAcquireInterruptibly.cancelAcquire方法的處理過程中隻設定了next的變化,沒有設定prev的變化,在最後有這樣一行代碼:node.next = node,如果這時執行了unparkSuccessor方法,并且向後周遊的話,就成了死循環了,是以這時隻有prev是穩定的
                s = t;
    }
//内部首先會發生的動作是擷取head節點的next節點,如果擷取到的節點不為空,則直接通過:“LockSupport.unpark()”方法來釋放對應的被挂起的線程,這樣一來将會有一個節點喚醒後繼續進入循環進一步嘗試tryAcquire()方法來擷取鎖
    if (s != null)
        LockSupport.unpark(s.thread); //釋放許可
}
           

繼續閱讀