天天看點

深入分析AQS實作原理

簡單解釋一下J.U.C,是JDK中提供的并發工具包,​

​java.util.concurrent​

​。裡面提供了很多并發程式設計中很常用的實用工具類,比如atomic原子操作、比如lock同步鎖、fork/join等。

從Lock作為切入點

我想以lock作為切入點來講解AQS,畢竟同步鎖是解決線程安全問題的通用手段,也是我們工作中用得比較多的方式。

Lock API

Lock是一個接口,方法定義如下

void lock() // 如果鎖可用就獲得鎖,如果鎖不可用就阻塞直到鎖釋放
void lockInterruptibly() // 和 lock()方法相似, 但阻塞的線程可中斷,抛出 java.lang.InterruptedException異常
boolean tryLock() // 非阻塞擷取鎖;嘗試擷取鎖,如果成功傳回true
boolean tryLock(long timeout, TimeUnit timeUnit) //帶有逾時時間的擷取鎖方法
void unlock() // 釋放鎖      

Lock的實作

實作Lock接口的類有很多,以下為幾個常見的鎖實作

  • ReentrantLock:表示重入鎖,它是唯一一個實作了Lock接口的類。重入鎖指的是線程在獲得鎖之後,再次擷取該鎖不需要阻塞,而是直接關聯一次計數器增加重入次數
  • ReentrantReadWriteLock:重入讀寫鎖,它實作了ReadWriteLock接口,在這個類中維護了兩個鎖,一個是ReadLock,一個是WriteLock,他們都分别實作了Lock接口。讀寫鎖是一種适合讀多寫少的場景下解決線程安全問題的工具,基本原則是:​

    ​讀和讀不互斥、讀和寫互斥、寫和寫互斥​

    ​。也就是說涉及到影響資料變化的操作都會存在互斥。
  • StampedLock: stampedLock是JDK8引入的新的鎖機制,可以簡單認為是讀寫鎖的一個改進版本,讀寫鎖雖然通過分離讀和寫的功能使得讀和讀之間可以完全并發,但是讀和寫是有沖突的,如果大量的讀線程存在,可能會引起寫線程的饑餓。stampedLock是一種樂觀的讀政策,使得樂觀鎖完全不會阻塞寫線程

ReentrantLock的簡單實用

如何在實際應用中使用ReentrantLock呢?我們通過一個簡單的demo來示範一下

public class Demo {
    private static int count=0;
    static Lock lock=new ReentrantLock();
    public static void inc(){
        lock.lock();
        try {
            Thread.sleep(1);
            count++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }      

這段代碼主要做一件事,就是通過一個靜态的​

​incr()​

​​方法對共享變量​

​count​

​​做連續遞增,在沒有加同步鎖的情況下多線程通路這個方法一定會存線上程安全問題。是以用到了​

​ReentrantLock​

​​來實作同步鎖,并且在finally語句塊中釋放鎖。

那麼我來引出一個問題,大家思考一下

多個線程通過lock競争鎖時,當競争失敗的鎖是如何實作等待以及被喚醒的呢?

什麼是AQS

aqs全稱為AbstractQueuedSynchronizer,它提供了一個FIFO隊列,可以看成是一個用來實作同步鎖以及其他涉及到同步功能的核心元件,常見的有:ReentrantLock、CountDownLatch等。

AQS是一個抽象類,主要是通過繼承的方式來使用,它本身沒有實作任何的同步接口,僅僅是定義了同步狀态的擷取以及釋放的方法來提供自定義的同步元件。

可以這麼說,隻要搞懂了AQS,那麼J.U.C中絕大部分的api都能輕松掌握。

AQS的兩種功能

從使用層面來說,AQS的功能分為兩種:獨占和共享

  • 獨占鎖,每次隻能有一個線程持有鎖,比如前面給大家示範的ReentrantLock就是以獨占方式實作的互斥鎖
  • 共享鎖,允許多個線程同時擷取鎖,并發通路共享資源,比如ReentrantReadWriteLock

ReentrantLock的類圖

仍然以ReentrantLock為例,來分析AQS在重入鎖中的使用。畢竟單純分析AQS沒有太多的含義。先了解這個類圖,可以友善我們了解AQS的原理

深入分析AQS實作原理

AQS的内部實作

AQS的實作依賴内部的同步隊列,也就是FIFO的雙向隊列,如果目前線程競争鎖失敗,那麼AQS會把目前線程以及等待狀态資訊構造成一個Node加入到同步隊列中,同時再阻塞該線程。當擷取鎖的線程釋放鎖以後,會從隊列中喚醒一個阻塞的節點(線程)。

深入分析AQS實作原理
AQS隊列内部維護的是一個FIFO的雙向連結清單,這種結構的特點是每個資料結構都有兩個指針,分别指向直接的後繼節點和直接前驅節點。是以雙向連結清單可以從任意一個節點開始很友善的通路前驅和後繼。每個Node其實是由線程封裝,當線程争搶鎖失敗後會封裝成Node加入到ASQ隊列中去

Node類的組成如下

static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev; //前驅節點
        volatile Node next; //後繼節點
        volatile Thread thread;//目前線程
        Node nextWaiter; //存儲在condition隊列中的後繼節點
        //是否為共享鎖
        final boolean isShared() { 
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }
        //将線程構造成一個Node,添加到等待隊列
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        //這個方法會在Condition隊列使用,後續單獨寫一篇文章分析condition
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }      

釋放鎖以及添加線程對于隊列的變化

添加節點

當出現鎖競争以及釋放鎖的時候,AQS同步隊列中的節點會發生變化,首先看一下添加節點的場景。

深入分析AQS實作原理

這裡會涉及到兩個變化

