一、示例
1. 基于 ReentrantLock 实现简单缓存
public class Cache1 {
private static final Map<String, Object> map = new HashMap<String, Object>();
private static ReentrantLock lock = new ReentrantLock();
public static Object get(String key) {
lock.lock();
try {
return map.get(key);
} finally {
lock.unlock();
}
}
public static void set(String key, Object value) {
lock.lock();
try {
map.put(key, value);
} finally {
lock.unlock();
}
}
}
上面的代码中实现了一个简单的缓存, 在写入和读取时都使用 ReentrantLock 进行锁住,之前文章有讲过,ReentrantLock 是一个排他锁, 只能在同一时刻只有一个线程访问,其他线程阻塞。但在缓存这种读多写少的场景中,如果每一次读取数据都要锁住缓存的话,那效率是很低的,所以这时候需要有一种锁,能够在写入时锁住缓存,不让其他线程访问缓存,而在读取数据时可以让其他读线程访问缓存,而写线程阻塞,这种锁的名称叫做 ReentrantReadWriteLock(可重入读写锁)
2. 基于 ReentrantReadWriteLock 实现简单缓存
public class Cache2 {
private static final Map<String, Object> map = new HashMap<String, Object>();
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static ReentrantReadWriteLock.ReadLock read = lock.readLock();
private static ReentrantReadWriteLock.WriteLock write = lock.writeLock();
public static Object get(String key) {
read.lock();
try {
return map.get(key);
} finally {
read.unlock();
}
}
public static void set(String key, Object value) {
write.lock();
try {
map.put(key, value);
} finally {
write.unlock();
}
}
}
如上代码,当调用 set 方法时,使用读锁将缓存锁住,这时候其他读或写线程就进入阻塞状态,只有读锁释放后其他线程才能争夺锁,而调用 get 方法时,会将写线程阻塞,其他读线程可以正常进入争夺锁。
二、 读写锁实现原理
介绍完 ReentrantReadWriteLock 的使用后,接下来介绍其实现原理。ReentrantReadWriteLock 同上一篇文章中介绍 ReentrantLock 一样是基于 AQS 来实现,但是在 AQS 中只有一个 state 的属性来表示同步状态,而读写锁有 读和写 两种状态, 那它是怎么实现的呢?
我们知道 AQS 的 state 属性是 int 类型的,int 类型 32位, 在读写锁的实现中,将 32位的 state 切分成两部分,分别是高16位和低16位, 高16位表示读, 低16位表示写,如下图所示:

