
ReadWriteLock的特點
當我們想保證并發安全的時候,我們可以使用ReentrantLock或者synchronized。這樣就能做到寫寫互斥,讀寫互斥,讀讀互斥。
鑒于大多數業務場景中都是讀多寫少,我們有沒有可能做到讀讀并行呢?還真可以,這個類就是ReadWriteLock
@Test
public void testLock() throws IOException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
Thread thread1 = new Thread(() -> {
readLock.lock();
System.out.println("thread1 read lock " + System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread1 read unlock " + System.currentTimeMillis());
readLock.unlock();
});
Thread thread2 = new Thread(() -> {
readLock.lock();
System.out.println("thread2 read lock " + System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2 read unlock " + System.currentTimeMillis());
readLock.unlock();
});
Thread thread3 = new Thread(() -> {
writeLock.lock();
System.out.println("thread3 write lock " + System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread3 write unlock " + System.currentTimeMillis());
writeLock.unlock();
});
thread1.start();
thread2.start();
thread3.start();
System.in.read();
}
執行結果
thread1 read lock 1646210521360
thread2 read lock 1646210521360
thread1 read unlock 1646210522362
thread2 read unlock 1646210522362
thread3 write lock 1646210522362
thread3 write unlock 1646210523367
從上面的執行結果,我們可以看到讀鎖和寫鎖互斥,但是讀鎖和讀鎖可以并行
和ReentrantLock類似ReadWriteLock也分為公平鎖和非公平鎖。到現在估計你也能猜出來公平性和非公平性展現在哪了!
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
從ReadWriteLock的行為我們可以猜到,寫鎖是互斥鎖,讀鎖是共享鎖,但是AQS中隻提供了一個state變量來表示鎖的狀态。
我們如何用一個變量來存儲兩種鎖的狀态呢?
在ReadWriteLock中是這樣做的,state變量的高16位表示讀鎖的狀态,低16位表示寫鎖的狀态
擷取寫鎖
鑒于寫鎖的實作比較簡單,我們就先看寫鎖的實作,再看讀鎖的實作
// WriteLock
public void lock() {
sync.acquire(1);
}
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面的代碼我們在AQS中已經分析過了,不再分析了,直接分析加鎖的邏輯
// Sync
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 擷取寫鎖的值
int w = exclusiveCount(c);
if (c != 0) {
// state不為0,寫鎖為0,說明讀鎖不為0
// (Note: if c != 0 and w == 0 then shared count != 0)
// 1. 讀鎖不為0
// 2. 寫鎖不為0,并且擷取寫鎖的線程不是目前線程,則寫鎖加鎖失敗
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 超過寫鎖能表示的最大擷取次數
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 寫鎖重入
setState(c + acquires);
return true;
}
// 沒有被加鎖,先看看是否需要排隊
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 獲鎖成功,執行業務邏輯
setExclusiveOwnerThread(current);
return true;
}
在這裡我們先引入2個概念
鎖更新:同一個線程先申請讀鎖,再申請寫鎖,此時能正确申請到寫鎖
鎖降低:同一個線程先申請寫鎖,再申請讀鎖,此時能正确申請到讀鎖
從上面的源碼中我們可以看到申請寫鎖的時候,隻要有讀鎖就會失敗,是以ReadWriteLock并不支援鎖更新
加鎖時公平鎖和非公平鎖的邏輯和ReentrantLock一樣
static final class NonfairSync extends Sync {
// 非公平模式,直接cas去搶鎖,搶不到再排隊
final boolean writerShouldBlock() {
return false; // writers can always barge
}
}
static final class FairSync extends Sync {
// 同步隊列中有線程則去排隊
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
}
釋放寫鎖
// WriteLock
public void unlock() {
sync.release(1);
}
// AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
直接看釋放鎖的邏輯
// Sync
protected final boolean tryRelease(int releases) {
// 解鎖的線程和擷取鎖的線程不一樣
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 寫鎖是可重入的,判斷所有的寫鎖是否都被釋放
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
将寫鎖的加鎖次數減一,因為寫鎖是可重入的。當寫鎖都被釋放時,喚醒同步隊列中的線程,否則隻是修改次數
擷取讀鎖
// ReadLock
public void lock() {
sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
直接看加鎖的邏輯
// Sync
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 寫鎖已經被持有,并且不是持有鎖的線程不是目前線程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 是否需要排隊
// 是否超過能表示的加鎖次數
// cas加鎖
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 第一個擷取讀鎖
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 讀鎖重入
firstReaderHoldCount++;
} else {
// cachedHoldCounter用來儲存最後一個擷取讀鎖的線程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
// 從 AQS中acquireShared方法可以知道大于0表示擷取到鎖
return 1;
}
// 自旋擷取讀鎖
return fullTryAcquireShared(current);
}
當我們加讀鎖的時候,如果有寫鎖并且不是目前線程就會加鎖失敗。如果有寫鎖并且是目前線程那麼可以正常擷取讀鎖,是以ReadWriteLock是支援鎖降級的
firstReader,cachedHoldCounter等隻是一些統計變量,例如讀鎖的擷取次數,對主流程影響不大,不展開分析了
釋放讀鎖
// ReadLock
public void unlock() {
sync.releaseShared(1);
}
// AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Sync
protected final boolean tryReleaseShared(int unused) {
// 省略部分無關代碼
for (;;) {
int c = getState();
// 将讀鎖次數減1
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// nextc == 0表示讀鎖和寫鎖都被釋放了
return nextc == 0;
}
}
通過CAS不斷減少讀鎖的加鎖次數。
總結
讀取是擷取共享鎖,在擷取讀鎖之前會先判斷寫鎖是否被擷取,如果寫鎖被目前線程擷取或者沒有寫鎖,則擷取讀鎖成功,否則擷取讀鎖失敗(支援鎖降級)