天天看點

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

作者:Java碼農之路

前言

介紹 ReentrantLock、AQS 之前,先分析它們的來源,來自于 JUC 中的核心元件,java.util.concurrent 在并發程式設計中是比較會常用的工具類,裡面包含了很多在并發場景下使用的元件,比如:線程池 > ThreadPoolExecutor、阻塞隊列 > BlockingQueue、計數器 > CountDownLatch、循環屏障 > CyclicBarrier、信号量 > Semaphore、并發集合 > ConcurrentHashMap | CopyOnWriteArrayList |ConcurrentSkipListMap 等

ReentrantLock 與 synchronized 具有相同的基本行為、語義,但它擴充了一些其他的功能且更能靈活控制鎖

1、ReentrantLock 提供了公平鎖、非公平鎖的機制,而 synchronized 并沒有公平鎖的機制

2、ReentrantLock 提供了 tryLock 方法,嘗試擷取鎖而不會阻塞線程去作其他的事情,更加靈活

3、ReentrantLock#lockInterruptibly 方法提供了響應中斷的能力,若目前在等待鎖的線程被中斷了,通過此方法可以捕獲到中斷異常,以便作相應的異常處理

4、ReentrantLock > tryLock(long time, TimeUnit unit) 方法提供了鎖逾時等待能力,可以指定等待鎖的逾時時間,對于限時等待的場景很有用

5、ReentrantLock 可以通過 newCondition 方法擷取多個 Condition 對象來實作多個條件變量,以便可以更加細粒度地調用 await、signal 等待、喚醒操作;synchronized 隻能通過 wait、notify 方法實作簡單的等待和喚醒

在該篇博文主要介紹 ReentrantLock 是如何實作的,以及它的核心方法源碼,如何結合 AQS 實作鎖解決并發安全問題的

Lock

Lock 是 JUC 元件下最核心的接口,絕大部分元件都使用到了 Lock 接口,是以先以 Lock 接口作為切入點講解後續的源碼

Lock 本質上是一個接口,它提供了獲得鎖、釋放鎖、條件變量、鎖中斷能力,定義為接口就意味着它定義了一個鎖的标準規範,也同時意味着鎖的不同實作。實作 Lock 接口的類有很多,以下為幾個常見的鎖實作

ReentrantLock:表示為重入鎖,它是唯一一個實作了 Lock 接口的類;重入鎖是指目前線程獲得鎖以後,再次擷取鎖不需要進行阻塞,而是直接累加 AbstractQueuedSynchronizer#state 變量值

ReentrantReadWriteLock:表示重入讀寫鎖,實作了 ReadWriteLock 接口,在該類中維護了兩種鎖,一個是 ReadLock,另外一個是 WriteLock,它們各自實作了 Lock 接口。

讀寫鎖是一種适合讀多寫少的場景下,來解決線程安全問題的元件,基本的原則:讀讀不互斥、讀寫互斥、寫寫互斥,一旦涉及到資料變化的操作都會是互斥的

3.StampedLock:該鎖是 JDK 8 引入的鎖機制,是讀寫鎖的一個改進版本,讀寫鎖雖然通過分離讀、寫功能使得讀、讀之間可以并行,但是讀、寫是互斥的,若大量的讀線程存在,可能會引起寫線程的饑餓;StampedLock 是一種樂觀鎖的讀政策,采用 CAS 樂觀鎖完全不會阻塞寫線程

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

重要的方法,簡介如下:

  1. lock:若鎖可用就獲得鎖,若鎖不可用就阻塞,直接鎖被釋放
  2. lockInterruptibly:與 lock 方法相似,但阻塞的線程可中斷,會抛出 java.lang.InterruptedException 異常
  3. tryLock:非阻塞擷取鎖,嘗試擷取鎖,若成功傳回 true
  4. tryLock(long timeout, TimeUnit timeUnit):帶有逾時時間的擷取鎖方法
  5. unLock:釋放鎖

重入鎖

重入鎖,支援同一個線程在同一個時刻擷取同一把鎖;也就是說,若目前線程 T1 調用 lock 方法擷取了鎖以後,再次調用 lock,是不會再以阻塞的方式去擷取鎖的,直接增加鎖的重入次數就 OK 了。

synchronized、ReentrantLock 都支援重入鎖,存在多個加鎖的方法互相調用時,其實就是一種鎖可重入特性的場景,以下通過不同的代碼案例來示範可重入鎖是怎樣的

synchronized

/**
 * @author vnjohn
 * @since 2023/6/17
 */
public class SynchronizedDemo {
    public synchronized void lockMethodOne() {
        System.out.println("begin:lockMethodOne");
        lockMethodTwo();
    }

    public void lockMethodTwo() {
        synchronized (this) {
            System.out.println("begin:lockMethodTwo");
        }
    }