如果当前同步状态如上图所示的话,表示有一个线程已经获取到了写锁, 并且重进入了 1 次, 同时该线程也获取了 2 次读锁。读写锁中通过位运算来确定读写状态,假如当前的同步状态为 S, 那写状态就是 S & 0x0000FFFF (将高16位清除), 而读状态就是S >>> 16 (无符号右移,高位补0)。当写状态加1时,等于 S + 1, 读状态加1时, 等于 S + (1 << 16), 也就是 S + 0x00010000。并且我们可以得到一个推论: 当 S != 0 时,并且 S & 0x0000FFFF 等于0时,那么读状态不为0,表示读锁已被获取了。
三、 源码
1. 写锁的获取与释放
写锁是一个支持重进入的排他锁,当前线程获取写锁时,其他读写线程阻塞,而当前线程可以继续获取读状态和写状态。当有其他线程获取了读锁时,该线程进入阻塞状态。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 获取同步状态
int c = getState();
// 计算写状态的值,采用 c & 0x0000FFFF
int w = exclusiveCount(c);
// 如果同步状态不为 0
if (c != 0) {
// 如果 w == 0 ,当前状态为读状态,如果 w != 0,
// 当前状态为写状态,判断独占锁的占有线程是不是当前线程,不是返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 当前读状态的值超过最大值 65535, 则抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 读状态加1
setState(c + acquires);
return true;
}
// 是否要进入阻塞, 否则尝试设置同步状态
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置成功则将当前线程设置为独占锁的占有线程
setExclusiveOwnerThread(current);
// 返回true说明获取写锁成功
return true;
}
如上图代码, 先判断同步状态是不是等于0, 如果不等于0,判断 w 是不是等于0 ,w的计算方法是 同步状态 & 0x0000FFFF,如果等于0的话说明,当前同步状态是读状态,返回false进入阻塞,如果不等于0时,说明同步状态为写状态,则判断当前线程是不是占有独占锁的线程(重进入判断), 不是则进入阻塞。因为读写状态分别只占了 16 位, 所以它们最大值都是 65535, 如果读或写状态超过这个值则抛出异常。 writerShouldBlock() 方法是公平锁和非公平锁的实现,判断当前线程是否需要进入等待,如果不需要则使用 CAS 设置同步状态,设置成功将独占锁的占有线程设置位当前线程。读锁的释放比较简单,同 ReentrantLock 中的释放锁相似,如下:
protected final boolean tryRelease(int releases) {
// 判断当前线程是不是占有独占锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 同步状态减1
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 如果写状态为 0,则设置独占锁的占有线程为 null
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
2. 读锁的获取和释放
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取同步状态
int c = getState();
// 如果当前是写状态,并且当前线程不是独占锁锁占有的线程的话,返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读状态的值
int r = sharedCount(c);
// 1. readerShouldBlock 公平锁和非公平锁的判断
// 2. 判断读状态的值是不是超过了最大值 65535
// 3. CAS 设置同步状态
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果 r == 0 设置firstReader 为当前线程,并设置该线程重进入数为1次
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果firstReader为当前线程的话,设置重进入数+1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 下面这一部分是计算每个线程的重进入数,下面再做详解
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// CAS 获取读取锁
return fullTryAcquireShared(current);
}
过程:
- 判断当前是不是写状态,如果是则判断占有写锁的线程是不是当前线程,不是则返回 -1
- 使用 CAS 设置同步状态,成功则获取到读锁
- 获取不成功则进入 fullTryAcquireShared 方法
注意,代码中CAS设置同步状态时,新值为 c + SHARED_UNIT, SHARED_UNIT 的值为 1 << 16, 相当于读状态加1,下面来看看 fullTryAcquireShared 方法:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 获取同步状态
int c = getState();
// 如果当前是写状态,并且当前线程不是独占锁锁占有的线程的话,返回-1
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 如果是公平锁的情况下
} else if (readerShouldBlock()) {
// 判断第一个获取读锁线程是不是当前线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 下面这一部分是判断当前线程是否已经获取过读锁,没有的话返回 -1
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 判断是否超过读锁的数量
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS 设置读状态
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 读状态 == 0, 设置firstReader 为当前线程,并设置该线程重进入数为1次
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果firstReader为当前线程的话,设置重进入数+1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 下面这一部分是计算每个线程的重进入数,下面再做详解
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
fullTryAcquireShared 方法同上面获取读锁的代码相似,这边不多做解释,接下来看看读锁的释放:
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 如果当前线程为第一个获取读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 如果当前线程重进入次数为1时,设置为null
if (firstReaderHoldCount == 1)
firstReader = null;
else
// 否则重进入次数减1
firstReaderHoldCount--;
} else {
// 下面详解
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// CAS 设置读状态减 1
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
读锁的释放比较简单,主要是通过 CAS 设置读状态减 1。
3. 线程获取读锁的重进入数
线程获取读锁的重进入数值是保存在 HoldCounter 中, 而 HoldCounter 是保存在 ThreadLocalHolderCounter 中,先来看看 HoldCounter。
static final class HoldCounter {
// 记录重进入数
int count = 0;
// 设置当前的线程id
final long tid = getThreadId(Thread.currentThread());
}
HoldCounter 的实现很简单,只有一个记录重进入数的 count 和记录线程id的tid,下面来看看 ThreadLocalHolderCount 的实现
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
ThreadLocalHolderCounter 继承了 ThreadLocal 类, 这个类后面会介绍,这里不多讲,只要知道这个是为每个线程单独保存一份 HoldCounter 即可,下面来看看刚才获取和释放读锁时中涉及到这个的代码
if (rh == null)
// 获取缓存的 HoldCounter
rh = cachedHoldCounter;
// 如果 rh 的线程id不等于当前线程的话,从readHolds中获取
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 如果 rh 的重进入数为0的话,说明该线程第一次获取读锁,将其存进readHolds中
else if (rh.count == 0)
readHolds.set(rh);
// 重进入数加 1
rh.count++;
// 缓存 HoldCounter
cachedHoldCounter = rh; // cache for release
上面主要是获取读锁时,将当前线程的重进入数加 1, 而在释放读锁时,则将当前线程的重进入数减1,如下
// 获取缓存的 HoldCounter
HoldCounter rh = cachedHoldCounter;
// 如果 rh == null 或者 rh 的线程id不等于当前线程的id时,从 readHolds 获取
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 获取当前线程的重进入数
int count = rh.count;
// 如果当前线程等于1,从 readHolds 中移除
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 重进入数减1
--rh.count;
四、总结
本篇主要介绍了读写锁的应用以及其实现原理,它主要应用于读多写少的场景中,在某个线程获取到写锁时,自己线程可以重进入获取读写状态,而其他读写线程进入阻塞。在某个线程获取到读锁时,其他读线程可以获取读状态,而想获取写锁的线程将进入阻塞,并且线程获取读状态的重进入是通过 HoldCounter 来记录的。
参考资料:《Java并发编程的艺术》