天天看點

并發工具類:ReadWriteLock是如何做到讀讀并行的?

并發工具類:ReadWriteLock是如何做到讀讀并行的?

ReadWriteLock的特點

當我們想保證并發安全的時候,我們可以使用ReentrantLock或者synchronized。這樣就能做到寫寫互斥,讀寫互斥,讀讀互斥。

鑒于大多數業務場景中都是讀多寫少,我們有沒有可能做到讀讀并行呢?還真可以,這個類就是ReadWriteLock

@Test
public void testLock() throws IOException {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    Thread thread1 = new Thread(() -> {
        readLock.lock();
        System.out.println("thread1 read lock " + System.currentTimeMillis());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("thread1 read unlock " + System.currentTimeMillis());
        readLock.unlock();
    });
    Thread thread2 = new Thread(() -> {
        readLock.lock();
        System.out.println("thread2 read lock " + System.currentTimeMillis());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("thread2 read unlock " + System.currentTimeMillis());
        readLock.unlock();
    });
    Thread thread3 = new Thread(() -> {
        writeLock.lock();
        System.out.println("thread3 write lock " + System.currentTimeMillis());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("thread3 write unlock " + System.currentTimeMillis());
        writeLock.unlock();
    });
    thread1.start();
    thread2.start();
    thread3.start();
    System.in.read();
}      

執行結果

thread1 read lock 1646210521360
thread2 read lock 1646210521360
thread1 read unlock 1646210522362
thread2 read unlock 1646210522362
thread3 write lock 1646210522362
thread3 write unlock 1646210523367      

從上面的執行結果,我們可以看到讀鎖和寫鎖互斥,但是讀鎖和讀鎖可以并行

并發工具類:ReadWriteLock是如何做到讀讀并行的?

和ReentrantLock類似ReadWriteLock也分為公平鎖和非公平鎖。到現在估計你也能猜出來公平性和非公平性展現在哪了!

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}      

從ReadWriteLock的行為我們可以猜到,寫鎖是互斥鎖,讀鎖是共享鎖,但是AQS中隻提供了一個state變量來表示鎖的狀态。

我們如何用一個變量來存儲兩種鎖的狀态呢?

在ReadWriteLock中是這樣做的,state變量的高16位表示讀鎖的狀态,低16位表示寫鎖的狀态

并發工具類:ReadWriteLock是如何做到讀讀并行的?

擷取寫鎖

鑒于寫鎖的實作比較簡單,我們就先看寫鎖的實作,再看讀鎖的實作

// WriteLock
public void lock() {
    sync.acquire(1);
}      
// AQS
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}      

上面的代碼我們在AQS中已經分析過了,不再分析了,直接分析加鎖的邏輯

// Sync
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 擷取寫鎖的值
    int w = exclusiveCount(c);
    if (c != 0) {
        // state不為0,寫鎖為0,說明讀鎖不為0
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 1. 讀鎖不為0
        // 2. 寫鎖不為0,并且擷取寫鎖的線程不是目前線程,則寫鎖加鎖失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 超過寫鎖能表示的最大擷取次數
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 寫鎖重入
        setState(c + acquires);
        return true;
    }
    // 沒有被加鎖,先看看是否需要排隊
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 獲鎖成功,執行業務邏輯
    setExclusiveOwnerThread(current);
    return true;
}      

在這裡我們先引入2個概念

鎖更新:同一個線程先申請讀鎖,再申請寫鎖,此時能正确申請到寫鎖

鎖降低:同一個線程先申請寫鎖,再申請讀鎖,此時能正确申請到讀鎖

從上面的源碼中我們可以看到申請寫鎖的時候,隻要有讀鎖就會失敗,是以ReadWriteLock并不支援鎖更新

加鎖時公平鎖和非公平鎖的邏輯和ReentrantLock一樣

static final class NonfairSync extends Sync {
    // 非公平模式,直接cas去搶鎖,搶不到再排隊
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
}

static final class FairSync extends Sync {
    // 同步隊列中有線程則去排隊
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
}      

釋放寫鎖

// WriteLock
public void unlock() {
    sync.release(1);
}      
// AQS
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}      

直接看釋放鎖的邏輯

// Sync
protected final boolean tryRelease(int releases) {
    // 解鎖的線程和擷取鎖的線程不一樣
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    // 寫鎖是可重入的,判斷所有的寫鎖是否都被釋放
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}      

将寫鎖的加鎖次數減一,因為寫鎖是可重入的。當寫鎖都被釋放時,喚醒同步隊列中的線程,否則隻是修改次數

擷取讀鎖

// ReadLock
public void lock() {
    sync.acquireShared(1);
}      
// AQS
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}      

直接看加鎖的邏輯

// Sync
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 寫鎖已經被持有,并且不是持有鎖的線程不是目前線程
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    // 是否需要排隊
    // 是否超過能表示的加鎖次數
    // cas加鎖
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            // 第一個擷取讀鎖
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 讀鎖重入
            firstReaderHoldCount++;
        } else {
            // cachedHoldCounter用來儲存最後一個擷取讀鎖的線程
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        // 從 AQS中acquireShared方法可以知道大于0表示擷取到鎖
        return 1;
    }
    // 自旋擷取讀鎖
    return fullTryAcquireShared(current);
}      

當我們加讀鎖的時候,如果有寫鎖并且不是目前線程就會加鎖失敗。如果有寫鎖并且是目前線程那麼可以正常擷取讀鎖,是以ReadWriteLock是支援鎖降級的

firstReader,cachedHoldCounter等隻是一些統計變量,例如讀鎖的擷取次數,對主流程影響不大,不展開分析了

釋放讀鎖

// ReadLock
public void unlock() {
    sync.releaseShared(1);
}      
// AQS
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}      
// Sync
protected final boolean tryReleaseShared(int unused) {

    // 省略部分無關代碼
    
    for (;;) {
        int c = getState();
        // 将讀鎖次數減1
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // nextc == 0表示讀鎖和寫鎖都被釋放了
            return nextc == 0;
    }
}      

通過CAS不斷減少讀鎖的加鎖次數。

總結

讀取是擷取共享鎖,在擷取讀鎖之前會先判斷寫鎖是否被擷取,如果寫鎖被目前線程擷取或者沒有寫鎖,則擷取讀鎖成功,否則擷取讀鎖失敗(支援鎖降級)