  • 新的線程封裝成Node節點追加到同步隊列中,設定prev節點以及修改目前節點的前置節點的next節點指向自己
  • 通過CAS講tail重新指向新的尾部節點

釋放鎖移除節點

head節點表示擷取鎖成功的節點,當頭結點在釋放同步狀态時,會喚醒後繼節點,如果後繼節點獲得鎖成功,會把自己設定為頭結點,節點的變化過程如下

深入分析AQS實作原理

這個過程也是涉及到兩個變化

  • 修改head節點指向下一個獲得鎖的節點
  • 新的獲得鎖的節點,将prev的指針指向null

這裡有一個小的變化,就是設定head節點不需要用CAS,原因是設定head節點是由獲得鎖的線程來完成的,而同步鎖隻能由一個線程獲得,是以不需要CAS保證,隻需要把head節點設定為原首節點的後繼節點,并且斷開原head節點的next引用即可

AQS的源碼分析

清楚了AQS的基本架構以後,我們來分析一下AQS的源碼,仍然以ReentrantLock為模型。

ReentrantLock的時序圖

調用ReentrantLock中的lock()方法,源碼的調用過程我使用了時序圖來展現

深入分析AQS實作原理

從圖上可以看出來,當鎖擷取失敗時,會調用addWaiter()方法将目前線程封裝成Node節點加入到AQS隊列,基于這個思路,我們來分析AQS的源碼實作

分析源碼

ReentrantLock.lock()

public void lock() {
    sync.lock();
}      

這個是擷取鎖的入口,調用sync這個類裡面的方法,sync是什麼呢?

abstract static class Sync extends AbstractQueuedSynchronizer      

sync是一個靜态内部類,它繼承了AQS這個抽象類,前面說過AQS是一個同步工具,主要用來實作同步控制。我們在利用這個工具的時候,會繼承它來實作同步控制功能。

通過進一步分析,發現Sync這個類有兩個具體的實作,分别是​

​NofairSync(非公平鎖)​

​​,​

​FailSync(公平鎖)​

​.

  • 公平鎖 表示所有線程嚴格按照FIFO來擷取鎖
  • 非公平鎖 表示可以存在搶占鎖的功能,也就是說不管目前隊列上是否存在其他線程等待,新線程都有機會搶占鎖

公平鎖和非公平鎖的實作上的差異,我會在文章後面做一個解釋,接下來的分析仍然以​

​非公平鎖​

​作為主要分析邏輯。

NonfairSync.lock

final void lock() {
    if (compareAndSetState(0, 1)) //通過cas操作來修改state狀态,表示争搶鎖的操作
      setExclusiveOwnerThread(Thread.currentThread());//設定目前獲得鎖狀态的線程
    else
      acquire(1); //嘗試去擷取鎖
}      

這段代碼簡單解釋一下

