天天看點

J.U.C 學習【五】讀寫鎖 -- ReentrantReadWriteLock一、示例二、 讀寫鎖實作原理三、 源碼四、總結

一、示例

   1. 基于 ReentrantLock 實作簡單緩存 

public class Cache1 {
    private static final Map<String, Object> map = new HashMap<String, Object>();

    private static ReentrantLock lock = new ReentrantLock();

    public static Object get(String key) {
        lock.lock();
        try {
            return map.get(key);
        } finally {
            lock.unlock();
        }
    }

    public static void set(String key, Object value) {
        lock.lock();
        try {
            map.put(key, value);
        } finally {
            lock.unlock();
        }
    }
}
           

    上面的代碼中實作了一個簡單的緩存, 在寫入和讀取時都使用 ReentrantLock 進行鎖住,之前文章有講過,ReentrantLock 是一個排他鎖, 隻能在同一時刻隻有一個線程通路,其他線程阻塞。但在緩存這種讀多寫少的場景中,如果每一次讀取資料都要鎖住緩存的話,那效率是很低的,是以這時候需要有一種鎖,能夠在寫入時鎖住緩存,不讓其他線程通路緩存,而在讀取資料時可以讓其他讀線程通路緩存,而寫線程阻塞,這種鎖的名稱叫做 ReentrantReadWriteLock(可重入讀寫鎖)

   2. 基于 ReentrantReadWriteLock 實作簡單緩存

public class Cache2 {
    private static final Map<String, Object> map = new HashMap<String, Object>();

    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private static ReentrantReadWriteLock.ReadLock read = lock.readLock();

    private static ReentrantReadWriteLock.WriteLock write = lock.writeLock();

    public static Object get(String key) {
        read.lock();
        try {
            return map.get(key);
        } finally {
            read.unlock();
        }
    }

    public static void set(String key, Object value) {
        write.lock();
        try {
            map.put(key, value);
        } finally {
            write.unlock();
        }
    }
}
           

    如上代碼,當調用 set 方法時,使用讀鎖将緩存鎖住,這時候其他讀或寫線程就進入阻塞狀态,隻有讀鎖釋放後其他線程才能争奪鎖,而調用 get 方法時,會将寫線程阻塞,其他讀線程可以正常進入争奪鎖。

二、 讀寫鎖實作原理

    介紹完 ReentrantReadWriteLock 的使用後,接下來介紹其實作原理。ReentrantReadWriteLock 同上一篇文章中介紹 ReentrantLock 一樣是基于 AQS 來實作,但是在 AQS 中隻有一個 state 的屬性來表示同步狀态,而讀寫鎖有 讀和寫 兩種狀态, 那它是怎麼實作的呢?

      我們知道 AQS 的 state 屬性是 int 類型的,int 類型 32位, 在讀寫鎖的實作中,将 32位的 state 切分成兩部分,分别是高16位和低16位, 高16位表示讀, 低16位表示寫,如下圖所示:

J.U.C 學習【五】讀寫鎖 -- ReentrantReadWriteLock一、示例二、 讀寫鎖實作原理三、 源碼四、總結

      如果目前同步狀态如上圖所示的話,表示有一個線程已經擷取到了寫鎖, 并且重進入了 1 次, 同時該線程也擷取了 2 次讀鎖。讀寫鎖中通過位運算來确定讀寫狀态,假如目前的同步狀态為 S, 那寫狀态就是 S & 0x0000FFFF (将高16位清除), 而讀狀态就是S >>> 16 (無符号右移,高位補0)。當寫狀态加1時,等于 S + 1, 讀狀态加1時, 等于 S + (1 << 16), 也就是 S + 0x00010000。并且我們可以得到一個推論: 當 S != 0 時,并且 S & 0x0000FFFF 等于0時,那麼讀狀态不為0,表示讀鎖已被擷取了。