    public static void main(String[] args) {
        SynchronizedDemo synchronizedDemo = new SynchronizedDemo();
        new Thread(() -> synchronizedDemo.lockMethodOne()).start();
    }
}           

調用 lockMethodOne 方法擷取了目前執行個體的鎖,然後在這個方法裡面還調用了 lockMethodTwo 方法,lockMethodTwo 雖然是代碼塊鎖,但鎖住的也是目前執行個體;若不支援鎖可重入時,目前線程會因為無法擷取 lockMethodTwo 執行個體鎖而被阻塞,即會發生死鎖現象,重入鎖設計的目的是為了避免線程的死鎖

ReentrantLock

ReentrantLock 與 synchronized 同理,示例代碼如下:

/**
 * @author vnjohn
 * @since 2023/6/17
 */
public class ReentrantLockDemo {
    static Lock lock = new ReentrantLock();

    public void lockMethodOne() {
        lock.lock();
        try {
            System.out.println("begin:lockMethodOne");
            lockMethodTwo();
        } finally {
            lock.unlock();
        }
    }

    public void lockMethodTwo() {
        lock.lock();
        try {
            System.out.println("begin:lockMethodTwo");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
        new Thread(()-> reentrantLockDemo.lockMethodOne()).start();
    }
}           

ReentrantReadWriteLock 讀寫鎖

上面提及到的 synchronized、ReentrantLock 重入鎖的特性其實是排它鎖,也是悲觀鎖,該鎖在同一時刻隻允許一個線程進行通路,而讀寫鎖在同一個時刻可以允許多個線程(讀)通路,但是在寫線程通路時,所有的讀線程、其他寫線程都會被阻塞。讀寫鎖維護了一對鎖:讀鎖 > ReentrantReadWriteLock.ReadLock、寫鎖 > ReentrantReadWriteLock.WriteLock;一般情況下,讀寫鎖的性能會比悲觀鎖性能好,因為在大多數場景下讀都是多于寫的,讀寫鎖能夠比排它鎖提供更好的并發性、吞吐量;通過案例來示範讀寫鎖如何使用,如下:

/**
 * @author vnjohn
 * @since 2023/6/18
 */
public class ReentrantReadWriteLockDemo {
    private static Map<String, Object> CACHE_MAP = new HashMap<>();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();