  • 由于這裡是非公平鎖,是以調用lock方法時,先去通過cas去搶占鎖
  • 如果搶占鎖成功,儲存獲得鎖成功的目前線程
  • 搶占鎖失敗,調用acquire來走鎖競争邏輯

compareAndSetState

compareAndSetState的代碼實作邏輯如下

// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);      
這段代碼其實邏輯很簡單,就是通過cas樂觀鎖的方式來做比較并替換。上面這段代碼的意思是,如果目前記憶體中的state的值和預期值expect相等,則替換為update。更新成功傳回true,否則傳回false.
這個操作是原子的,不會出現線程安全問題,這裡面涉及到Unsafe這個類的操作,一級涉及到state這個屬性的意義。
**state**      
  • 當state=0時,表示無鎖狀态
  • 當state>0時,表示已經有線程獲得了鎖,也就是state=1,但是因為ReentrantLock允許重入,是以同一個線程多次獲得同步鎖的時候,state會遞增,比如重入5次,那麼state=5。 而在釋放鎖的時候,同樣需要釋放5次直到state=0其他線程才有資格獲得鎖
private volatile int state;      

需要注意的是:不同的AQS實作,state所表達的含義是不一樣的。

Unsafe

Unsafe類是在sun.misc包下,不屬于Java标準。但是很多Java的基礎類庫,包括一些被廣泛使用的高性能開發庫都是基于Unsafe類開發的,比如Netty、Hadoop、Kafka等;Unsafe可認為是Java中留下的後門,提供了一些低層次操作,如直接記憶體通路、線程排程等

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);      

這個是一個native方法, 第一個參數為需要改變的對象,第二個為偏移量(即之前求出來的headOffset的值),第三個參數為期待的值,第四個為更新後的值。

整個方法的作用是如果目前時刻的值等于預期值var4相等,則更新為新的期望值 var5,如果更新成功,則傳回true,否則傳回false;

acquire

acquire是AQS中的方法,如果CAS操作未能成功,說明state已經不為0,此時繼續acquire(1)操作,這裡大家思考一下,acquire方法中的1的參數是用來做什麼呢?如果沒猜中,往前面回顧一下state這個概念

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }      

這個方法的主要邏輯是

  • 通過tryAcquire嘗試擷取獨占鎖,如果成功傳回true,失敗傳回false
  • 如果tryAcquire失敗,則會通過addWaiter方法将目前線程封裝成Node添加到AQS隊列尾部
  • acquireQueued,将Node作為參數,通過自旋去嘗試擷取鎖。

如果大家看過我寫的

​​Synchronized源碼分析​​的文章,就應該能夠明白自旋存在的意義

NonfairSync.tryAcquire

這個方法的作用是嘗試擷取鎖,如果成功傳回true,不成功傳回false

它是重寫AQS類中的tryAcquire方法,并且大家仔細看一下AQS中tryAcquire方法的定義,并沒有實作,而是抛出異常。按照一般的思維模式,既然是一個不實作的模版方法,那應該定義成abstract,讓子類來實作呀?大家想想為什麼

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}      

nonfairTryAcquire

tryAcquire(1)在NonfairSync中的實作代碼如下