三、 源碼

 1. 寫鎖的擷取與釋放

      寫鎖是一個支援重進入的排他鎖,目前線程擷取寫鎖時,其他讀寫線程阻塞,而目前線程可以繼續擷取讀狀态和寫狀态。當有其他線程擷取了讀鎖時,該線程進入阻塞狀态。

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            // 擷取同步狀态
            int c = getState();
            // 計算寫狀态的值,采用 c & 0x0000FFFF
            int w = exclusiveCount(c);
            // 如果同步狀态不為 0
            if (c != 0) {
                // 如果 w == 0 ,目前狀态為讀狀态,如果 w != 0,
                // 目前狀态為寫狀态,判斷獨占鎖的占有線程是不是目前線程,不是傳回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 目前讀狀态的值超過最大值 65535, 則抛出異常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 讀狀态加1
                setState(c + acquires);
                return true;
            }
            // 是否要進入阻塞, 否則嘗試設定同步狀态
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            // 設定成功則将目前線程設定為獨占鎖的占有線程
            setExclusiveOwnerThread(current);
            // 傳回true說明擷取寫鎖成功
            return true;
        }
           

     如上圖代碼, 先判斷同步狀态是不是等于0, 如果不等于0,判斷 w 是不是等于0 ,w的計算方法是 同步狀态 & 0x0000FFFF,如果等于0的話說明,目前同步狀态是讀狀态,傳回false進入阻塞,如果不等于0時,說明同步狀态為寫狀态,則判斷目前線程是不是占有獨占鎖的線程(重進入判斷), 不是則進入阻塞。因為讀寫狀态分别隻占了 16 位, 是以它們最大值都是 65535, 如果讀或寫狀态超過這個值則抛出異常。 writerShouldBlock() 方法是公平鎖和非公平鎖的實作,判斷目前線程是否需要進入等待,如果不需要則使用 CAS 設定同步狀态,設定成功将獨占鎖的占有線程設定位目前線程。讀鎖的釋放比較簡單,同 ReentrantLock 中的釋放鎖相似,如下:

