天天看點

并發程式設計專題(六)抽象隊列同步器ReentrantLock詳解

作者:京城小人物

1、前言

通過上篇并發程式設計專題(五)抽象隊列同步器AQS應用Lock詳解

我們已經知道AQS的功能分為獨占和共享,底層資料結構是一個雙向連結清單

獨占鎖,每次隻能有一個線程持有鎖,比如 ReentrantLock 就是以獨占方式實作的互斥鎖

共享鎖,允許多個線程同時擷取鎖,并發通路共享資源 ,比如ReentrantReadWriteLock

2、ReentrantLock源碼分析

以 ReentrantLock 作為切入點,來看看在ReentrantLock是如何使用 AQS 來實作線程的同步的

2.1、入口

public void lock() {
    sync.lock();
}           

這個是 reentrantLock 擷取鎖的入口

sync 實際上是一個抽象的靜态内部類,它繼承了 AQS 來實作重入鎖的邏輯,我們知道AQS 是一個同步隊列,它能夠實作線程的阻塞以及喚醒,但它并不具備業務功能,是以在不同的同步場景中,會繼承 AQS 來實作對應場景的功能

Sync 有兩個具體的實作類,分别是:

NofairSync:非公平,表示可以存在搶占鎖的功能,也就是說不管目前隊列上是否存在其他線程等待,新線程都有機會搶占鎖

FailSync:公平,表示所有線程嚴格按照 FIFO 來擷取鎖

2.2、NofairSync.lock加鎖方法詳解

以非公平鎖為例,看看lock中的實作

  1. 非公平鎖和公平鎖最大的差別在于,在非公平鎖中我搶占鎖的邏輯是,不管有沒有線程排隊,我先上來 cas 去搶占一下
  2. CAS 成功,就表示成功獲得了鎖
  3. CAS 失敗,調用 acquire(1)走鎖競争邏輯
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}           

cas的實作原理

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}           

通過 cas 樂觀鎖的方式來做比較并替換,這段代碼的意思是,如果目前記憶體中的state 的值和預期值 expect 相等,則替換為 update。更新成功傳回 true,否則傳回 false.

這個操作是原子的,不會出現線程安全問題,這裡面涉及到Unsafe這個類的操作,以及涉及到 state 這個屬性的意義。

state 是 AQS 中的一個屬性,它在不同的實作中所表達的含義不一樣,對于重入鎖的實作來說,表示一個同步狀态。它有兩個含義的表示

1. 當 state=0 時,表示無鎖狀态

2. 當 state>0 時,表示已經有線程獲得了鎖,也就是 state=1,但是因為

ReentrantLock 允許重入,是以同一個線程多次獲得同步鎖的時候,state 會遞增,比如重入 5 次,那麼 state=5。而在釋放鎖的時候,同樣需要釋放 5 次直到 state=0,其他線程才有資格獲得鎖

(對于unsafe類會在後續講解)

AQS.acquire

acquire 是 AQS 中的方法,如果 CAS 操作未能成功,說明 state 已經不為 0,此時繼續 acquire(1)操作

這個方法的主要邏輯是:

  1. 通過 tryAcquire 嘗試擷取獨占鎖,如果成功傳回 true,失敗傳回 false
  2. 如果 tryAcquire 失敗,則會通過 addWaiter 方法将目前線程封裝成 Node 添加到 AQS 隊列尾部
  3. acquireQueued,将 Node 作為參數,通過自旋去嘗試擷取鎖。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     selfInterrupt();
}
           

NonfairSync.tryAcquire

這個方法的作用是嘗試擷取鎖,如果成功傳回 true,不成功傳回 false

它是重寫 AQS 類中的 tryAcquire 方法,并且大家仔細看一下 AQS 中 tryAcquire方法的定義,并沒有實作,而是抛出異常。按照一般的思維模式,既然是一個不實作的模版方法,那應該定義成 abstract,讓子類來實作呀?這裡這麼做是因為獨占鎖和共享鎖的調用方法不同,子類隻需要實作所需要的方法

ReentrantLock.nofairTryAcquire

  1. 擷取目前線程,判斷目前的鎖的狀态
  2. 如果 state=0 表示目前是無鎖狀态,通過 cas 更新 state 狀态的值
  3. 目前線程是屬于重入,則增加重入次數
