天天看點

圖解ReentrantLock公平鎖和非公平鎖實作

作者:網際網路進階架構師

概述

ReentrantLock是Java并發中十分常用的一個類,具備類似synchronized鎖的作用。但是相比synchronized, 它具備更強的能力,同時支援公平鎖和非公平鎖。

公平鎖: 指多個線程按照申請鎖的順序來擷取鎖,線程直接進入隊列中排隊,隊列中的第一個線程才能獲得鎖。

非公平鎖: 多個線程加鎖時直接嘗試擷取鎖,能搶到鎖到直接占有鎖,搶不到才會到等待隊列的隊尾等待。

那ReentrantLock中具體是怎麼實作公平和非公鎖的呢?它們之間又有什麼優缺點呢?本文就帶大家一探究竟。

RenentrantLock原理概述

圖解ReentrantLock公平鎖和非公平鎖實作

上面是RenentrantLock的類結構圖。

  • RenentrantLock實作了Lock接口,Lock接口提供了鎖的通用api,比如加鎖lock,解鎖unlock等操作。
  • RenentrantLock底層加鎖是通過AQS實作的,兩個内部類FairSync服務于公平鎖,NofaireSync服務于非公平鎖的實作,他們統一繼承自AQS。

ReentrantLock 類 API:

  • public void lock():獲得鎖

如果鎖沒有被另一個線程占用,則将鎖定計數設定為 1

如果目前線程已經保持鎖定,則保持計數增加 1

如果鎖被另一個線程保持,則目前線程被禁用線程排程,并且在鎖定已被擷取之前處于休眠狀态

  • public void unlock():嘗試釋放鎖

如果目前線程是該鎖的持有者,則保持計數遞減

如果保持計數現在為零,則鎖定被釋放

如果目前線程不是該鎖的持有者,則抛出異常

關于AQS的原理, 強烈大家閱讀深入淺出了解Java并發AQS的獨占鎖模式

非公平鎖實作

示範

@Test
public void testUnfairLock() throws InterruptedException {
    // 無參構造函數,預設建立非公平鎖模式
    ReentrantLock reentrantLock = new ReentrantLock();

    for (int i = 0; i < 10; i++) {
        final int threadNum = i;
        new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("線程" + threadNum + "擷取鎖");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // finally中解鎖
                reentrantLock.unlock();
                System.out.println("線程" + threadNum +"釋放鎖");
            }
        }).start();
        Thread.sleep(999);
    }

    Thread.sleep(100000);
}           

運作結果:

線程0擷取鎖
線程0釋放鎖
線程1擷取鎖
線程1釋放鎖
線程3擷取鎖
線程3釋放鎖
線程2擷取鎖
線程2釋放鎖
線程5擷取鎖
線程5釋放鎖
線程4擷取鎖
線程4釋放鎖
....           
  • 預設構造函數建立的是非公平鎖
  • 運作結果可以看到線程3優先于線程2擷取鎖(這個結果是人為造的,很難模拟出來)。

加鎖原理

1.構造函數建立鎖對象

public ReentrantLock() {
 	sync = new NonfairSync();
}           
  • 預設構造函數,建立了NonfairSync,非公平鎖同步器,它是繼承自AQS.

2.第一個線程加鎖時,不存在競争,如下圖:

// ReentrantLock.NonfairSync#lock
final void lock() {
    // 用 cas 嘗試(僅嘗試一次)将 state 從 0 改為 1, 如果成功表示【獲得了獨占鎖】
    if (compareAndSetState(0, 1))
        // 設定目前線程為獨占線程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);//失敗進入
}           
圖解ReentrantLock公平鎖和非公平鎖實作
  • cas修改state從0到1,擷取鎖
  • 設定鎖對象的線程為目前線程

3.第二個線程申請加鎖時,出現鎖競争,如下圖:

圖解ReentrantLock公平鎖和非公平鎖實作
  • Thread-1 執行,CAS 嘗試将 state 由 0 改為 1,結果失敗(第一次),進入 acquire 邏輯
// AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    // tryAcquire 嘗試擷取鎖失敗時, 會調用 addWaiter 将目前線程封裝成node入隊,acquireQueued 阻塞目前線程,
    // acquireQueued 傳回 true 表示挂起過程中線程被中斷喚醒過,false 表示未被中斷過
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果線程被中斷了邏輯來到這,完成一次真正的打斷效果
        selfInterrupt();
}           
  • 調用tryAcquire方法嘗試擷取鎖,這裡由子類NonfairSync實作。
  • 如果tryAcquire擷取鎖失敗,通過addWaiter方法将目前線程封裝成節點,入隊
  • acquireQueued方法會将目前線程阻塞
// ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
// 搶占成功傳回 true,搶占失敗傳回 false
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // state 值
    int c = getState();
    // 條件成立說明目前處于【無鎖狀态】
    if (c == 0) {
        //如果還沒有獲得鎖,嘗試用cas獲得,這裡展現非公平性: 不去檢查 AQS 隊列是否有阻塞線程直接擷取鎖        
    	if (compareAndSetState(0, acquires)) {
            // 擷取鎖成功設定目前線程為獨占鎖線程。
            setExclusiveOwnerThread(current);
            return true;
         }    
	} 
    // 這部分是重入鎖的原理    
   	// 如果已經有線程獲得了鎖, 獨占鎖線程還是目前線程, 表示【發生了鎖重入】
	else if (current == getExclusiveOwnerThread()) {
        // 更新鎖重入的值
        int nextc = c + acquires;
        // 越界判斷,當重入的深度很深時,會導緻 nextc < 0,int值達到最大之後再 + 1 變負數
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新 state 的值,這裡不使用 cas 是因為目前線程正在持有鎖,是以這裡的操作相當于在一個管程内
        setState(nextc);
        return true;
    }
    // 擷取失敗
    return false;
}           
  • 正是這個方法展現了非公平鎖,在nonfairTryAcquire如果發現state=0,無鎖的情況,它會忽略隊列中等待的線程,優先擷取一次鎖,相當于"插隊"。

4.第二個線程tryAcquire申請鎖失敗,通過執行addWaiter方法加入到隊列中。

圖解ReentrantLock公平鎖和非公平鎖實作
  • 圖中黃色三角表示該 Node 的 waitStatus 狀态,其中 0 為預設正常狀态
  • Node 的建立是懶惰的,其中第一個 Node 稱為 Dummy(啞元)或哨兵,用來占位,并不關聯線程。
// AbstractQueuedSynchronizer#addWaiter,傳回目前線程的 node 節點
private Node addWaiter(Node mode) {
    // 将目前線程關聯到一個 Node 對象上, 模式為獨占模式   
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 快速入隊,如果 tail 不為 null,說明存在阻塞隊列
    if (pred != null) {
        // 将目前節點的前驅節點指向 尾節點
        node.prev = pred;
        // 通過 cas 将 Node 對象加入 AQS 隊列,成為尾節點,【尾插法】
        if (compareAndSetTail(pred, node)) {
            pred.next = node;// 雙向連結清單
            return node;
        }
    }
    // 初始時隊列為空,或者 CAS 失敗進入這裡
    enq(node);
    return node;
}           
// AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {
    // 自旋入隊,必須入隊成功才結束循環
    for (;;) {
        Node t = tail;
        // 說明目前鎖被占用,且目前線程可能是【第一個擷取鎖失敗】的線程,【還沒有建立隊列】
        if (t == null) {
            // 設定一個【啞元節點】,頭尾指針都指向該節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 自旋到這,普通入隊方式,首先指派尾節點的前驅節點【尾插法】
            node.prev = t;
            // 【在設定完尾節點後,才更新的原始尾節點的後繼節點,是以此時從前往後周遊會丢失尾節點】
            if (compareAndSetTail(t, node)) {
                //【此時 t.next  = null,并且這裡已經 CAS 結束,線程并不是安全的】
                t.next = node;
                return t;	// 傳回目前 node 的前驅節點
            }
        }
    }
}           

5.第二個線程加入隊列後,現在要做的是想辦法阻塞線程,不讓它執行,就看acquireQueued的了。

圖解ReentrantLock公平鎖和非公平鎖實作
  • 圖中黃色三角表示該 Node 的 waitStatus 狀态,0 為預設正常狀态, 但是-1狀态表示它肩負喚醒下一個節點的線程。
  • 灰色表示線程阻塞了。
