一、示例
1. 基于 ReentrantLock 實作簡單緩存
public class Cache1 {
private static final Map<String, Object> map = new HashMap<String, Object>();
private static ReentrantLock lock = new ReentrantLock();
public static Object get(String key) {
lock.lock();
try {
return map.get(key);
} finally {
lock.unlock();
}
}
public static void set(String key, Object value) {
lock.lock();
try {
map.put(key, value);
} finally {
lock.unlock();
}
}
}
上面的代碼中實作了一個簡單的緩存, 在寫入和讀取時都使用 ReentrantLock 進行鎖住,之前文章有講過,ReentrantLock 是一個排他鎖, 隻能在同一時刻隻有一個線程通路,其他線程阻塞。但在緩存這種讀多寫少的場景中,如果每一次讀取資料都要鎖住緩存的話,那效率是很低的,是以這時候需要有一種鎖,能夠在寫入時鎖住緩存,不讓其他線程通路緩存,而在讀取資料時可以讓其他讀線程通路緩存,而寫線程阻塞,這種鎖的名稱叫做 ReentrantReadWriteLock(可重入讀寫鎖)
2. 基于 ReentrantReadWriteLock 實作簡單緩存
public class Cache2 {
private static final Map<String, Object> map = new HashMap<String, Object>();
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static ReentrantReadWriteLock.ReadLock read = lock.readLock();
private static ReentrantReadWriteLock.WriteLock write = lock.writeLock();
public static Object get(String key) {
read.lock();
try {
return map.get(key);
} finally {
read.unlock();
}
}
public static void set(String key, Object value) {
write.lock();
try {
map.put(key, value);
} finally {
write.unlock();
}
}
}
如上代碼,當調用 set 方法時,使用讀鎖将緩存鎖住,這時候其他讀或寫線程就進入阻塞狀态,隻有讀鎖釋放後其他線程才能争奪鎖,而調用 get 方法時,會将寫線程阻塞,其他讀線程可以正常進入争奪鎖。
二、 讀寫鎖實作原理
介紹完 ReentrantReadWriteLock 的使用後,接下來介紹其實作原理。ReentrantReadWriteLock 同上一篇文章中介紹 ReentrantLock 一樣是基于 AQS 來實作,但是在 AQS 中隻有一個 state 的屬性來表示同步狀态,而讀寫鎖有 讀和寫 兩種狀态, 那它是怎麼實作的呢?
我們知道 AQS 的 state 屬性是 int 類型的,int 類型 32位, 在讀寫鎖的實作中,将 32位的 state 切分成兩部分,分别是高16位和低16位, 高16位表示讀, 低16位表示寫,如下圖所示:

