ReentrantReadWriteLock是Java中的一個可重入讀寫鎖,它允許多個線程同時讀取一個共享資源,但隻允許一個線程寫入該共享資源,當一個線程持有寫鎖時,任何其他線程都不能持有讀或寫鎖。
該鎖具有以下特點:
- 可重入性:線程可以多次獲得同一個鎖。
- 公平性:可選擇公平或非公平模式。
- 讀寫分離:支援多個線程同時讀取共享資源,但隻允許一個線程寫入共享資源。
- 鎖降級:寫鎖可以降級為讀鎖,但讀鎖不能更新為寫鎖。
使用ReentrantReadWriteLock可以提高并發通路性能,因為多個線程可以同時讀取共享資源,而不會互相幹擾。但是,在寫操作期間,所有讀和寫的通路都會被暫停,直到寫操作完成。 其實作較為複雜,UML類圖如下
這張圖更為形象
讀寫鎖特性如下
是否互斥 | 讀 | 寫 |
讀 | 否 | 是 |
寫 | 是 | 是 |
如何實作讀寫鎖,下面我們來依據源碼一一道來
構造函數
java複制代碼//預設構造方法
public ReentrantReadWriteLock() {
this(false);
}
java複制代碼//是否使用公平鎖的構造方法
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
成員變量設定如下
java複制代碼//讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
//寫鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
//繼承AQS鎖實作
final Sync sync;
Sync
Sync是ReentrantReadWriteLock的内部靜态類,它是ReentrantReadWriteLock的核心實作。Sync繼承自AbstractQueuedSynchronizer(AQS),并重寫了其方法來實作讀寫鎖的語義。
java複制代碼abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
//表示共享鎖狀态在鎖狀态中占用的二進制位數(16位),即共享鎖狀态的左移位數
static final int SHARED_SHIFT = 16;
//表示每個共享鎖狀态的數量(1 << SHARED_SHIFT)
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//表示共享鎖狀态的最大數量((1 << SHARED_SHIFT) - 1)。
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//傳回一個掩碼,用于提取鎖狀态中的排它鎖狀态(即低16位)
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
...
量和方法的作用是将鎖狀态分成兩部分:高16位表示讀鎖狀态,低16位表示寫鎖狀态。
這種方式,可以高效地實作讀寫分離的鎖機制。在擷取和釋放鎖時,可以根據不同的狀态進行不同的處理,以保證鎖的正确性和高效性。 Sync有兩個實作,非公平鎖和公平鎖
java複制代碼//非公平鎖
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
java複制代碼//公平鎖
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
ReadLock、WriteLock
java複制代碼//讀鎖
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
...
java複制代碼//寫鎖
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
讀鎖和寫鎖是私有屬性,通過這兩個方法暴露出去
java複制代碼public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
非公平鎖讀寫鎖加鎖解鎖
讀鎖ReadLock
加鎖
擷取讀鎖。
如果寫鎖沒有被另一個線程持有,則立即擷取讀鎖并傳回。
如果寫鎖被另一個線程持有,則目前線程為了線程排程目的變為無效狀态,并處于休眠狀态,直到讀鎖被擷取。
lock()
java複制代碼//ReentrantReadWriteLock.ReadLock
public void lock() {
sync.acquireShared(1);
}
共享狀态擷取鎖,acquireShared是AQS方法
java複制代碼// AbstractQueuedSynchronizer
public final void acquireShared(int arg) {
// 嘗試擷取共享鎖(傳回1表示成功,傳回-1表示失敗)
if (tryAcquireShared(arg) < 0)
// 失敗了就可能要排隊等待
doAcquireShared(arg);
}
下面主要解讀ReentrantReadWriteLock.Sync中重寫的tryAcquireShared方法
java複制代碼//ReentrantReadWriteLock.Sync
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
//// 狀态變量的值 在讀寫鎖模式下,高16位存儲的是共享鎖(讀鎖)被擷取的次數,低16位存儲的是互斥鎖(寫鎖)被擷取的次數
int c = getState();
//寫鎖本占用,判斷是否被目前線程占用,如果不是直接傳回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//讀鎖擷取次數
int r = sharedCount(c);
//讀鎖不需要block且讀鎖次數小于最大值,嘗試更新state值
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//擷取讀鎖成功
//如果之前還沒有線程擷取讀鎖
if (r == 0) {
// 記錄第一個讀者為目前線程
firstReader = current;
// 第一個讀者重入的次數為1
firstReaderHoldCount = 1;
// 如果有線程擷取了讀鎖且是目前線程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 如果有線程擷取了讀鎖且目前線程不是第一個
// 則從緩存中擷取重入次數儲存器
HoldCounter rh = cachedHoldCounter;
// 如果緩存不屬性目前線程 再從ThreadLocal中擷取
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 如果rh的次數為0,把它放到ThreadLocal中去
readHolds.set(rh);
// 重入的次數加1(初始次數為0)
rh.count++;
}
// 擷取讀鎖成功,傳回1
return 1;
}
// 通過這個方法再去嘗試擷取讀鎖(如果之前其它線程擷取了寫鎖,一樣傳回-1表示失敗)
return fullTryAcquireShared(current);
}
exclusiveCount是寫鎖擷取次數
java複制代碼static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
sharedCount是讀鎖擷取次數
java複制代碼static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
readerShouldBlock是判斷擷取讀鎖是否需要block
java複制代碼// AbstractQueuedSynchronizer
abstract boolean readerShouldBlock();
我們看非公平鎖的實作
arduino複制代碼//ReentrantReadWriteLock.NonfairSync
final boolean readerShouldBlock() {
//該方法檢查在隊列中是否存在等待的寫入線程,如果存在,則傳回true
return apparentlyFirstQueuedIsExclusive();
}
java複制代碼//AbstractQueuedSynchronizer
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 擷取目前的頭結點。
// 如果頭結點不為null,則擷取頭結點的下一個結點。
// 如果下一個結點不為null,并且該結點不是共享結點,且該結點的線程不為null,則傳回true;否則傳回false。
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
HoldCounter readHolds定義如下
java複制代碼private transient HoldCounter cachedHoldCounter;
private transient ThreadLocalHoldCounter readHolds;
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
fullTryAcquireShared嘗試在此擷取共享鎖,法的實作與 tryAcquireShared 方法中的代碼在一定程度上重複,但是此方法不會在重試和惰性讀取保持計數之間增加複雜性,是以更加簡單。
java複制代碼//ReentrantReadWriteLock.Sync
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
後面doAcquireShared共享模式下是否需要等待,前文已經分析,具體看下這篇
并發-AQS之Semaphore源碼解讀
lockInterruptibly()
中斷擷取鎖
java複制代碼//ReentrantReadWriteLock.ReadLock
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly增加了線程中斷相關邏輯,擷取共享鎖的方式沒變
java複制代碼//AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly前文也分析過,可看下 并發-AQS之Semaphore源碼解讀
嘗試擷取鎖
tryLock()
java複制代碼//ReentrantReadWriteLock.ReadLock
public boolean tryLock() {
return sync.tryReadLock();
}
tryReadLock方法邏輯與fullTryAcquireShared類似,不再贅述
java複制代碼//ReentrantReadWriteLock.Sync
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
tryLock(long timeout, TimeUnit unit)
java複制代碼//ReentrantReadWriteLock.ReadLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
java複制代碼//AbstractQueuedSynchronizer
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
doAcquireSharedNanos前文也分析過,可看下
并發-AQS之Semaphore源碼解讀
解鎖
unlock()
java複制代碼//ReentrantReadWriteLock.ReadLock
public void unlock() {
sync.releaseShared(1);
}
arduino複制代碼//AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
//如果嘗試釋放成功了(共享鎖全部釋放),就喚醒下一個節點
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared實作如下
java複制代碼//ReentrantReadWriteLock.Sync
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// 如果第一個讀線程是目前線程 就把它重入的次數減1 如果減到0了就把第一個讀者置為空
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 如果第一個讀者不是目前線程
// 一樣地,把它重入的次數減1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 共享鎖擷取的次數減1
// 如果減為0了說明完全釋放了,才傳回true
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
doReleaseShared喚醒下一個節點,前文也分析過,可看下
并發-AQS之Semaphore源碼解讀
newCondition()
不支援
java複制代碼//ReentrantReadWriteLock.ReadLock
public Condition newCondition() {
throw new UnsupportedOperationException();
}
寫鎖WriteLock
加鎖
lock()
獨占式擷取鎖
java複制代碼//ReentrantReadWriteLock.WriteLock
public void lock() {
sync.acquire(1);
}
java複制代碼//AbstractQueuedSynchronizer
public final void acquire(int arg) {
// 先嘗試擷取鎖 如果失敗,則會進入隊列中排隊,後面的邏輯跟ReentrantLock一樣
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
java複制代碼//ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
// 互斥鎖被擷取的次數
int w = exclusiveCount(c);
if (c != 0) {
// 如果共享鎖被擷取的次數不為0,或者被其它線程擷取了互斥鎖(寫鎖) 那麼就傳回false,擷取寫鎖失敗
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 溢出檢測
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 共享鎖不為0且目前線程=占用線程
setState(c + acquires);
return true;
}
// 如果c等于0,就嘗試更新state的值(非公平模式writerShouldBlock()傳回false)
// 如果失敗了,說明擷取寫鎖失敗,傳回false
// 如果成功了,說明擷取寫鎖成功,把自己設定為占有者,并傳回true
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
NonfairSync的writerShouldBlock直接傳回false
java複制代碼//ReentrantReadWriteLock.NonfairSync
final boolean writerShouldBlock() {
return false; // writers can always barge
}
acquireQueued參考Reentrantlonk實作
并發-AQS之Reentrantlonk源碼解讀
lockInterruptibly()
csharp複制代碼//ReentrantReadWriteLock.WriteLock
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
java複制代碼//AbstractQueuedSynchronizer
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly參考Reentrantlonk實作
并發-AQS之Reentrantlonk源碼解讀
嘗試擷取鎖
tryLock( )
java複制代碼//ReentrantReadWriteLock.WriteLock
public boolean tryLock( ) {
return sync.tryWriteLock();
}
java複制代碼//ReentrantReadWriteLock.Sync
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
tryLock(long timeout, TimeUnit unit)
java複制代碼//ReentrantReadWriteLock.WriteLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
java複制代碼//AbstractQueuedSynchronizer
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos 參考Reentrantlonk實作
并發-AQS之Reentrantlonk源碼解讀
解鎖
unlock()
java複制代碼//ReentrantReadWriteLock.WriteLock
public void unlock() {
sync.release(1);
}
java複制代碼//AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
java複制代碼//ReentrantReadWriteLock.Sync
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
其他方法
newCondition()
java複制代碼//ReentrantReadWriteLock.WriteLock
public Condition newCondition() {
return sync.newCondition();
}
isHeldByCurrentThread()
java複制代碼//ReentrantReadWriteLock.WriteLock
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
java複制代碼//ReentrantReadWriteLock.Sync
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
getHoldCount()
java複制代碼//ReentrantReadWriteLock.WriteLock
public int getHoldCount() {
return sync.getWriteHoldCount();
}
java複制代碼//ReentrantReadWriteLock.Sync
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
作者:仲文1992
連結:https://juejin.cn/post/7245206582002712634