天天看點

并發-AQS之ReentrantReadWriteLock源碼解讀(一)

作者:程式猿凱撒

ReentrantReadWriteLock是Java中的一個可重入讀寫鎖,它允許多個線程同時讀取一個共享資源,但隻允許一個線程寫入該共享資源,當一個線程持有寫鎖時,任何其他線程都不能持有讀或寫鎖。

該鎖具有以下特點:

  1. 可重入性:線程可以多次獲得同一個鎖。
  2. 公平性:可選擇公平或非公平模式。
  3. 讀寫分離:支援多個線程同時讀取共享資源,但隻允許一個線程寫入共享資源。
  4. 鎖降級:寫鎖可以降級為讀鎖,但讀鎖不能更新為寫鎖。

使用ReentrantReadWriteLock可以提高并發通路性能,因為多個線程可以同時讀取共享資源,而不會互相幹擾。但是,在寫操作期間,所有讀和寫的通路都會被暫停,直到寫操作完成。 其實作較為複雜,UML類圖如下

并發-AQS之ReentrantReadWriteLock源碼解讀(一)

這張圖更為形象

并發-AQS之ReentrantReadWriteLock源碼解讀(一)

讀寫鎖特性如下

是否互斥

如何實作讀寫鎖,下面我們來依據源碼一一道來

構造函數

java複制代碼//預設構造方法
public ReentrantReadWriteLock() {
    this(false);
}
           
java複制代碼//是否使用公平鎖的構造方法
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}
           

成員變量設定如下

java複制代碼//讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
//寫鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
//繼承AQS鎖實作
final Sync sync;
           

Sync

Sync是ReentrantReadWriteLock的内部靜态類,它是ReentrantReadWriteLock的核心實作。Sync繼承自AbstractQueuedSynchronizer(AQS),并重寫了其方法來實作讀寫鎖的語義。

java複制代碼abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;
    //表示共享鎖狀态在鎖狀态中占用的二進制位數(16位),即共享鎖狀态的左移位數
    static final int SHARED_SHIFT   = 16;
    //表示每個共享鎖狀态的數量(1 << SHARED_SHIFT)
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    //表示共享鎖狀态的最大數量((1 << SHARED_SHIFT) - 1)。
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    //傳回一個掩碼,用于提取鎖狀态中的排它鎖狀态(即低16位)
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    ...
           

量和方法的作用是将鎖狀态分成兩部分:高16位表示讀鎖狀态,低16位表示寫鎖狀态。

這種方式,可以高效地實作讀寫分離的鎖機制。在擷取和釋放鎖時,可以根據不同的狀态進行不同的處理,以保證鎖的正确性和高效性。 Sync有兩個實作,非公平鎖和公平鎖

java複制代碼//非公平鎖
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
}
           
java複制代碼//公平鎖
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
           

ReadLock、WriteLock

java複制代碼//讀鎖
public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    ...
           
java複制代碼//寫鎖
public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
           

讀鎖和寫鎖是私有屬性,通過這兩個方法暴露出去

java複制代碼public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
           

非公平鎖讀寫鎖加鎖解鎖

讀鎖ReadLock

加鎖

擷取讀鎖。

如果寫鎖沒有被另一個線程持有,則立即擷取讀鎖并傳回。

如果寫鎖被另一個線程持有,則目前線程為了線程排程目的變為無效狀态,并處于休眠狀态,直到讀鎖被擷取。

lock()

java複制代碼//ReentrantReadWriteLock.ReadLock
public void lock() {
    sync.acquireShared(1);
}
           

共享狀态擷取鎖,acquireShared是AQS方法

java複制代碼// AbstractQueuedSynchronizer
public final void acquireShared(int arg) {
    // 嘗試擷取共享鎖(傳回1表示成功,傳回-1表示失敗)
    if (tryAcquireShared(arg) < 0)
        // 失敗了就可能要排隊等待
        doAcquireShared(arg);
}
           

下面主要解讀ReentrantReadWriteLock.Sync中重寫的tryAcquireShared方法