ffinal boolean nonfairTryAcquire(int acquires) {
    //獲得目前執行的線程
    final Thread current = Thread.currentThread();
    int c = getState(); //獲得state的值
    if (c == 0) { //state=0說明目前是無鎖狀态
        //通過cas操作來替換state的值改為1,大家想想為什麼要用cas呢?
        //理由是,在多線程環境中,直接修改state=1會存線上程安全問題,你猜到了嗎?
        if (compareAndSetState(0, acquires)) {
             //儲存目前獲得鎖的線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //這段邏輯就很簡單了。如果是同一個線程來獲得鎖,則直接增加重入次數
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires; //增加重入次數
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}      
  • 擷取目前線程,判斷目前的鎖的狀态
  • 如果state=0表示目前是無鎖狀态,通過cas更新state狀态的值
  • 如果目前線程是屬于重入,則增加重入次數

addWaiter

當tryAcquire方法擷取鎖失敗以後,則會先調用addWaiter将目前線程封裝成Node,然後添加到AQS隊列

private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE
        //将目前線程封裝成Node,并且mode為獨占鎖
        Node node = new Node(Thread.currentThread(), mode); 
        // Try the fast path of enq; backup to full enq on failure
        // tail是AQS的中表示同步隊列隊尾的屬性,剛開始為null,是以進行enq(node)方法
        Node pred = tail;
        if (pred != null) { //tail不為空的情況,說明隊列中存在節點資料
            node.prev = pred;  //講目前線程的Node的prev節點指向tail
            if (compareAndSetTail(pred, node)) {//通過cas講node添加到AQS隊列
                pred.next = node;//cas成功,把舊的tail的next指針指向新的tail
                return node;
            }
        }
        enq(node); //tail=null,将node添加到同步隊列中
        return node;
    }      
  • 将目前線程封裝成Node
  • 判斷目前連結清單中的tail節點是否為空,如果不為空,則通過cas操作把目前線程的node添加到AQS隊列
  • 如果為空或者cas失敗,調用enq将節點添加到AQS隊列

enq

enq就是通過自旋操作把目前節點加入到隊列中

private Node enq(final Node node) {
        //自旋,不做過多解釋,不清楚的關注公衆号[架構師修煉寶典]
        for (;;) {
            Node t = tail; //如果是第一次添加到隊列,那麼tail=null
            if (t == null) { // Must initialize
                //CAS的方式建立一個空的Node作為頭結點
                if (compareAndSetHead(new Node()))
                   //此時隊列中隻一個頭結點,是以tail也指向它
                    tail = head;
            } else {
//進行第二次循環時,tail不為null,進入else區域。将目前線程的Node結點的prev指向tail,然後使用CAS将tail指向Node
                node.prev = t;
                if (compareAndSetTail(t, node)) {
//t此時指向tail,是以可以CAS成功,将tail重新指向Node。此時t為更新前的tail的值,即指向空的頭結點,t.next=node,就将頭結點的後續結點指向Node,傳回頭結點
                    t.next = node;
                    return t;
                }
            }
        }
    }      

假如有兩個線程t1,t2同時進入enq方法,t==null表示隊列是首次使用,需要先初始化

另外一個線程cas失敗,則進入下次循環,通過cas操作将node添加到隊尾

到目前為止,通過addwaiter方法構造了一個AQS隊列,并且将線程添加到了隊列的節點中

acquireQueued

将添加到隊列中的Node作為參數傳入acquireQueued方法,這裡面會做搶占鎖的操作

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();// 擷取prev節點,若為null即刻抛出NullPointException
            if (p == head && tryAcquire(arg)) {// 如果前驅為head才有資格進行鎖的搶奪
                setHead(node); // 擷取鎖成功後就不需要再進行同步操作了,擷取鎖成功的線程作為新的head節點
//凡是head節點,head.thread與head.prev永遠為null, 但是head.next不為null
                p.next = null; // help GC
                failed = false; //擷取鎖成功
                return interrupted;
            }
//如果擷取鎖失敗,則根據節點的waitStatus決定是否需要挂起線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())// 若前面為true,則執行挂起,待下次喚醒的時候檢測中斷的标志
                interrupted = true;
        }
    } finally {
        if (failed) // 如果抛出異常則取消鎖的擷取,進行出隊(sync queue)操作
            cancelAcquire(node);
    }
}      
  • 擷取目前節點的prev節點
  • 如果prev節點為head節點,那麼它就有資格去争搶鎖,調用tryAcquire搶占鎖
  • 搶占鎖成功以後,把獲得鎖的節點設定為head,并且移除原來的初始化head節點
  • 如果獲得鎖失敗,則根據waitStatus決定是否需要挂起線程
  • 最後,通過cancelAcquire取消獲得鎖的操作

前面的邏輯都很好了解,主要看一下shouldParkAfterFailedAcquire這個方法和parkAndCheckInterrupt的作用

shouldParkAfterFailedAcquire

從上面的分析可以看出,隻有隊列的第二個節點可以有機會争用鎖,如果成功擷取鎖,則此節點晉升為頭節點。對于第三個及以後的節點,if (p == head)條件不成立,首先進行shouldParkAfterFailedAcquire(p, node)操作。

