天天看點

終于講明白了!Java并發程式設計不得不學的AQS思維導圖一、什麼是AQS二、AQS的實作原理三、AQS的模闆模式四、認識ReentrantLock五、源碼分析ReentrantLock總結

思維導圖

終于講明白了!Java并發程式設計不得不學的AQS思維導圖一、什麼是AQS二、AQS的實作原理三、AQS的模闆模式四、認識ReentrantLock五、源碼分析ReentrantLock總結
文章已收錄Github精選,歡迎Star:https://github.com/yehongzhi/learningSummary

一、什麼是AQS

談到并發程式設計,不得不說AQS(AbstractQueuedSynchronizer),這可謂是Doug Lea老爺子的大作之一。AQS即是抽象隊列同步器,是用來建構Lock鎖和同步元件的基礎架構,很多我們熟知的鎖和同步元件都是基于AQS建構,比如ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore。

實際上AQS是一個抽象類,我們不妨先看一下源碼:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
	//頭結點
    private transient volatile Node head;
    //尾節點
    private transient volatile Node tail;
    //共享狀态
    private volatile int state;
    
    //内部類,建構連結清單的Node節點
	static final class Node {
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
    }
}
//AbstractQueuedSynchronizer的父類
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    //占用鎖的線程
    private transient Thread exclusiveOwnerThread;
}
           

由源碼可以看出AQS是有以下幾個部分組成的:

終于講明白了!Java并發程式設計不得不學的AQS思維導圖一、什麼是AQS二、AQS的實作原理三、AQS的模闆模式四、認識ReentrantLock五、源碼分析ReentrantLock總結

1.1 state共享變量

AQS中裡一個很重要的字段state,表示同步狀态,是由

volatile

修飾的,用于展示目前臨界資源的獲鎖情況。通過getState(),setState(),compareAndSetState()三個方法進行維護。

private volatile int state;

protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
//CAS操作
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
           

關于state的幾個要點:

  • 使用volatile修飾,保證多線程間的可見性。
  • getState()、setState()、compareAndSetState()使用final修飾,限制子類不能對其重寫。
  • compareAndSetState()采用樂觀鎖思想的CAS算法,保證原子性操作。

1.2 CLH隊列

AQS裡另一個重要的概念就是CLH隊列,它是一個雙向連結清單隊列,其内部由head和tail分别記錄頭結點和尾結點,隊列的元素類型是Node。

簡單講一下這個隊列的作用,就是當一個線程擷取同步狀态(state)失敗時,AQS會将此線程以及等待的狀态等資訊封裝成Node加入到隊列中,同時阻塞該線程,等待後續的被喚醒。

隊列的元素就是一個個的Node節點,下面講一下Node節點的組成:

static final class Node {
	//共享模式下的等待标記
    static final Node SHARED = new Node();
	//獨占模式下的等待标記
    static final Node EXCLUSIVE = null;
    //表示目前節點的線程因為逾時或者中斷被取消
    static final int CANCELLED =  1;
	//表示目前節點的後續節點的線程需要運作,也就是通過unpark操作
    static final int SIGNAL    = -1;
	//表示目前節點在condition隊列中
    static final int CONDITION = -2;
	//共享模式下起作用,表示後續的節點會傳播喚醒的操作
    static final int PROPAGATE = -3;
	//狀态,包括上面的四種狀态值,初始值為0,一般是節點的初始狀态
    volatile int waitStatus;
	//上一個節點的引用
    volatile Node prev;
	//下一個節點的引用
    volatile Node next;
	//儲存在目前節點的線程引用
    volatile Thread thread;
	//condition隊列的後續節點
    Node nextWaiter;
}
           

1.3 exclusiveOwnerThread

AQS通過繼承AbstractOwnableSynchronizer類,擁有的屬性。表示獨占模式下同步器的持有者。

二、AQS的實作原理

AQS有兩種模式,分别是獨占式和共享式。

2.1 獨占式

同一時刻僅有一個線程持有同步狀态,也就是其他線程隻有在占有的線程釋放後才能競争,比如ReentrantLock。下面從源碼切入,梳理獨占式的實作思路。

首先看acquire()方法,這是AQS在獨占模式下擷取同步狀态的方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
           