java複制代碼//ReentrantReadWriteLock.Sync
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    //// 狀态變量的值 在讀寫鎖模式下,高16位存儲的是共享鎖(讀鎖)被擷取的次數,低16位存儲的是互斥鎖(寫鎖)被擷取的次數
    int c = getState();
    //寫鎖本占用,判斷是否被目前線程占用,如果不是直接傳回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //讀鎖擷取次數
    int r = sharedCount(c);
    //讀鎖不需要block且讀鎖次數小于最大值,嘗試更新state值
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //擷取讀鎖成功
        //如果之前還沒有線程擷取讀鎖
        if (r == 0) {
            // 記錄第一個讀者為目前線程
            firstReader = current;
            // 第一個讀者重入的次數為1
            firstReaderHoldCount = 1;
        // 如果有線程擷取了讀鎖且是目前線程
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            // 如果有線程擷取了讀鎖且目前線程不是第一個
            // 則從緩存中擷取重入次數儲存器
            HoldCounter rh = cachedHoldCounter;
            // 如果緩存不屬性目前線程 再從ThreadLocal中擷取
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 如果rh的次數為0,把它放到ThreadLocal中去
                readHolds.set(rh);
            // 重入的次數加1(初始次數為0)
            rh.count++;
        }
        // 擷取讀鎖成功,傳回1
        return 1;
    }
    // 通過這個方法再去嘗試擷取讀鎖(如果之前其它線程擷取了寫鎖,一樣傳回-1表示失敗)
    return fullTryAcquireShared(current);
}
           

exclusiveCount是寫鎖擷取次數

java複制代碼static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
           

sharedCount是讀鎖擷取次數

java複制代碼static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
           

readerShouldBlock是判斷擷取讀鎖是否需要block

java複制代碼// AbstractQueuedSynchronizer
abstract boolean readerShouldBlock();
           

我們看非公平鎖的實作

arduino複制代碼//ReentrantReadWriteLock.NonfairSync
final boolean readerShouldBlock() {
    //該方法檢查在隊列中是否存在等待的寫入線程,如果存在,則傳回true
    return apparentlyFirstQueuedIsExclusive();
}
           
java複制代碼//AbstractQueuedSynchronizer
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    // 擷取目前的頭結點。
    // 如果頭結點不為null,則擷取頭結點的下一個結點。
    // 如果下一個結點不為null,并且該結點不是共享結點,且該結點的線程不為null,則傳回true;否則傳回false。
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}
           

HoldCounter readHolds定義如下

java複制代碼private transient HoldCounter cachedHoldCounter;
private transient ThreadLocalHoldCounter readHolds;
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}
           

fullTryAcquireShared嘗試在此擷取共享鎖,法的實作與 tryAcquireShared 方法中的代碼在一定程度上重複,但是此方法不會在重試和惰性讀取保持計數之間增加複雜性,是以更加簡單。

java複制代碼//ReentrantReadWriteLock.Sync
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
           

後面doAcquireShared共享模式下是否需要等待,前文已經分析,具體看下這篇

并發-AQS之Semaphore源碼解讀

lockInterruptibly()

中斷擷取鎖

java複制代碼//ReentrantReadWriteLock.ReadLock
public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
           

acquireSharedInterruptibly增加了線程中斷相關邏輯,擷取共享鎖的方式沒變

java複制代碼//AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
           

doAcquireSharedInterruptibly前文也分析過,可看下 并發-AQS之Semaphore源碼解讀

嘗試擷取鎖

tryLock()

java複制代碼//ReentrantReadWriteLock.ReadLock
public boolean tryLock() {
    return sync.tryReadLock();
}
           

tryReadLock方法邏輯與fullTryAcquireShared類似,不再贅述

java複制代碼//ReentrantReadWriteLock.Sync
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}
           

tryLock(long timeout, TimeUnit unit)

java複制代碼//ReentrantReadWriteLock.ReadLock
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
           
java複制代碼//AbstractQueuedSynchronizer
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}
           

doAcquireSharedNanos前文也分析過,可看下

并發-AQS之Semaphore源碼解讀

解鎖

unlock()

java複制代碼//ReentrantReadWriteLock.ReadLock
public void unlock() {
    sync.releaseShared(1);
}
           
