天天看点

ReentrantReadWriteLock 可重入读写锁代码分析与简单实例

个人水平有限,如有错误,请各位看官指出。

前面提到的ReentrantLock是排它锁,这种锁在同一时刻下只允许一个线程进行访问,无论是公平模式还是非公平模式,性能都不是很高。ReentrantReadWriteLock是读写锁,同一时刻允许多个线程读,但是在写线程访问时,其后的读线程或写线程都会阻塞。读写锁实现了读锁与写锁分离,使得并发性能比一般的排它锁有了很大的提升(尤其是读操作比较多的情况)。

ReentrantReadWriteLock 也可以实现公平与非公平,其内部包含读锁与写锁两种锁,读锁是用AQS的共享模式,允许多线程同时获取锁;写锁是用AQS的独占模式,同一时刻下只允许一个线程获取锁。

下面列举一下读锁与写锁的关系:

1:读锁与写锁不能同时存在,除非是同一个线程先获得写锁再去获取读锁实现锁的降级。

2:当一个线程持有读锁时,另一个线程访问写锁,这时写锁获取失败,加入队列并阻塞,直到读锁释放。

3:当一个线程持有写锁时,无论另一个线程访问读锁还是写锁,都不会成功,加入队列并阻塞,直到写锁释放。

ReentrantReadWriteLock 非公平锁

读锁获取锁

    protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) @1
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() && @2
                r < MAX_COUNT &&        @3
                compareAndSetState(c, c + SHARED_UNIT)) {  @4
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);  @5
        }
           

@1 当前存在写锁并且当前线程不是Owner线程,获取失败

@2 判断“读是否需要阻塞”,其代码如下

final boolean readerShouldBlock() {
            
            return apparentlyFirstQueuedIsExclusive();
        }
           

apparentlyFirstQueuedIsExclusive的代码如下

final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
           

其主要意思是如果队列中的第一个节点是独占的即写锁,那么当前线程等待

@3 判断读锁数量是否已经达到最大值

@4 设置读锁状态。

如果 @2 @3 @4这三步都是true那么获取所成功,如果以上3步失败,执行@5 fullTryAcquireShared。

fullTryAcquireShared代码如下

final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
           

以上代码主要解决两个问题。1:readerShouldBlock方法返回true 2:CAS操作失败。

如果是1问题:那么先判断是不是重入锁,因为如果是重入是第二次,前面已经设置了firstReader = current,这里判断下是否当前线程是firstReader 如果是就直接CAS操作,这里如果失败,就进行循环。如果不是重入锁,拿到当前线程的锁计数器,然后操作,这里显然rh.count == 0,因为是第一次,后续还有写线程等待,返回-1.加入队列进行阻塞。

如果是2问题:直接循环操作,知道拿到锁为止。

读锁释放

代码如下

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
           

这里的代码和ReentrantLock共享模式释放是一样的,只要区别是子类实现的不同

tryReleaseShared的代码如下

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0) //1
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
           

tryReleaseShared中首先修改线程对应的读锁计数信息,如果线程未获取过读锁,那么在步骤1的地方判断将成功,然后抛出unmatchedUnlockException异常,由于每个线程只会修改自己的计数信息,因此整个过程是安全的

修改完计数信息后,线程将进入一个循环中尝试修改锁状态,直到成功,然后返回读锁是否已经全部释放(nextc == 0),如果读锁全部释放,则等待队列首位的写线程(如果存在)将被唤醒。这里解释下为什么会唤醒写线程而不是唤醒读线程:当前有个线程正在获取读锁,获取完成后去执行没有释放,此时第二个线程去获取写锁,但是前面存在读线程,此时获取写锁线程加入队列并阻塞,以后无论是读还是写线程都会阻塞。当第一个读线程释放锁后,唤醒写线程。如果当前都是读线程去获取锁,没有写线程参与都会获取到,因为是共享模式。如果第一个线程是写线程,后续读与写都会阻塞,写线程可以唤醒读或者写。

总结如下:读线程只能唤醒写线程;写线程能唤醒读线程或写线程;

写锁获取锁

代码如下

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           

和ReentrantLock没什么区别,看下tryAcquire的实现

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);//获取写锁数量 
            if (c != 0) {//判断当前是否有现成获取锁 
                // (Note: if c != 0 and w == 0 then shared count != 0)
                //有线程获取锁,写锁数量为0,所以存在读锁,这时获取写锁失败。写锁加入队列
                //有线程获取锁,写锁数量不为0,这时只允许锁的重入,如果当前线程不是Owner线程则获取写锁失败。写锁加入队列 
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                //这里返回true的条件是w!=0 && current == getExclusiveOwnerThread() 即锁的重入 
                return true;
            }
            if (writerShouldBlock() ||//非公平的情况下,始终返会false,即获取写锁从不阻塞 
                !compareAndSetState(c, c + acquires))//设置状态 
                return false;
            setExclusiveOwnerThread(current);
            return true; //成功获取锁
        }
           

写锁的释放

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒队列中的第一个线程
            return true;
        }
        return false;
    }
           

和ReentrantLock没什么区别,看下tryRelease的具体实现

protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively()) //判断Owner线程是不是当前线程
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0; //判断写锁数量是否为0,是0的情况下才是,写锁释放完成
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
           

这里的代码很简单,考虑到重入锁的情况,只有写锁的状态为0的情况下才是真正的释放写锁。

ReentrantReadWriteLock 公平锁

公平与非公平锁的释放都是一样的逻辑,他们的获取有些差别,具体差别是writerShouldBlock与readerShouldBlock这两个方法。代码如下:

    final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
           

都是调用了hasQueuedPredecessors方法,前面讲ReentrantLock已经提到过了,当前线程获取锁的时候,判断head节点的后一个节点是否存在,如果存在就把当前线程的加入到队列。

继续阅读