如果目前同步狀态如上圖所示的話,表示有一個線程已經擷取到了寫鎖, 并且重進入了 1 次, 同時該線程也擷取了 2 次讀鎖。讀寫鎖中通過位運算來确定讀寫狀态,假如目前的同步狀态為 S, 那寫狀态就是 S & 0x0000FFFF (将高16位清除), 而讀狀态就是S >>> 16 (無符号右移,高位補0)。當寫狀态加1時,等于 S + 1, 讀狀态加1時, 等于 S + (1 << 16), 也就是 S + 0x00010000。并且我們可以得到一個推論: 當 S != 0 時,并且 S & 0x0000FFFF 等于0時,那麼讀狀态不為0,表示讀鎖已被擷取了。
三、 源碼
1. 寫鎖的擷取與釋放
寫鎖是一個支援重進入的排他鎖,目前線程擷取寫鎖時,其他讀寫線程阻塞,而目前線程可以繼續擷取讀狀态和寫狀态。當有其他線程擷取了讀鎖時,該線程進入阻塞狀态。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 擷取同步狀态
int c = getState();
// 計算寫狀态的值,采用 c & 0x0000FFFF
int w = exclusiveCount(c);
// 如果同步狀态不為 0
if (c != 0) {
// 如果 w == 0 ,目前狀态為讀狀态,如果 w != 0,
// 目前狀态為寫狀态,判斷獨占鎖的占有線程是不是目前線程,不是傳回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 目前讀狀态的值超過最大值 65535, 則抛出異常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 讀狀态加1
setState(c + acquires);
return true;
}
// 是否要進入阻塞, 否則嘗試設定同步狀态
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 設定成功則将目前線程設定為獨占鎖的占有線程
setExclusiveOwnerThread(current);
// 傳回true說明擷取寫鎖成功
return true;
}
如上圖代碼, 先判斷同步狀态是不是等于0, 如果不等于0,判斷 w 是不是等于0 ,w的計算方法是 同步狀态 & 0x0000FFFF,如果等于0的話說明,目前同步狀态是讀狀态,傳回false進入阻塞,如果不等于0時,說明同步狀态為寫狀态,則判斷目前線程是不是占有獨占鎖的線程(重進入判斷), 不是則進入阻塞。因為讀寫狀态分别隻占了 16 位, 是以它們最大值都是 65535, 如果讀或寫狀态超過這個值則抛出異常。 writerShouldBlock() 方法是公平鎖和非公平鎖的實作,判斷目前線程是否需要進入等待,如果不需要則使用 CAS 設定同步狀态,設定成功将獨占鎖的占有線程設定位目前線程。讀鎖的釋放比較簡單,同 ReentrantLock 中的釋放鎖相似,如下:
protected final boolean tryRelease(int releases) {
// 判斷目前線程是不是占有獨占鎖的線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 同步狀态減1
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 如果寫狀态為 0,則設定獨占鎖的占有線程為 null
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
2. 讀鎖的擷取和釋放
protected final int tryAcquireShared(int unused) {
// 擷取目前線程
Thread current = Thread.currentThread();
// 擷取同步狀态
int c = getState();
// 如果目前是寫狀态,并且目前線程不是獨占鎖鎖占有的線程的話,傳回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 擷取讀狀态的值
int r = sharedCount(c);
// 1. readerShouldBlock 公平鎖和非公平鎖的判斷
// 2. 判斷讀狀态的值是不是超過了最大值 65535
// 3. CAS 設定同步狀态
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果 r == 0 設定firstReader 為目前線程,并設定該線程重進入數為1次
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果firstReader為目前線程的話,設定重進入數+1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 下面這一部分是計算每個線程的重進入數,下面再做詳解
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// CAS 擷取讀取鎖
return fullTryAcquireShared(current);
}
過程:
- 判斷目前是不是寫狀态,如果是則判斷占有寫鎖的線程是不是目前線程,不是則傳回 -1
- 使用 CAS 設定同步狀态,成功則擷取到讀鎖
- 擷取不成功則進入 fullTryAcquireShared 方法
注意,代碼中CAS設定同步狀态時,新值為 c + SHARED_UNIT, SHARED_UNIT 的值為 1 << 16, 相當于讀狀态加1,下面來看看 fullTryAcquireShared 方法:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 擷取同步狀态
int c = getState();
// 如果目前是寫狀态,并且目前線程不是獨占鎖鎖占有的線程的話,傳回-1
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 如果是公平鎖的情況下
} else if (readerShouldBlock()) {
// 判斷第一個擷取讀鎖線程是不是目前線程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 下面這一部分是判斷目前線程是否已經擷取過讀鎖,沒有的話傳回 -1
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 判斷是否超過讀鎖的數量
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS 設定讀狀态
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 讀狀态 == 0, 設定firstReader 為目前線程,并設定該線程重進入數為1次
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果firstReader為目前線程的話,設定重進入數+1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 下面這一部分是計算每個線程的重進入數,下面再做詳解
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
fullTryAcquireShared 方法同上面擷取讀鎖的代碼相似,這邊不多做解釋,接下來看看讀鎖的釋放:
protected final boolean tryReleaseShared(int unused) {
// 擷取目前線程
Thread current = Thread.currentThread();
// 如果目前線程為第一個擷取讀鎖的線程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 如果目前線程重進入次數為1時,設定為null
if (firstReaderHoldCount == 1)
firstReader = null;
else
// 否則重進入次數減1
firstReaderHoldCount--;
} else {
// 下面詳解
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// CAS 設定讀狀态減 1
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
讀鎖的釋放比較簡單,主要是通過 CAS 設定讀狀态減 1。
3. 線程擷取讀鎖的重進入數
線程擷取讀鎖的重進入數值是儲存在 HoldCounter 中, 而 HoldCounter 是儲存在 ThreadLocalHolderCounter 中,先來看看 HoldCounter。
static final class HoldCounter {
// 記錄重進入數
int count = 0;
// 設定目前的線程id
final long tid = getThreadId(Thread.currentThread());
}
HoldCounter 的實作很簡單,隻有一個記錄重進入數的 count 和記錄線程id的tid,下面來看看 ThreadLocalHolderCount 的實作
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
ThreadLocalHolderCounter 繼承了 ThreadLocal 類, 這個類後面會介紹,這裡不多講,隻要知道這個是為每個線程單獨儲存一份 HoldCounter 即可,下面來看看剛才擷取和釋放讀鎖時中涉及到這個的代碼
if (rh == null)
// 擷取緩存的 HoldCounter
rh = cachedHoldCounter;
// 如果 rh 的線程id不等于目前線程的話,從readHolds中擷取
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 如果 rh 的重進入數為0的話,說明該線程第一次擷取讀鎖,将其存進readHolds中
else if (rh.count == 0)
readHolds.set(rh);
// 重進入數加 1
rh.count++;
// 緩存 HoldCounter
cachedHoldCounter = rh; // cache for release
上面主要是擷取讀鎖時,将目前線程的重進入數加 1, 而在釋放讀鎖時,則将目前線程的重進入數減1,如下
// 擷取緩存的 HoldCounter
HoldCounter rh = cachedHoldCounter;
// 如果 rh == null 或者 rh 的線程id不等于目前線程的id時,從 readHolds 擷取
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 擷取目前線程的重進入數
int count = rh.count;
// 如果目前線程等于1,從 readHolds 中移除
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 重進入數減1
--rh.count;
四、總結
本篇主要介紹了讀寫鎖的應用以及其實作原理,它主要應用于讀多寫少的場景中,在某個線程擷取到寫鎖時,自己線程可以重進入擷取讀寫狀态,而其他讀寫線程進入阻塞。在某個線程擷取到讀鎖時,其他讀線程可以擷取讀狀态,而想擷取寫鎖的線程将進入阻塞,并且線程擷取讀狀态的重進入是通過 HoldCounter 來記錄的。
參考資料:《Java并發程式設計的藝術》