先講這個方法的總體思路:

  • tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回。
  • 如果失敗則調用addWaiter()方法把目前線程包裝成Node(狀态為EXCLUSIVE,标記為獨占模式)插入到CLH隊列末尾。
  • 然後acquireQueued()方法使線程阻塞在等待隊列中擷取資源,一直擷取到資源後才傳回,如果在整個等待過程中被中斷過,則傳回true,否則傳回false。
  • 線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。

我們展開來分析,看tryAcquire()方法,嘗試擷取資源,成功傳回true,失敗傳回false。

//直接抛出異常,這是由子類進行實作的方法,展現了模闆模式的思想
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
           

為什麼沒有具體實作呢,其實這是模闆模式的思想。這個方法是嘗試擷取資源,但是擷取資源的方式有很多種實作,比如公平鎖有公平鎖的擷取方式,非公平鎖有非公平鎖的擷取方式(後面會講,别急)。是以這裡是一個沒有具體實作的方法,需要由子類去實作。

接着看addWaiter()方法,這個方法的作用是把目前線程包裝成Node添加到隊列中。

private Node addWaiter(Node mode) {
    //把目前線程包裝成Node節點
    Node node = new Node(Thread.currentThread(), mode);
    //擷取到尾結點
    Node pred = tail;
    //判斷尾結點是否為null,如果不為空,那就證明隊列已經初始化了
    if (pred != null) {
        //已經初始化了,就直接把Node節點添加到隊列的末尾
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            //傳回包含目前線程的節點Node
            return node;
        }
    }
    //如果隊列沒有初始化,那就調用enq()方法
    enq(node);
    return node;
}
           

接着我們看enq()方法,就是一個自旋的操作,把傳進來的node添加到隊列最後,如果隊列沒有初始化則進行初始化。

private Node enq(final Node node) {
    //自旋操作,也就是死循環,隻有加入隊列成功才會return
    for (;;) {
        //把尾結點指派給t
        Node t = tail;
        //如果為空,證明沒有初始化,進行初始化
        if (t == null) { // Must initialize
            //建立一個空的Node節點,并且設定為頭結點
            if (compareAndSetHead(new Node()))
                //然後把頭結點指派給尾結點
                tail = head;
        } else {
            //如果是第一次循環為空,就已經建立了一個一個Node,那麼第二次循環就不會為空了
            //如果尾結點不為空,就把傳進來的node節點的前驅節點指向尾結點
            node.prev = t;
            //cas原子性操作,把傳進來的node節點設定為尾結點
            if (compareAndSetTail(t, node)) {
                //把原來的尾結點的後驅節點指向傳進來的node節點
                t.next = node;
                return t;
            }
        }
    }
}
           

接着我們再把思路跳回去頂層的方法,看acquireQueued()方法。