protected final boolean tryRelease(int releases) {
            // 判斷目前線程是不是占有獨占鎖的線程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 同步狀态減1
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            // 如果寫狀态為 0,則設定獨占鎖的占有線程為 null
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
           

 2. 讀鎖的擷取和釋放

protected final int tryAcquireShared(int unused) {
            // 擷取目前線程
            Thread current = Thread.currentThread();
            // 擷取同步狀态
            int c = getState();
            // 如果目前是寫狀态,并且目前線程不是獨占鎖鎖占有的線程的話,傳回-1
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 擷取讀狀态的值
            int r = sharedCount(c);
            // 1. readerShouldBlock 公平鎖和非公平鎖的判斷
            // 2. 判斷讀狀态的值是不是超過了最大值 65535
            // 3. CAS 設定同步狀态
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 如果 r == 0 設定firstReader 為目前線程,并設定該線程重進入數為1次
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                // 如果firstReader為目前線程的話,設定重進入數+1
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    // 下面這一部分是計算每個線程的重進入數,下面再做詳解
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            // CAS 擷取讀取鎖
            return fullTryAcquireShared(current);
        }
           

過程: 

  1. 判斷目前是不是寫狀态,如果是則判斷占有寫鎖的線程是不是目前線程,不是則傳回 -1 
  2. 使用 CAS 設定同步狀态,成功則擷取到讀鎖
  3. 擷取不成功則進入 fullTryAcquireShared 方法

注意,代碼中CAS設定同步狀态時,新值為 c + SHARED_UNIT, SHARED_UNIT 的值為 1 << 16, 相當于讀狀态加1,下面來看看 fullTryAcquireShared 方法:

final int fullTryAcquireShared(Thread current) {
            
            HoldCounter rh = null;
            for (;;) {
                // 擷取同步狀态
                int c = getState();
                // 如果目前是寫狀态,并且目前線程不是獨占鎖鎖占有的線程的話,傳回-1
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                // 如果是公平鎖的情況下
                } else if (readerShouldBlock()) {
                    // 判斷第一個擷取讀鎖線程是不是目前線程
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        // 下面這一部分是判斷目前線程是否已經擷取過讀鎖,沒有的話傳回 -1
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                // 判斷是否超過讀鎖的數量
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // CAS 設定讀狀态
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    // 讀狀态 == 0, 設定firstReader 為目前線程,并設定該線程重進入數為1次
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    // 如果firstReader為目前線程的話,設定重進入數+1
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        // 下面這一部分是計算每個線程的重進入數,下面再做詳解
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
           

 fullTryAcquireShared 方法同上面擷取讀鎖的代碼相似,這邊不多做解釋,接下來看看讀鎖的釋放:

protected final boolean tryReleaseShared(int unused) {
            // 擷取目前線程
            Thread current = Thread.currentThread();
            // 如果目前線程為第一個擷取讀鎖的線程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                // 如果目前線程重進入次數為1時,設定為null
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                // 否則重進入次數減1
                    firstReaderHoldCount--;
            } else {
                // 下面詳解
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                // CAS 設定讀狀态減 1
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }
           

 讀鎖的釋放比較簡單,主要是通過 CAS 設定讀狀态減 1。

3.  線程擷取讀鎖的重進入數

      線程擷取讀鎖的重進入數值是儲存在 HoldCounter 中, 而 HoldCounter 是儲存在 ThreadLocalHolderCounter 中,先來看看 HoldCounter。

static final class HoldCounter {
            // 記錄重進入數
            int count = 0;
            // 設定目前的線程id
            final long tid = getThreadId(Thread.currentThread());
        }
           

  HoldCounter 的實作很簡單,隻有一個記錄重進入數的 count 和記錄線程id的tid,下面來看看 ThreadLocalHolderCount 的實作

static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
           

   ThreadLocalHolderCounter 繼承了 ThreadLocal 類, 這個類後面會介紹,這裡不多講,隻要知道這個是為每個線程單獨儲存一份 HoldCounter 即可,下面來看看剛才擷取和釋放讀鎖時中涉及到這個的代碼

if (rh == null)
       // 擷取緩存的 HoldCounter
       rh = cachedHoldCounter;
    // 如果 rh 的線程id不等于目前線程的話,從readHolds中擷取
    if (rh == null || rh.tid != getThreadId(current))
       rh = readHolds.get();
    // 如果 rh 的重進入數為0的話,說明該線程第一次擷取讀鎖,将其存進readHolds中
    else if (rh.count == 0) 
       readHolds.set(rh);
    // 重進入數加 1 
    rh.count++;
    // 緩存 HoldCounter
    cachedHoldCounter = rh; // cache for release
           

  上面主要是擷取讀鎖時,将目前線程的重進入數加 1, 而在釋放讀鎖時,則将目前線程的重進入數減1,如下

// 擷取緩存的 HoldCounter
    HoldCounter rh = cachedHoldCounter;
    // 如果 rh == null 或者 rh 的線程id不等于目前線程的id時,從 readHolds 擷取
    if (rh == null || rh.tid != getThreadId(current))
        rh = readHolds.get();
    // 擷取目前線程的重進入數
    int count = rh.count;
    // 如果目前線程等于1,從 readHolds 中移除
    if (count <= 1) {
        readHolds.remove();
        if (count <= 0)
            throw unmatchedUnlockException();
        }
    // 重進入數減1
    --rh.count;
           

四、總結

    本篇主要介紹了讀寫鎖的應用以及其實作原理,它主要應用于讀多寫少的場景中,在某個線程擷取到寫鎖時,自己線程可以重進入擷取讀寫狀态,而其他讀寫線程進入阻塞。在某個線程擷取到讀鎖時,其他讀線程可以擷取讀狀态,而想擷取寫鎖的線程将進入阻塞,并且線程擷取讀狀态的重進入是通過 HoldCounter 來記錄的。

參考資料:《Java并發程式設計的藝術》