    /**
     * 通過讀鎖從本地緩存中擷取資料
     *
     * @param key
     * @return
     */
    public static Object get(String key) {
        readLock.lock();
        try {
            System.out.println("本地緩存讀取資料:" + key);
            TimeUnit.SECONDS.sleep(1);
            return CACHE_MAP.get(key);
        } catch (InterruptedException e) {
            return null;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * 通過寫鎖從本地緩存中擷取資料
     *
     * @param key
     * @return
     */
    public static Object put(String key, Object obj) {
        writeLock.lock();
        try {
            System.out.println("本地緩存寫入資料:" + key);
            TimeUnit.SECONDS.sleep(1);
            return CACHE_MAP.put(key, obj);
        } catch (InterruptedException e) {
            return null;
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        String keyOnce = "thread-batch-once";
        for (int i = 0; i < 5; i++) {
            // 示範讀寫鎖互斥的情況,
            new Thread(()-> ReentrantReadWriteLockDemo.get(keyOnce)).start();
            new Thread(()-> ReentrantReadWriteLockDemo.put(keyOnce, Thread.currentThread().getName())).start();
        }
    }

}           

在該案例中,通過 HashMap 來模拟了一個本地緩存,然後使用讀寫鎖來保證這個本地緩存線程安全性。當執行讀操作的時候,需要擷取讀鎖,在并發通路的時候,讀鎖不會阻塞,因為讀操作不會影響執行結果

在執行寫操作時,線程必須要擷取寫鎖,當已經有線程持有寫鎖的情況下,目前線程會被阻塞,隻有當鎖釋放以後,其他讀寫操作才能繼續執行。使用讀寫鎖提升讀操作的并發性,也保證每次寫操作對所有的讀寫操作的可見性

讀鎖、讀鎖可以共享

讀鎖、寫鎖不可以共享(排它)

寫鎖、寫鎖不可以共享(排它)

ReentrantLock 實作原理

鎖的基本原理是,将多線程并行任務基于某一種機制實作線程的串行執行,進而達到線程安全性的目的。在 synchronize 中,存在鎖更新的概念 > 偏向鎖、輕量級鎖、重量級鎖。基于 CAS 樂觀自旋鎖優化了 synchronize 加鎖開銷,同時在重量級鎖階段,通過線程的阻塞以及喚醒來達到線程競争、同步的目的。那麼在 ReentrantLock 中,也一定會存在這樣的問題需要去解決

那麼在多線程競争重入鎖時,競争失敗的線程是如何實作阻塞以及被喚醒的呢?提及這個必須先說說 AQS 是什麼了!

AQS

AQS > 全稱 AbstractQueuedSynchronizer,内部用到了一個同步等待隊列,它是一個同步工具也是 Lock 用來實作線程同步的核心元件

從 AQS 功能、使用層面來說,AQS 分為兩種:獨占、共享

  • 獨占鎖:同一時刻隻能有一個線程持有鎖,操作、寫入資源,比如:ReentrantLock
  • 共享鎖:允許多個線程同時擷取鎖,并發通路共享資源,比如:ReentrantReadWriteLock

AQS 内部實作

AQS 内部的同步等待隊列其實就是維護了一個 FIFO 的雙向連結清單,這種結構的特點是每個節點都會兩個指針,分别指向直接後繼節點、直接前驅節點。是以雙向連結清單可以從任意一個節點開始很友善的通路前驅、後繼節點。節點由内部類 Node 表示,Node 内部類其實是由線程封裝,當線程争搶鎖失敗後會封裝成 Node 加入到 AQS 隊列中去;當擷取鎖的線程釋放鎖以後,會從隊列中喚醒其中一個阻塞的節點(線程)

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

Node 内部結構

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** 線程已取消等待鎖,調用 tryLock(TimeUnit) 或 intercept 中斷方法*/
    static final int CANCELLED =  1;
    /** 表明後續線程需要被喚醒 */
    static final int SIGNAL    = -1;
    /** 表明線程在等待狀态 */
    static final int CONDITION = -2;
    /**
     * 在共享模式下,該值表明下一個需要被分享的節點應該無條件被分享
     */
    static final int PROPAGATE = -3;

    /**
     * 0-預設值、CANCELLED、SIGNAL、CONDITION、PROPAGATE,
     * 後續會通過 CAS 操作改變該值狀态
     */
    volatile int waitStatus;

    /**
     * 前驅節點
     */
    volatile Node prev;

    /**
     * 後繼節點
     */
    volatile Node next;

    /**
     * 目前線程
     */
    volatile Thread thread;

    /**
     * 存儲在 Condition 隊列中的後繼節點
     */
    Node nextWaiter;

    /**
     * 是否為共享模式(共享鎖)
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 擷取前驅節點或抛出空指針異常
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
	// 用于建立初始的頭或共享标記
    Node() {    // Used to establish initial head or SHARED marker
    }
	// 該構造方法會構造成一個 Node,添加到等待隊列中
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
	// 該構造方法會在 Condition 隊列中使用
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}           

Node 變更過程

當出現鎖競争或釋放鎖時, AQS 同步等待隊列中的節點會發生變化

添加節點

下面來看一下添加節點的場景是怎樣的

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

在這裡會發生三個變化:

  1. 新的競争鎖線程會封裝成 Node 節點追加到同步隊列中,設定 prev 節點指向原有的 tail 尾部節點
  2. 通過 CAS 操作将 tail 指針指向新加入的 Node 節點
  3. 修改原有的 tail 尾部節點 next 指針指向新加入的 Node

以上的變化發生在核心方法:AbstractQueuedSynchronizer#addWaiter 中

釋放節點

head 節點表示擷取鎖成功的節點,當頭節點在釋放同步狀态時,會喚醒後繼節點,若後繼節點擷取鎖成功,會将自身設定為頭節點,節點的變化過程如下:

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

在這裡會發生兩個變化:

  1. 設定 head 頭節點指向下一個擷取鎖的節點
  2. 新的擷取鎖節點,将 prev 指針指向 null

設定 head 頭節點不需要使用 CAS 操作,原因:設定 head 頭節點是由擷取鎖的線程來完成的,同步鎖隻能由一個線程擷取,是以不适合通過 CAS 來保證,隻需要把 head 頭節點設定為原 head 頭節點的後繼節點,并且切斷原 head 頭節點的 next 引用即可

ReentrantLock 類源碼分析

以 ReentrantLock 類作為入口,看看該類源碼級别是如何使用 AQS 來實作線程同步的

時序圖

ReentrantLock#lock 方法源碼的調用過程,通過時序圖的方式來進行展示

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

鎖競争核心方法

簡單梳理了一下 lock 流程以後,下面來介紹 ReentrantLock、AQS 中一些核心方法内容以及其作用

NonfairSync#lock

NonfairSync 實作類是 ReentrantLock 類内部接口 Sync 的實作類,它采用非公平鎖的方式進行鎖競争, 下面來看其源碼内容是如何實作的

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
           

非公平鎖、公平鎖最大的差別:在于非公平鎖搶占鎖的邏輯是不管有沒有等待隊列中有沒有線程在排隊,我先上來用 CAS 操作搶占一下

  • CAS 成功,即表示成功擷取到了鎖
  • CAS 失敗,調用 AbstractQueuedSynchronizer#acquire 方法走競争鎖邏輯

CAS(Compare And Set-比較并交換)

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}           

通過 CAS 樂觀鎖的方式來作比較并替換,若目前記憶體中的 state 值與預期值 expect 相等,則替換為 update 值;更新成功傳回 true,否則傳回 false;該操作是原子性的,不會出現線程安全問題,這裡面會涉及到 Unsafe 類的操作

state 是 AQS 中的一個屬性,它在不同的元件實作中所表達的含義不一樣,對于重入鎖 ReentrantLock 來說,它有以下兩個含義:

  1. 當 state = 0 時,表示無鎖狀态
  2. 當 state = 1 時,表示已經有線程擷取到了鎖
因為 ReentrantLock 允許可重入,是以同一個線程多次擷取鎖時,state 會遞增,比如:在一段代碼中,目前線程重複擷取同一把鎖三次(未釋放的情況下)state 為 3;而在釋放鎖時,同樣需要釋放 3 次直至 state = 0 其他線程才有資格去擷取這把鎖

AQS#acquire

acquire 方法是 AQS 中的核心方法,若 CAS 操作未能成功,說明 state 值已經不為 0,此時繼續調用 acquire(1) 方法操作,如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}           

該方法分以下幾塊邏輯進行:

  1. 通過 tryAcquire 嘗試去擷取獨占鎖,若成功傳回 true,失敗傳回 false
  2. 若 tryAcquire 執行結果為 false,則會調用 addWaiter 方法,将目前線程封裝成 Node 添加到 AQS 隊列的尾部
  3. acquireQueued:将 Node 作為參數,通過自旋的方式去嘗試擷取鎖,這裡會執行線程的阻塞等待邏輯

ReentrantLock.NonfairSync#tryAcquire

NonfairSync#tryAcquire 方法重寫至 AQS 類,AQS 該方法并沒有實作,而是抛出異常,具體的實作内容交由給子類去進行實作,這裡采用了設計模式 > 模版方法

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

具體的子類實作:ReentrantLock.NonfairSync#tryAcquire,該方法作用:嘗試擷取一把鎖,若成功傳回 true、失敗傳回 false

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
	// 擷取目前線程
    final Thread current = Thread.currentThread();
    int c = getState();// 擷取 state 狀态
    if (c == 0) { // 代表無鎖狀态
    	// CAS 替換 state 值,CAS 成功表示鎖擷取成功
        if (compareAndSetState(0, acquires)) {
        	// 儲存目前擷取鎖的線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若同一個線程多次擷取同一把鎖,直接增加鎖重入次數即可
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}           
  1. 擷取目前記憶體中 state 鎖狀态值
  2. state 狀态為 0 代表目前鎖處于無鎖狀态,首次擷取鎖的線程可以通過 CAS 操作更新 state 鎖狀态值
  3. 若目前線程等于鎖占有的線程,則增加鎖重入次數即可
  4. 其他情況,代表擷取鎖失敗的線程,執行 AQS#acquire 方法中的 addWaiter(Node.EXCLUSIVE), arg) 方法 > 添加獨占模式的 Node 隊列節點

AQS#addWaiter

當 tryAcquire 方法擷取鎖失敗以後,則會先調用 addWaiter 将目前線程封裝成 Node,源碼如下:

private Node addWaiter(Node mode) {
	// 将目前線程封裝為 Node 節點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // tail 指向 AQS 同步隊列中的尾部節點,預設:null
    Node pred = tail;
    // tail 不為空的情況下,說明同步隊列中存在節點
    if (pred != null) {
    	// 将目前線程 Node prev 前驅節點指針指向原來的尾部節點
        node.prev = pred;
        // 通過 CAS 操作将目前 Node 設定為尾部節點
        if (compareAndSetTail(pred, node)) {
        	// 設定成功以後,将原來的尾部節點 next 後繼節點指向目前 Node
            pred.next = node;
            return node;
        }
    }
    // tail 為空的情況下,調用 enq 方法将目前 Node 添加到同步隊列中
    enq(node);
    return node;
}           

入參:mode 表示節點的狀态,ReentrantLock 傳入的狀态參數:Node.EXCLUSIVE 代表獨占模式,意味着重入鎖擷取鎖采用獨占的方式,addWaiter 方法基本的執行過程,如下所示:

  1. 将目前線程封裝為 Node 節點對象
  2. 判斷目前同步隊列中的尾部節點是否為空
  3. 若尾部節點不為空,通過 CAS 操作将目前線程的 Node 添加到同步隊列中,并将新加入的 Node 設定為尾節點,采用尾插法的方式進行隊列入隊的
  4. 若尾部節點為空或者 CAS 設定尾部節點失敗,調用 enq 方法将目前 Node 添加到同步隊列中

AQS#enq

該方法通過自旋的方式以便可以成功将目前節點加入到同步隊列中

/**
 *      +------+  prev +-----+       +-----+
 * head |      | <---- |     | <---- |     |  tail
 *      +------+       +-----+       +-----+
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 尾節點為空,說明目前同步隊列中未存在元素
        // 初始化一個空對象 Node,先通過 CAS 将其設定為頭節點,若成功再将其設定為尾節點
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
        	// 目前節點的前驅節點指向原來尾部節點、将目前節點設定為尾部節點、原來尾部節點後繼節點指向目前節點
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}           

enq 方法執行隻是為了維護同步等待隊列的節點元素,當多個線程開始競争鎖時,必然會進行排隊,第一次入隊的線程不僅要承擔将自身加入到隊列中,同時還需要初始化一個空 Node 對象,将其設定為頭尾節點

圖解分析

假設有 3 個線程同時來争搶鎖,那麼截止到 AQS#addWaiter 或 AQS#enq 方法結束之後,AQS 中同步等待隊列結構圖,如下所示:

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

AQS#acquireQueued

當執行完 AQS#addWaiter 方法以後,會将傳回的 Node 參數傳遞給 acquireQueued 方法,去實作鎖競争、阻塞線程邏輯,方法源碼如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
        	// 擷取目前 Node 節點的前驅 prev 節點
            final Node p = node.predecessor();
            // 若前驅 prev 節點為頭節點,目前 Node 重新嘗試擷取鎖
            if (p == head && tryAcquire(arg)) {
            	// 擷取鎖成功,說明鎖已經被持有的線程所釋放,設定目前 Node 為頭節點
                setHead(node);
                // 将原 head 頭節點從同步隊列中移除
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 擷取鎖失敗,會擷取前驅節點并更新 waitStatus 狀态值
            // 随機調用原生鎖 LockSupport#park 方法阻塞目前競争鎖的線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}           

acquireQueued 方法基本的執行過程同時以 AQS#enq 分析中的圖解分析為例,如下所示:

  1. 擷取目前節點 Node prev 前驅節點

比如:目前是線程 B,那麼它的前驅節點就是 Thread 為 null 的頭節點

  1. 若 prev 前驅節點為 head 頭節點,那麼它有資格再去争搶一次鎖,調用 ReentrantLock.NonfairSync#tryAcquire 方法搶占鎖
也就是說線程 B 在這裡也會有一次機會再去争奪鎖

3.若搶占鎖成功,把目前搶到鎖的 Node 節點設定為 head 頭節點,并且移除原有的頭節點

4.若搶占鎖失敗,先通過 shouldParkAfterFailedAcquire 方法更新一次 waitStatus 值狀态,然後再調用原生鎖支援 > LockSupport.park(this) 阻塞目前線程等待後續被喚醒

仍然以線程 A 未釋放鎖,線程 B 處于首節點的情況作以說明:由于 acquireQueued 方法是死循環,所有的 Node 建立時 waitStatus 屬性值都為 0 (除了 Condition 條件變量)第一次周遊時會搶一次鎖;這一次會調用 shouldParkAfterFailedAcquire 方法将 waitStatus -> 0 更改為 SIGNAL-待喚醒狀态,該方法執行完以後會傳回 false,然後會繼續第二次循環,第二次執行 shouldParkAfterFailedAcquire 方法傳回 true,接着會調用 parkAndCheckInterrupt 方法使用原生鎖方式:LockSupport.park(this) > 阻塞目前線程并傳回目前線程是否中斷的辨別

5.最後,通過 cancelAcquire 方法取消目前線程擷取鎖的節點

AQS#shouldParkAfterFailedAcquire

線程 A 未釋放鎖,線程 B、線程 C 來争搶鎖肯定會失敗,失敗以後會調用 shouldParkAfterFailedAcquire 方法,Node#waitStatus 存在五種狀态,如下:

  1. CANCELLED:值為 1,即為結束狀态,在同步等待隊列中等待的線程逾時或被中斷,需要從同步隊列中取消該線程 Node 節點,進入該狀态以後的節點将不再發生變化
  2. 0:初始化狀态
  3. SIGNAL:值為 -1,目前驅節點釋放鎖以後,就會通知辨別為 SIGNAL 狀态的後繼 Node 節點線程
  4. CONDITION:值為 -2,與 ReentrantLock#newCondition 條件變量有關系,AQS.ConditionObject#addConditionWaiter 在該方法中會提現出來
  5. PROPAGATE:值為 -3,在共享模式下,PROPAGATE 處于可運作狀态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
   int ws = pred.waitStatus;
   // 若前驅節點為 SIGNAL,意味着隻需要等待其他前驅節點的線程被釋放
   // 當擷取鎖的線程調用 release 方法後,該前驅節點的線程就會被喚醒
   if (ws == Node.SIGNAL)
       // 傳回 true,意味着目前線程可以放心調用 parkAndCheckInterrupt 方法進行挂起
       return true;
   // waitState 大于 0,意味着 prev 前驅節點取消了排隊操作,直接将這個節點移除即可
   if (ws > 0) {
       // 相當于:pred=pred.prev;node.prev=pred;
       // 從尾部節點開始查找,直到将所有的 CANCELLED 節點移除
       do {
           node.prev = pred = pred.prev;
       } while (pred.waitStatus > 0);
       pred.next = node;
   } else {
       // 使用 CAS 設定前驅 prev 節點狀态為 SIGNAL
       compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
   }
   return false;
}           

該方法主要作用:通過 Node 節點狀态來判斷,線程 B、線程 C 競争鎖失敗後是否應該要被挂起

  1. 若 Thread-B、Thread-C 前驅 prev 節點狀态為 SIGNAL,表示可以放心挂起所在的目前線程
  2. 若目前線程 prev 節點狀态為 CANCELLED,采用循環方式掃描同步等待隊列将 CANCELLED 狀态的節點從同步等待隊列中移除
  3. 以上兩個條件都滿足,将前驅 prev 節點狀态改為 SIGNAL,傳回 false

該方法傳回 true、false 代表的含義不同,當傳回 true 時,不會進入 AQS#acquireQueued 方法的下一次循環,會調用 parkAndCheckInterrupt 方法将目前線程阻塞;當傳回 false 時,會進入到 AQS#acquireQueued 方法的下一次循環再次嘗試争搶一次鎖,當搶鎖成功目前線程就是獨占線程,搶鎖失敗再調用 parkAndCheckInterrupt 方法将目前線程阻塞

AQS#parkAndCheckInterrupt

parkAndCheckInterrupt 方法邏輯比較簡單,先看源碼,如下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}           

調用 LockSupport#park 方法挂起目前線程程式設計 waiting 狀态

LockSupport 原生鎖支援

park 方法:阻塞目前線程,不需要使用 sync 修飾,直接可以使用

unpark 方法:喚醒指定線程

unpark 方法可以先于 park 方法先調用,unpark 相當于是擷取許可數量 1、park 相當于是消費許可數量 1

Thread#interrupted:傳回目前線程是否被其他線程觸發過中斷請求,也就是調用 Thread#interrupt 方法;若有觸發過中斷請求,那麼該方法會傳回目前的中斷辨別為 true,并且會對中斷辨別進行複位辨別已經響應過了中斷請求,也就是會在 AQS#acquire 方法中執行 selfInterrupt 方法

selfInterrupt:辨別目前線程是否執行 AQS#acquireQueued 方法時被中斷過,若被中斷過,則需要響應中斷請求,因為線上程調用 AQS#acquireQueued 方法是不會去響應中斷請求的

通過 AQS#acquireQueued 方法來競争鎖,若 Thread-A 仍然還在執行中未釋放鎖,那麼 Thread-B、Thread-C 還會繼續挂起

到這裡,鎖相關的競争方法在這裡基本上都介紹過了,其實看到這裡,能發現,當競争的鎖線程失敗時,會調用 LockSupport#park 方法阻塞住,等待鎖匙放時,還會有 LockSuppor#unpark 方法進行鎖匙放,下面就來分析鎖匙放時的一些核心方法是如何處理的!!!

鎖釋放核心方法

若此時 Thread-A 釋放鎖了,那麼接下來 Thread-B、Thread-C 是如何走的呢?

ReentrantLock#unlock

public void unlock() {
    sync.release(1);
}           

在 unlock 方法中,會調用其内部類 Sync#release 方法,但由于 Sync 并未其父類 AQS#release 方法,是以它會延用其父類 AQS#release 方法的處理邏輯,源碼如下:

public final boolean release(int arg) {
	// 若釋放占用目前鎖的節點 Node 線程成功
    if (tryRelease(arg)) {
        // 擷取 AQS 同步等待隊列中的 head 頭節點
        Node h = head;
        // 若 head 節點不為空 & waitStatus 非預設值,直接喚醒下一個節點去争搶鎖
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}           

該方法主要的執行流程分為幾步,如下:

  1. 先調用 ReentrantLock.Sync#tryRelease 方法探測鎖釋放是否可以成功,它來自 AQS 子類 ReentrantLock.Sync 所實作的
  2. 擷取同步等待隊列中的 head 首節點,若其不為空,并且它的 waitStatus 屬性值非預設值 0,那麼就會調用 unparkSuccessor 方法喚醒隊列中的下一個節點

ReentrantLock.Sync#tryRelease

該方法也展現了鎖重入次數的操作,源代碼如下:

protected final boolean tryRelease(int releases) {
	// 目前鎖線程重入次數減去要釋放的次數
    int c = getState() - releases;
    // 目前線程不等于鎖持有線程,則判斷中斷監聽鎖異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 若減去後的鎖次數為 0
    if (c == 0) {
    	// 傳回 true、設定鎖持有線程為 null,其他線程就可以競争鎖了
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 遞減鎖重入次數,傳回 false,鎖仍然被目前線程所持有
    setState(c);
    return free;
}           

ReentrantLock.Sync#tryRelease 執行流程主要分析如下:

  1. 通過将 AQS#state 屬性值減少傳入的參數值(參數:1)若減去的結果狀态值為 0,就将排它鎖 Owner 持有線程設定為 null,同時傳回 true,以便于其他的線程有機會執行競争鎖操作
  2. 若減去的結果狀态值不為 0,傳回 free 變量預設值 false,目前線程仍然繼續持有這把鎖,其他線程暫時不可以争搶鎖
在排它鎖中,加鎖時 state 狀态會增加 1,在解鎖時會減去 1,同一把鎖,在被重入時,可能會被疊加為 2、3、4 等,隻有當調用 unlock 方法次數與調用 lock 方法次數相對應,才會把鎖 Owner 持有線程設定為空,也隻有這種情況下該方法執行結果才有傳回 true

AQS#unparkSuccessor

當 ReentrantLock.Sync#tryRelease 方法執行完以後,會取同步等待隊列中首節點,喚醒隊列中下一個節點去争搶這把鎖,該方法源碼如下:

private void unparkSuccessor(Node node) {
    // 擷取傳入節點的 waitStatus 屬性值  
    int ws = node.waitStatus;
    if (ws < 0)
    	// 小于 0 通過 CAS 将其修改為 0 
        compareAndSetWaitStatus(node, ws, 0);
    // 擷取傳入節點的後繼節點
    Node s = node.next;
    // 若後繼節點為空或者 waitStatus 大于 0 說明它是 CANCELLED-結束狀态
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從尾部節點開始掃描,找到距離目前傳入節點最近的一個 waitStatus 小于等于 0 的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 将同步等待隊列中 > 最前面的一個非 CANCELLED 狀态的 Node 線程進行喚醒 
    if (s != null)
        LockSupport.unpark(s.thread);
}           

分析一下此方法分别會作那些事情,如下:

1.擷取目前傳入節點 Node waitStatus 屬性值,若它小于 0 時,先通過 CAS 操作将其修改為 0

2.目前節點其實就是 head 頭節點,喚醒的操作不會喚醒頭節點,隻會喚醒頭節點後面不為 CANCELLED 狀态的首節點 Node 線程

擷取目前節點的 next 後繼節點,若後繼節點為空或 waitStatus 大于 0(CANCELLED)那麼就會周遊該同步等待隊列,從尾部往前查找的方式,比對到與目前節點最近的一個非 CANCELLED 節點,将其設定為待喚醒的節點

3.若待喚醒的節點不為空,調用原生鎖 LockSupport#unpark 方法将其喚醒,以便它可以再次去争搶鎖

當節點被喚醒後,比如:Thread-A 釋放鎖成功以後,會調用 AQS#unparkSuccessor 方法喚醒它的下一個節點 Thread-B 所持有的(非 CANCELLED)

随機 Thread-B 被喚醒,它會繼續執行 AQS#acquireQueued 方法中的循環,執行:if (p == head && tryAcquire(arg)) 代碼塊,是以後續被喚醒的線程都會是這樣,通過該代碼來確定同步隊列中的節點能夠擷取鎖資源

那麼為什麼在釋放鎖的時候一定要從尾部開始掃描呢?

回顧一下 AQS#enq 方法執行的邏輯,插入新節點時,它是從隊列尾部進行節點入隊的,看下圖紅色所标注的,在 CAS 操作成功之後,t.next = node; 操作之前,可能會存在其他線程調用 unlock 方法從 head 開始向後周遊,由于 t.next = node; 還未執行也就意味着同步等待隊列關系還未建立完整,就會導緻周遊到原始的尾部節點時被中斷 > 隊列中的連結清單關系斷鍊了;是以說,從後往前周遊就不會出現這個問題
深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

挂起線程被喚醒後執行過程

當持有鎖的線程調用 ReentrantLock#unlock 方法,原本被挂起的 Thread-B、Thread-C 線程就有機會被喚醒再繼續執行,被喚醒之後的線程會繼續執行 AQS#acquireQueued 方法内的循環,該方法在上面已經分析過了,接下來以 Thread-B 被喚醒後為例,看它整個的執行過程以及變化,以流程圖的方式呈現

同步等待隊列變更結構圖:

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

同步等待隊列執行過程流程圖:

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

部落客是以如下源碼,對 ReentrantLock、AQS 核心方法源碼進行檢視的,分享如下:

public class MultiThreadReentrantLockDemo {
    private static final ReentrantLock LOCK = new ReentrantLock();

    public void threadAProcess() {
        LOCK.lock();
        try {
            System.out.println("執行:threadAProcess 方法");
            // 處理業務邏輯中....
            // 斷點過程中該時間可以延長
            TimeUnit.SECONDS.sleep(6);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            LOCK.unlock();
        }
    }

    public void threadBProcess() {
        LOCK.lock();
        try {
            System.out.println("執行:threadBProcess 方法");
            // 處理業務邏輯中....
            TimeUnit.SECONDS.sleep(60 * 5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            LOCK.unlock();
        }
    }

    public void threadCProcess() {
        LOCK.lock();
        try {
            System.out.println("執行:threadCProcess 方法");
            // 處理業務邏輯中....
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MultiThreadReentrantLockDemo multiThreadLock = new MultiThreadReentrantLockDemo();

        new Thread(()-> multiThreadLock.threadAProcess(), "Thread-A").start();

        Thread threadB = new Thread(() -> multiThreadLock.threadBProcess(), "Thread-B");
        threadB.start();
        // 可能會出現 Thread-C 先執行的情況,是以先通過 join 方法讓線程 B 先跑完
        threadB.join();

        new Thread(()-> multiThreadLock.threadCProcess(), "Thread-C").start();
    }
}           

注意:斷點模式下,要以 Thread 模式執行;如下圖:

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

公平鎖、非公平鎖差別

鎖的公平與否其實取決于擷取鎖的順序性,若為公平鎖,那麼擷取鎖的順序應該絕對符合 FIFO 隊列 > 先進先出的特性,上面所分析的例子都是以非公平鎖(預設是非公平鎖)隻要 CAS 設定 AQS#state 屬性值成功,就代表目前線程擷取到了鎖,而公平鎖不一樣,差異的地方有如下兩點:

1、FairSync#lock、NonfairSync#lock

非公平鎖在擷取鎖時,先通過 CAS 操作進行鎖搶占,而公平鎖不會
深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術

2、FairSync#tryAcquire、NonfairSync#tryAcquire

兩者方法之間不同之處在于判斷多了一個條件:hasQueuedPredecessors,也就是說加入了同步隊列中目前節點是否有前驅節點的判斷,若該方法傳回 true,則表示有線程比目前線程更早入隊、更早地請求擷取鎖,是以,需要等待前驅節點的線程擷取完再釋放鎖以後才能繼續擷取鎖!

深入源碼解析 ReentrantLock、AQS:掌握 Java 并發程式設計關鍵技術
public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}           

1、h != t:頭尾節點是否相同,若相同則表示隊列中隻有一個節點,即目前未發生鎖競争

假設:目前隻有線程 Thread-A 一人争搶鎖,那麼 head == null && tail ==null,那麼傳回 false 會去争搶鎖;反之,會繼續走第二步的判斷

2、(s = h.next) == null:檢查頭節點的後繼節點是否為空,即判斷是否存在後繼節點

假設:線程 Thread-A、Thread-B 同時争搶鎖,Thread-A 搶到了,那麼同步等待隊列中會有頭節點、Thread-B 所在節點,條件不滿足傳回 false,會繼續走第三步的判斷;反之,目前隻有一個節點 > 傳回 true 不會去争搶鎖,不走第三步的判斷

3、s.thread != Thread.currentThread():檢查後繼節點的線程是否與目前線程不同,即判斷後繼節點持有線程是否為目前線程

假設:頭節點的後繼節點持有線程就是目前的線程,會傳回 false 會去争搶鎖;反之,頭節點的後繼節點持有線程不是目前的線程,會傳回 true 不會去争搶鎖,它會進入到排隊模式!!!

總結

ReentrantLock 基于悲觀鎖實作(LockSupport),但是在處理 AQS#state 鎖狀态時是基于 CAS 樂觀鎖實作的,兩者在不同場景下都會各自的好處,因為前者已經悲觀鎖,後者再用 CAS 操作并沒有任何問題

>在這裡其實就是偷換概念了,不一定用了悲觀鎖就不能用樂觀鎖

該篇博文介紹了 JUC 元件下 ReentrantLock 核心概念、使用、源碼以及 AQS 基礎元件的核心方法,闡述了 AQS 内部實作、資料結構以及節點變更過程,在後面,看 ReentrantLock 是如何基于 AQS 核心方法去完成其内部鎖競争工作的、鎖釋放後如何喚醒其他節點線程,全文上下以畫圖+文字加以說明,不限于時序圖、結構圖、流程圖,最後說明了在 ReentrantLock 公平鎖、非公平鎖之間的差別,希望能夠幫助你快速了解 AQS 内部如何巧妙處理高并發場景問題的

繼續閱讀