//在隊列中的節點node通過acquireQueued()方法擷取資源,忽略中斷。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋的操作,一個死循環
        for (;;) {
            //擷取傳進來的node節點的前驅節點,指派給p
            final Node p = node.predecessor();
            //如果p是頭結點,node節點就是第二個節點,則再次去嘗試擷取資源
            if (p == head && tryAcquire(arg)) {
                //tryAcquire(arg)擷取資源成功的話,則把node節點設定為頭結點
                setHead(node);
                //把原來的頭結點p的後驅節點設定為null,等待GC垃圾回收
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果p不是頭結點,或者tryAcquire()擷取資源失敗,判斷是否可以被park,也就是把線程阻塞起來
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//&&前面如果傳回true,将目前線程阻塞并檢查是否被中斷
                //如果阻塞過程中被中斷,則置interrupted标志位為true。
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

最後是selfInterrupt()方法,自我中斷。

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
           

過程記不住沒關系,下面畫張圖來總結一下,其實很簡單。

終于講明白了!Java并發程式設計不得不學的AQS思維導圖一、什麼是AQS二、AQS的實作原理三、AQS的模闆模式四、認識ReentrantLock五、源碼分析ReentrantLock總結

2.2 共享式

即共享資源可以被多個線程同時占有,直到共享資源被占用完畢。比如ReadWriteLock和CountdownLatch。下面我們從源碼去分析其實作原理。

首先還是看最頂層的acquireShared()方法。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
           

這段代碼很簡單,首先調用tryAcquireShared()方法,tryAcquireShared傳回是一個int數值,當傳回值大于等于0的時候,說明獲得成功擷取鎖,方法結束,否則傳回負數,表示擷取同步狀态失敗,執行doAcquireShared方法。

tryAcquireShared()方法是一個模闆方法由子類去重寫,意思是需要如何擷取同步資源由實作類去定義,AQS隻是一個架構。

那麼就看如果擷取資源失敗,執行的doAcquireShared()方法。

private void doAcquireShared(int arg) {
    //調用addWaiter()方法,把目前線程包裝成Node,标志為共享式,插入到隊列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //擷取目前節點node的前驅節點
            final Node p = node.predecessor();
            //前驅節點是否是頭結點
            if (p == head) {
                //如果前驅節點是頭結點,則調用tryAcquireShared()擷取同步資源
                int r = tryAcquireShared(arg);
                //r>=0表示擷取同步資源成功,隻有擷取成功,才會執行到return退出for循環
                if (r >= 0) {
                    //設定node為頭結點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //判斷是否可以被park,跟獨占式的邏輯一樣傳回true,則進行park操作,阻塞線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

這段邏輯基本上跟獨占式的邏輯差不多,不同的地方在于入隊的Node是标志為SHARED共享式的,擷取同步資源的方式是tryAcquireShared()方法。

三、AQS的模闆模式

模闆模式在AQS中的應用可謂是一大精髓,在上文中有提到的tryAcquireShared()和tryAcquire()都是很重要的模闆方法。一般使用AQS往往都是使用一個内部類繼承AQS,然後重寫相應的模闆方法。

AQS已經把一些常用的,比如入隊,出隊,CAS操作等等建構了一個架構,使用者隻需要實作擷取資源,釋放資源的,因為很多鎖,還有同步器,其實就是擷取資源和釋放資源的方式有比較大的差別。

那麼我們看一下模闆方法有哪些。

3.1 tryAcquire()

tryAcquire()方法,獨占式擷取同步資源,傳回true表示擷取同步資源成功,false表示擷取失敗。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
           

3.2 tryRelease()

tryRelease()方法,獨占式使用,tryRelease()的傳回值來判斷該線程是否已經完成釋放資源,子類來決定是否能成功釋放鎖。

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
           

3.3 tryAcquireShared()

tryAcquireShared()方法,共享式擷取同步資源,傳回大于等于0表示擷取資源成功,傳回小于0表示失敗。

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
           

3.4 tryReleaseShared()

tryReleaseShared()方法,共享式嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false。

protected boolean tryReleaseShared(int arg) {
	throw new UnsupportedOperationException();
}
           

3.5 isHeldExclusively()

isHeldExclusively()方法,該線程是否正在獨占資源。隻有用到condition才需要去實作它。

protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
           

四、認識ReentrantLock

ReentrantLock是一個很經典的使用AQS的案例,不妨以此為切入點來繼續深入。ReentrantLock的特性有很多,首先它是一個悲觀鎖,其次有兩種模式分别是公平鎖和非公平鎖,最後它是重入鎖,也就是能夠對共享資源重複加鎖。

AQS通常是使用内部類實作,是以不難想象在ReentrantLock類裡有兩個内部類,我們看一張類圖。

終于講明白了!Java并發程式設計不得不學的AQS思維導圖一、什麼是AQS二、AQS的實作原理三、AQS的模闆模式四、認識ReentrantLock五、源碼分析ReentrantLock總結

FairSync是公平鎖的實作,NonfairSync則是非公平鎖的實作。通過構造器傳入的boolean值進行判斷。

public ReentrantLock(boolean fair) {
    //true則使用公平鎖,false則使用非公平鎖
    sync = fair ? new FairSync() : new NonfairSync();
}
//預設是非公平鎖
public ReentrantLock() {
    sync = new NonfairSync();
}
           

公平鎖是遵循FIFO(先進先出)原則的,先到的線程會優先擷取資源,後到的線程會進行排隊等待,能保證每個線程都能拿到鎖,不會存在有線程餓死的情況。

非公平鎖是則不遵守先進先出的原則,會出現有線程插隊的情況,不能保證每個線程都能拿到鎖,會存在有線程餓死的情況。

下面我們從源碼分析去找出這兩種鎖的差別。

五、源碼分析ReentrantLock

5.1 上鎖

ReentrantLock是通過lock()方法上鎖,是以看lock()方法。

public void lock() {
    sync.lock();
}
           

sync就是NonfairSync或者FairSync。

//這裡就是調用AQS的acquire()方法,擷取同步資源
final void lock() {
    acquire(1);
}
           

acquire()方法前面已經解析過了,主要看FairSync的tryAcquire()方法。

protected final boolean tryAcquire(int acquires) {
    //擷取目前線程
    final Thread current = Thread.currentThread();
    //擷取同步狀态
    int c = getState();
    //判斷同步狀态是否為0
    if (c == 0) {
        //關鍵在這裡,公平鎖會判斷是否需要排隊
        if (!hasQueuedPredecessors() &&
            //如果不需要排隊,則直接cas操作更新同步狀态為1
            compareAndSetState(0, acquires)) {
            //設定占用鎖的線程為目前線程
            setExclusiveOwnerThread(current);
            //傳回true,表示上鎖成功
            return true;
        }
    }
    //判斷目前線程是否是擁有鎖的線程,主要是可重入鎖的邏輯
    else if (current == getExclusiveOwnerThread()) {
        //如果是目前線程,則同步狀态+1
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        //設定同步狀态
        setState(nextc);
        return true;
    }
    //以上情況都不是,則傳回false,表示上鎖失敗。上鎖失敗根據AQS的架構設計,會入隊排隊
    return false;
}
           

如果是非公平鎖NonfairSync的tryAcquire(),我們繼續分析。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
//非公平式擷取鎖
final boolean nonfairTryAcquire(int acquires) {
    //這段跟公平鎖是一樣的操作
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //關鍵在這裡,不再判斷是否需要排隊,而是直接去更新同步狀态,通俗點講就是插隊
        if (compareAndSetState(0, acquires)) {
            //如果擷取同步狀态成功,則設定占用鎖的線程為目前線程
            setExclusiveOwnerThread(current);
            //傳回true表示擷取鎖成功
            return true;
        }
    }
    //以下邏輯跟公平鎖的邏輯一樣
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

其實很明顯了,關鍵的差別就在于嘗試擷取鎖的時候,公平鎖會判斷是否需要排隊再去更新同步狀态,非公平鎖是直接就更新同步,不判斷是否需要排隊。

從性能上來說,公平鎖的性能是比非公平鎖要差的,因為公平鎖要遵守FIFO(先進先出)的原則,這就會增加了上下文切換與等待線程的狀态變換時間。

非公平鎖的缺點也是很明顯的,因為允許插隊,這就會存在有線程餓死的情況。

5.2 解鎖

解鎖對應的方法就是unlock()。

public void unlock() {
    //調用AQS中的release()方法
    sync.release(1);
}
//這是AQS架構定義的release()方法
public final boolean release(int arg) {
    //目前鎖是不是沒有被線程持有,傳回true表示該鎖沒有被任何線程持有
    if (tryRelease(arg)) {
        //擷取頭結點h
        Node h = head;
        //判斷頭結點是否為null并且waitStatus不是初始化節點狀态,解除線程挂起狀态
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           

關鍵在于tryRelease(),這就不需要分公平鎖和非公平鎖的情況,隻需要考慮可重入的邏輯。

protected final boolean tryRelease(int releases) {
    //減少可重入的次數
    int c = getState() - releases;
    //如果目前線程不是持有鎖的線程,抛出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果持有線程全部釋放,将目前獨占鎖所有線程設定為null,并更新state
    if (c == 0) {
        //狀态為0,表示持有線程被全部釋放,設定為true
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
           

總結

JUC可謂是學習java的一個難點,而學習AQS其實關鍵在于并發的思維,因為需要考慮的情況很多,其次需要了解模闆模式的思想,這才能了解為什麼AQS作為一個架構的作用。ReentrantLock這個類我覺得是了解AQS一個很好的切入點,看懂了之後再去看AQS的其他應用類應該會輕松很多。

那麼這篇文章就講到這裡了,希望看完能有所收獲,感謝你的閱讀。

覺得有用就點個贊吧,你的點贊是我創作的最大動力~

我是一個努力讓大家記住的程式員。我們下期再見!!!

終于講明白了!Java并發程式設計不得不學的AQS思維導圖一、什麼是AQS二、AQS的實作原理三、AQS的模闆模式四、認識ReentrantLock五、源碼分析ReentrantLock總結
能力有限,如果有什麼錯誤或者不當之處,請大家批評指正,一起學習交流!