final boolean nonfairTryAcquire(int acquires) {
    // 擷取目前執行的線程
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {//表示無鎖狀态
        //cas 替換 state 的值,cas 成功表示擷取鎖成功
        if (compareAndSetState(0, acquires)) {
            //儲存目前獲得鎖的線程,下次再來的時候不要再嘗試競争鎖
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果同一個線程來獲得鎖,直接增加重入次數
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}           

AQS.addWaiter

當 tryAcquire 方法擷取鎖失敗以後,則會先調用 addWaiter 将目前線程封裝成Node.

入參 mode 表示目前節點的狀态,傳遞的參數是 Node.EXCLUSIVE,表示獨占狀态。意味着重入鎖用到了 AQS 的獨占鎖功能

  1. 将目前線程封裝成 Node
  2. 目前連結清單中的 tail 節點是否為空,如果不為空,則通過 cas 操作把目前線程的node 添加到 AQS 隊列
  3. 如果為空或者 cas 失敗,調用 enq 将節點添加到 AQS 隊列
private Node addWaiter(Node mode) {
    //把目前線程封裝為 Node
    Node node = new Node(Thread.currentThread(), mode);
    //tail是AQS中表示同步隊列隊尾的屬性,預設是null
    Node pred = tail;
    //tail 不為空的情況下,說明隊列中存在節點
    if (pred != null) {
        //把目前線程的 Node 的 prev 指向 tail
        node.prev = pred;
        //通過cas把node加入到AQS隊尾,也就是設定為tail
        if (compareAndSetTail(pred, node)) {
            ////設定成功以後,把原tail節點的next指向目前 node
            pred.next = node;
            return node;
        }
    }
    //tail=null,把 node 添加到同步隊列
    enq(node);
    return node;
}           

enq

通過自旋把目前節點加入隊列

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}           

圖解分析:

假設 3 個線程來争搶鎖,那麼截止到 enq 方法運作結束之後,或者調用 addwaiter

方法結束後,AQS 中的連結清單結構圖

并發程式設計專題(六)抽象隊列同步器ReentrantLock詳解

AQS.acquireQueued

通過 addWaiter 方法把線程添加到連結清單後,會接着把 Node 作為參數傳遞給acquireQueued 方法,去競争鎖

  1. 擷取目前節點的 prev 節點
  2. 如果 prev 節點為 head 節點,那麼它就有資格去争搶鎖,調用 tryAcquire 搶占鎖
  3. 搶占鎖成功以後,把獲得鎖的節點設定為 head,并且移除原來的初始化 head節點
  4. 如果獲得鎖失敗,則根據 waitStatus 決定是否需要挂起線程
  5. 最後,通過 cancelAcquire 取消獲得鎖的操作
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //擷取目前節點的 prev 節點
            final Node p = node.predecessor();
            //如果是head節點,說明有資格去争搶鎖
            if (p == head && tryAcquire(arg)) {
                //擷取鎖成功,也就是ThreadA已經釋放了鎖,然後設定 head為ThreadB獲得執行權限
                setHead(node);
                //把原head節點從連結清單中移除
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //ThreadA 可能還沒釋放鎖,使得ThreadB在執行tryAcquire時會傳回false
            if (shouldParkAfterFailedAcquire(p, node) &&
             parkAndCheckInterrupt())
                ////并且傳回目前線程在等待過程中有沒有中斷過
                interrupted = true;
        }
     } finally {
        if (failed)
            cancelAcquire(node);
    }
}    

           

NofairSync.tryAcquire

這個方法在前面分析過,就是通過 state 的狀态來判斷是否處于無鎖狀态,然後在通過 cas 進行競争鎖操作。成功表示獲得鎖,失敗表示獲得鎖失敗

shouldParkAfterFailedAcquire

如果 ThreadA 的鎖還沒有釋放的情況下,ThreadB 和 ThreadC 來争搶鎖肯定是會失敗,那麼失敗以後會調用 shouldParkAfterFailedAcquire 方法

Node 有 5 種狀态,分别是:CANCELLED(1),SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、預設狀态(0)

  • CANCELLED: 在同步隊列中等待的線程等待逾時或被中斷,需要從同步隊列中取消該 Node 的結點, 其結點的 waitStatus 為 CANCELLED,即結束狀态,進入該狀态後的結點将不會再變化
  • SIGNAL: 隻要前置節點釋放鎖,就會通知辨別為 SIGNAL 狀态的後續節點的線程
  • CONDITION:和 Condition 有關系,後續會講解
  • PROPAGATE:共享模式下,PROPAGATE 狀态的線程處于可運作狀态
  • 0:初始狀态

這個方法的主要作用是,通過 Node 的狀态來判斷,ThreadA 競争鎖失敗以後是否應該被挂起。

  1. 如果 ThreadA 的 pred 節點狀态為 SIGNAL,那就表示可以放心挂起目前線程
  2. 通過循環掃描連結清單把 CANCELLED 狀态的節點移除
  3. 修改 pred 節點的狀态為 SIGNAL,傳回 false.