shouldParkAfterFailedAcquire方法是判斷一個争用鎖的線程是否應該被阻塞。它首先判斷一個節點的前置節點的狀态是否為Node.SIGNAL,如果是,是說明此節點已經将狀态設定-如果鎖釋放,則應當通知它,是以它可以安全的阻塞了,傳回true。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //前繼節點的狀态
    if (ws == Node.SIGNAL)//如果是SIGNAL狀态,意味着目前線程需要被unpark喚醒
               return true;
如果前節點的狀态大于0,即為CANCELLED狀态時,則會從前節點開始逐漸循環找到一個沒有被“CANCELLED”節點設定為目前節點的前節點,傳回false。在下次循環執行shouldParkAfterFailedAcquire時,傳回true。這個操作實際是把隊列中CANCELLED的節點剔除掉。
    if (ws > 0) {// 如果前繼節點是“取消”狀态,則設定 “目前節點”的 “目前前繼節點” 為 “‘原前繼節點'的前繼節點”。
       
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { // 如果前繼節點為“0”或者“共享鎖”狀态,則設定前繼節點為SIGNAL狀态。
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}      

parkAndCheckInterrupt

如果shouldParkAfterFailedAcquire傳回了true,則會執行:​

​parkAndCheckInterrupt()​

​方法,它是通LockSupport.park(this)将目前線程挂起到WATING狀态,它需要等待一個中斷、unpark方法來喚醒它,通過這樣一種FIFO的機制的等待,來實作了Lock的操作。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
}      

LockSupport

LockSupport類是Java6引入的一個類,提供了基本的線程同步原語。LockSupport實際上是調用了Unsafe類裡的函數,歸結到Unsafe裡,隻有兩個函數:

public native void unpark(Thread jthread);  
public native void park(boolean isAbsolute, long time);      

unpark函數為線程提供“許可(permit)”,線程調用park函數則等待“許可”。這個有點像信号量,但是這個“許可”是不能疊加的,“許可”是一次性的。

permit相當于0/1的開關,預設是0,調用一次unpark就加1變成了1.調用一次park會消費permit,又會變成0。 如果再調用一次park會阻塞,因為permit已經是0了。直到permit變成1.這時調用unpark會把permit設定為1.每個線程都有一個相關的permit,permit最多隻有一個,重複調用unpark不會累積

鎖的釋放

ReentrantLock.unlock

加鎖的過程分析完以後,再來分析一下釋放鎖的過程,調用release方法,這個方法裡面做兩件事,1,釋放鎖 ;2,喚醒park的線程

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}      

tryRelease

這個動作可以認為就是一個設定鎖狀态的操作,而且是将狀态減掉傳入的參數值(參數是1),如果結果狀态為0,就将排它鎖的Owner設定為null,以使得其它的線程有機會進行執行。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 這裡是将鎖的數量減1
    if (Thread.currentThread() != getExclusiveOwnerThread())// 如果釋放的線程和擷取鎖的線程不是同一個,抛出非法螢幕狀态異常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { 
// 由于重入的關系,不是每次釋放鎖c都等于0,
    // 直到最後一次釋放鎖時,才會把目前線程釋放
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}      

unparkSuccessor

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {//判斷後繼節點是否為空或者是否是取消狀态,
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0) //然後從隊列尾部向前周遊找到最前面的一個waitStatus小于0的節點, 至于為什麼從尾部開始向前周遊,因為在doAcquireInterruptibly.cancelAcquire方法的處理過程中隻設定了next的變化,沒有設定prev的變化,在最後有這樣一行代碼:node.next = node,如果這時執行了unparkSuccessor方法,并且向後周遊的話,就成了死循環了,是以這時隻有prev是穩定的
                s = t;
    }
//内部首先會發生的動作是擷取head節點的next節點,如果擷取到的節點不為空,則直接通過:“LockSupport.unpark()”方法來釋放對應的被挂起的線程,這樣一來将會有一個節點喚醒後繼續進入循環進一步嘗試tryAcquire()方法來擷取鎖
    if (s != null)
        LockSupport.unpark(s.thread); //釋放許可
}      

總結

繼續閱讀