inal boolean acquireQueued(final Node node, int arg) {
    // true 表示目前線程搶占鎖失敗,false 表示成功
    boolean failed = true;
    try {
        // 中斷标記,表示目前線程是否被中斷
        boolean interrupted = false;
        for (;;) {
            // 獲得目前線程節點的前驅節點
            final Node p = node.predecessor();
            // 前驅節點是 head, FIFO 隊列的特性表示輪到目前線程可以去擷取鎖
            if (p == head && tryAcquire(arg)) {
                // 擷取成功, 設定目前線程自己的 node 為 head
                setHead(node);
                p.next = null; // help GC
                // 表示搶占鎖成功
                failed = false;
                // 傳回目前線程是否被中斷
                return interrupted;
            }
            // 判斷是否應當 park,傳回 false 後需要新一輪的循環,傳回 true 進入條件二阻塞線程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                // 條件二傳回結果是目前線程是否被打斷,沒有被打斷傳回 false 不進入這裡的邏輯
                // 【就算被打斷了,也會繼續循環,并不會傳回】
                interrupted = true;
        }
    } finally {
        // 【可打斷模式下才會進入該邏輯】
        if (failed)
            cancelAcquire(node);
    }
}           
  • acquireQueued 會在一個自旋中不斷嘗試獲得鎖,失敗後進入 park 阻塞
  • 如果目前線程是在 head 節點後,也就是第一個節點,又會直接多一次機會 tryAcquire 嘗試擷取鎖,如果還是被占用,會傳回失敗。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 表示前置節點是個可以喚醒目前節點的節點,傳回 true
    if (ws == Node.SIGNAL)
        return true;
    // 前置節點的狀态處于取消狀态,需要【删除前面所有取消的節點】, 傳回到外層循環重試
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 擷取到非取消的節點,連接配接上目前節點
        pred.next = node;
    // 預設情況下 node 的 waitStatus 是 0,進入這裡的邏輯
    } else {
        // 【設定上一個節點狀态為 Node.SIGNAL】,傳回外層循環重試
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 傳回不應該 park,再次嘗試一次
    return false;
}           
  • shouldParkAfterFailedAcquire發現前驅節點等待狀态是-1, 傳回true,表示需要阻塞。
  • shouldParkAfterFailedAcquire發現前驅節點等待狀态大于0,說明是無效節點,會進行清理。
  • shouldParkAfterFailedAcquire發現前驅節點等待狀态等于0,将前驅 node 的 waitStatus 改為 -1,傳回 false。
private final boolean parkAndCheckInterrupt() {
    // 阻塞目前線程,如果打斷标記已經是 true, 則 park 會失效
    LockSupport.park(this);
    // 判斷目前線程是否被打斷,清除打斷标記
    return Thread.interrupted();
}           
  • 通過不斷自旋嘗試擷取鎖,最終前驅節點的等待狀态為-1的時候,進行阻塞目前線程。
  • 通過調用LockSupport.park方法進行阻塞。

6.多個線程嘗試擷取鎖,競争失敗後,最終形成下面的圖形。

圖解ReentrantLock公平鎖和非公平鎖實作

釋放鎖原理

1.第一個線程通過調用unlock方法釋放鎖。

public void unlock() {
    sync.release(1);
}
複制代碼           
  • 最終調用的是同步器的release方法。
圖解ReentrantLock公平鎖和非公平鎖實作
  • 設定鎖定的線程exclusiveOwnerThread為null
  • 設定鎖的state為0
// AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
    // 嘗試釋放鎖,tryRelease 傳回 true 表示目前線程已經【完全釋放鎖,重入的釋放了】
    if (tryRelease(arg)) {
        // 隊列頭節點
        Node h = head;
        // 頭節點什麼時候是空?沒有發生鎖競争,沒有競争線程建立啞元節點
        // 條件成立說明阻塞隊列有等待線程,需要喚醒 head 節點後面的線程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }    
    return false;
}           
  • 進入 tryRelease,設定 exclusiveOwnerThread 為 null,state = 0
  • 目前隊列不為 null,并且 head 的 waitStatus = -1,進入 unparkSuccessor, 喚醒阻塞的線程

2.線程一通過調用tryRelease方法釋放鎖,該類的實作是在子類中

// ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
    // 減去釋放的值,可能重入
    int c = getState() - releases;
    // 如果目前線程不是持有鎖的線程直接報錯
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否已經完全釋放鎖
    boolean free = false;
    // 支援鎖重入, 隻有 state 減為 0, 才完全釋放鎖成功
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 目前線程就是持有鎖線程,是以可以直接更新鎖,不需要使用 CAS
    setState(c);
    return free;
}
複制代碼           
  • 修改鎖資源的state

3.喚醒隊列中第一個線程Thread1

private void unparkSuccessor(Node node) {
    // 目前節點的狀态
    int ws = node.waitStatus;    
    if (ws < 0)        
        // 【嘗試重置狀态為 0】,因為目前節點要完成對後續節點的喚醒任務了,不需要 -1 了
        compareAndSetWaitStatus(node, ws, 0);    
    // 找到需要 unpark 的節點,目前節點的下一個    
    Node s = node.next;    
    // 已取消的節點不能喚醒,需要找到距離頭節點最近的非取消的節點
    if (s == null || s.waitStatus > 0) {
        s = null;
        // AQS 隊列【從後至前】找需要 unpark 的節點,直到 t == 目前的 node 為止,找不到就不喚醒了
        for (Node t = tail; t != null && t != node; t = t.prev)
            // 說明目前線程狀态需要被喚醒
            if (t.waitStatus <= 0)
                // 置換引用
                s = t;
    }
    // 【找到合适的可以被喚醒的 node,則喚醒線程】
    if (s != null)
        LockSupport.unpark(s.thread);
}           
  • 從後往前找到隊列中距離 head 最近的一個沒取消的 Node,unpark 恢複其運作,本例中即為 Thread-1
  • thread1活了,開始重新去擷取鎖,也就是前面acquireQueued中的流程。