傳回 false 時,也就是不需要挂起,傳回 true,則需要調用 parkAndCheckInterrupt挂起目前線程

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前置節點的waitStatus
    int ws = pred.waitStatus;
    //如果前置節點為SIGNAL,意味着隻需要等待其他前置節點的線程被釋放,
    if (ws == Node.SIGNAL)
       //傳回true,意味着可以直接放心的挂起了
        return true;
    //ws大于0,意味着prev節點取消了排隊,直接移除這個節點就行    
    if (ws > 0) {           
        do {
            node.prev = pred = pred.prev;
            // //這裡采用循環,移除CANCELLED的節點
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //利用cas設定prev節點的狀态為SIGNAL(-1)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}           

parkAndCheckInterrupt

使用 LockSupport.park 挂起目前線程程式設計 WATING 狀态

Thread.interrupted,傳回目前線程是否被其他線程觸發過中斷請求,也就是

thread.interrupt(); 如果有觸發過中斷請求,那麼這個方法會傳回目前的中斷辨別true,并且對中斷辨別進行複位辨別已經響應過了中斷請求。如果傳回 true,意味着在 acquire 方法中會執行 selfInterrupt()。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

           

2.3、鎖的釋放流程

ReentrantLock.unlock

public final boolean release(int arg) {
    if (tryRelease(arg)) {
       // 拿到head節點
        Node h = head;
       //如果head節點不為空并且狀态!=0.調用unparkSuccessor(h)
       // 喚醒後續節點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}           

ReentrantLock.tryRelease

這個方法可以認為是一個設定鎖狀态的操作,通過将 state 狀态減掉傳入的參數值(參數是 1),如果結果狀态為 0,就将排它鎖的 Owner 設定為 null,以使得其它的線程有機會進行執行。

在排它鎖中,加鎖的時候狀态會增加 1(當然可以自己修改這個值),在解鎖的時候減掉 1,同一個鎖,在可以重入後,可能會被疊加為 2、3、4 這些值,隻有 unlock()的次數與 lock()的次數對應才會将 Owner 線程設定為空,而且也隻有這種情況下才會傳回 true。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}           

unparkSuccessor

private void unparkSuccessor(Node node) {
    //獲得head節點狀态
    int ws = node.waitStatus;
    if (ws < 0)
    // 設定head節點狀态為0
        compareAndSetWaitStatus(node, ws, 0);
    // 拿到head的下一節點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    //如果下一個節點為 null 或者 status>0 表示 cancelled 狀态. 
    //通過從尾部節點開始掃描,找到距離head最近的一個waitStatus<=0 的節點
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //next 節點不為空,直接喚醒這個線程即可
    if (s != null)
        LockSupport.unpark(s.thread);
}           

為什麼在釋放鎖的時候是從 tail 進行掃描

node.prev = t;
if (compareAndSetTail(t, node)) {
      t.next = node;
      return t;
}           

因為在之前enq添加節點方法中,在 cas 操作之後,t.next=node 操作之前。存在其他線程調用 unlock 方法從 head開始往後周遊,由于 t.next=node 還沒執行意味着連結清單的關系還沒有建立完整。就會導緻周遊到 t 節點的時候被中斷。是以從後往前周遊,一定不會存在這個問題。

原本挂起的線程繼續執行

通過 ReentrantLock.unlock,原本挂起的線程被喚醒以後繼續執行,應該從哪裡執行呢。原來被挂起的線程是在 acquireQueued 方法中,是以被喚醒以後繼續從這個方法開始執行

AQS.acquireQueued

這個方法前面已經完整分析過了,我們隻關注一下 ThreadB 被喚醒以後的執行流程。由于 ThreadB 的 prev 節點指向的是 head,并且 ThreadA 已經釋放了鎖。是以這個時候調用 tryAcquire 方法時,就可以順利搶占到鎖

2.4、公平鎖和非公平鎖的差別

鎖的公平性是相對于擷取鎖的順序而言的,如果是一個公平鎖,那麼鎖的擷取順序就應該符合請求的絕對時間順序,也就是 FIFO。隻要CAS 設定同步狀态成功,則表示目前線程擷取了鎖,而公平鎖則不一樣,差異點有兩個

FairSync.tryAcquire

非公平鎖在擷取鎖的時候,會先通過 CAS 進行搶占,而公平鎖則不會

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}           

這個方法與 nonfairTryAcquire(int acquires)比較,不同的地方在于判斷條件多了hasQueuedPredecessors()方法,也就是加入了[同步隊列中目前節點是否有前驅節點]的判斷,如果該方法傳回 true,則表示有線程比目前線程更早地請求擷取鎖,是以需要等待前驅線程擷取并釋放鎖之後才能繼續擷取鎖。

繼續閱讀