arduino複制代碼//AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
    //如果嘗試釋放成功了(共享鎖全部釋放),就喚醒下一個節點
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
           

tryReleaseShared實作如下

java複制代碼//ReentrantReadWriteLock.Sync
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 如果第一個讀線程是目前線程 就把它重入的次數減1 如果減到0了就把第一個讀者置為空
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 如果第一個讀者不是目前線程
        // 一樣地,把它重入的次數減1
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        // 共享鎖擷取的次數減1 
        // 如果減為0了說明完全釋放了,才傳回true
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
           

doReleaseShared喚醒下一個節點,前文也分析過,可看下

并發-AQS之Semaphore源碼解讀

newCondition()

不支援

java複制代碼//ReentrantReadWriteLock.ReadLock
public Condition newCondition() {
    throw new UnsupportedOperationException();
}
           

寫鎖WriteLock

加鎖

lock()

獨占式擷取鎖

java複制代碼//ReentrantReadWriteLock.WriteLock
public void lock() {
    sync.acquire(1);
}
           
java複制代碼//AbstractQueuedSynchronizer
public final void acquire(int arg) {
    // 先嘗試擷取鎖 如果失敗,則會進入隊列中排隊,後面的邏輯跟ReentrantLock一樣
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
           
java複制代碼//ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // 互斥鎖被擷取的次數
    int w = exclusiveCount(c);
    if (c != 0) {
        // 如果共享鎖被擷取的次數不為0,或者被其它線程擷取了互斥鎖(寫鎖) 那麼就傳回false,擷取寫鎖失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 溢出檢測
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 共享鎖不為0且目前線程=占用線程
        setState(c + acquires);
        return true;
    }
    // 如果c等于0,就嘗試更新state的值(非公平模式writerShouldBlock()傳回false) 
    // 如果失敗了,說明擷取寫鎖失敗,傳回false 
    // 如果成功了,說明擷取寫鎖成功,把自己設定為占有者,并傳回true
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
           

NonfairSync的writerShouldBlock直接傳回false

java複制代碼//ReentrantReadWriteLock.NonfairSync
final boolean writerShouldBlock() {
    return false; // writers can always barge
}
           

acquireQueued參考Reentrantlonk實作

并發-AQS之Reentrantlonk源碼解讀

lockInterruptibly()

csharp複制代碼//ReentrantReadWriteLock.WriteLock
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
           
java複制代碼//AbstractQueuedSynchronizer
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
           

doAcquireInterruptibly參考Reentrantlonk實作

并發-AQS之Reentrantlonk源碼解讀

嘗試擷取鎖

tryLock( )

java複制代碼//ReentrantReadWriteLock.WriteLock
public boolean tryLock( ) {
    return sync.tryWriteLock();
}
           
java複制代碼//ReentrantReadWriteLock.Sync
final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
           

tryLock(long timeout, TimeUnit unit)

java複制代碼//ReentrantReadWriteLock.WriteLock
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
           
java複制代碼//AbstractQueuedSynchronizer
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
           

doAcquireNanos 參考Reentrantlonk實作

并發-AQS之Reentrantlonk源碼解讀

解鎖

unlock()

java複制代碼//ReentrantReadWriteLock.WriteLock
public void unlock() {
    sync.release(1);
}
           
java複制代碼//AbstractQueuedSynchronizer
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           
java複制代碼//ReentrantReadWriteLock.Sync
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
           

其他方法

newCondition()

java複制代碼//ReentrantReadWriteLock.WriteLock
public Condition newCondition() {
    return sync.newCondition();
}
           

isHeldByCurrentThread()

java複制代碼//ReentrantReadWriteLock.WriteLock
public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();
}
           
java複制代碼//ReentrantReadWriteLock.Sync
protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}
           

getHoldCount()

java複制代碼//ReentrantReadWriteLock.WriteLock
public int getHoldCount() {
    return sync.getWriteHoldCount();
}
           
java複制代碼//ReentrantReadWriteLock.Sync
final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;           

作者:仲文1992

連結:https://juejin.cn/post/7245206582002712634

繼續閱讀