為什麼這裡查找喚醒的節點是從後往前,而不是從前往後呢?

從後向前的喚醒的原因:enq 方法中,節點是尾插法,首先指派的是尾節點的前驅節點,此時前驅節點的 next 并沒有指向尾節點,從前周遊會丢失尾節點。

4.Thread1恢複執行流程

圖解ReentrantLock公平鎖和非公平鎖實作
  • 喚醒的Thread-1 線程會從 park 位置開始執行,如果加鎖成功(沒有競争),設定了exclusiveOwnerThread為Thread-1, state=1。
  • head 指向剛剛 Thread-1 所在的 Node,該 Node 會清空 Thread
  • 原本的 head 因為從連結清單斷開,而可被垃圾回收
圖解ReentrantLock公平鎖和非公平鎖實作

5.另一種可能,突然來了Thread-4來競争,展現非公平鎖

如果這時有其它線程來競争鎖,例如這時有 Thread-4 來了并搶占了鎖,很有可能搶占成功。

圖解ReentrantLock公平鎖和非公平鎖實作
  • Thread-4 被設定為 exclusiveOwnerThread,state = 1
  • Thread-1 再次進入 acquireQueued 流程,擷取鎖失敗,重新進入 park 阻塞

公平鎖實作

示範

@Test
public void testfairLock() throws InterruptedException {
    // 有參構造函數,true表示公平鎖,false表示非公平鎖
    ReentrantLock reentrantLock = new ReentrantLock(true);

    for (int i = 0; i < 10; i++) {
        final int threadNum = i;
        new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("線程" + threadNum + "擷取鎖");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // finally中解鎖
                reentrantLock.unlock();
                System.out.println("線程" + threadNum +"釋放鎖");
            }
        }).start();
        Thread.sleep(10);
    }

    Thread.sleep(100000);
}           

運作結果:

線程0擷取鎖
線程0釋放鎖
線程1擷取鎖
線程1釋放鎖
線程2擷取鎖
線程2釋放鎖
線程3擷取鎖
線程3釋放鎖
線程4擷取鎖
線程4釋放鎖
線程5擷取鎖
線程5釋放鎖
線程6擷取鎖
線程6釋放鎖
線程7擷取鎖
線程7釋放鎖
線程8擷取鎖
線程8釋放鎖
線程9擷取鎖
線程9釋放鎖           
  • ReentrantLock有參構造函數,true表示公平鎖,false表示非公平鎖
  • 觀察運作結果,所有擷取鎖的過程都是根據申請鎖的時間保持一緻。

原理實作

公平鎖和非公鎖的整體流程基本是一緻的,唯一不同的是嘗試擷取鎖tryAcquire的實作。

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 先檢查 AQS 隊列中是否有前驅節點, 沒有(false)才去競争
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 鎖重入
        return false;
    }
}           
public final boolean hasQueuedPredecessors() {    
    Node t = tail;
    Node h = head;
    Node s;    
    // 頭尾指向一個節點,連結清單為空,傳回false
    return h != t &&
        // 頭尾之間有節點,判斷頭節點的下一個是不是空
        // 不是空進入最後的判斷,第二個節點的線程是否是本線程,不是傳回 true,表示目前節點有前驅節點
        ((s = h.next) == null || s.thread != Thread.currentThread());
}           

與非公平鎖最大的差別是:公平鎖擷取鎖的時候先檢查 AQS 隊列中是否有非目前線程的等待節點,沒有才去 CAS 競争,有的話,就老老實實排隊去吧。而非公平鎖會嘗試搶一次鎖,如果搶不到的話,老老實實排隊去吧。

總結

非公平鎖和公平鎖的兩處不同:

  1. 非公平鎖在調用 lock 後,首先就會調用 CAS 進行一次搶鎖,如果這個時候恰巧鎖沒有被占用,那麼直接就擷取到鎖傳回了。
  2. 非公平鎖在 CAS 失敗後,和公平鎖一樣都會進入到 tryAcquire 方法,在 tryAcquire 方法中,如果發現鎖這個時候被釋放了(state == 0),非公平鎖會直接 CAS 搶鎖,但是公平鎖會判斷等待隊列是否有線程處于等待狀态,如果有則不去搶鎖,乖乖排到後面。

公平鎖和非公平鎖就這兩點差別,如果這兩次 CAS 都不成功,那麼後面非公平鎖和公平鎖是一樣的,都要進入到阻塞隊列等待喚醒。

相對來說,非公平鎖會有更好的性能,因為它的吞吐量比較大。當然,非公平鎖讓擷取鎖的時間變得更加不确定,可能會導緻在阻塞隊列中的線程長期處于饑餓狀态。

作者:JAVA旭陽

連結:https://juejin.cn/post/7153270781084958750

來源:稀土掘金

繼續閱讀