天天看點

Java多線程(十)之ReentrantReadWriteLock深入分析一、ReentrantReadWriteLock與ReentrantLock二、ReentrantReadWriteLock的特性三、ReentrantReadWriteLock的内部實作四、小結

一、ReentrantReadWriteLock與ReentrantLock

  說到ReentrantReadWriteLock,首先要做的是與ReentrantLock劃清界限。它和後者都是單獨的實作,彼此之間沒有繼承或實作的關系。

ReentrantLock 實作了标準的互斥操作,也就是一次隻能有一個線程持有鎖,也即所謂獨占鎖的概念。前面的章節中一直在強調這個特點。顯然這個特點在一定程度上面減低了吞吐量,實際上獨占鎖是一種保守的鎖政策,在這種情況下任何“讀/讀”,“寫/讀”,“寫/寫”操作都不能同時發生。但是同樣需要強調的一個概念是,鎖是有一定的開銷的,當并發比較大的時候,鎖的開銷就比較客觀了。是以如果可能的話就盡量少用鎖,非要用鎖的話就嘗試看能否改造為讀寫鎖。

ReadWriteLock 描述的是:一個資源能夠被多個讀線程通路,或者被一個寫線程通路,但是不能同時存在讀寫線程。也就是說讀寫鎖使用的場合是一個共享資源被大量讀取操作,而隻有少量的寫操作(修改資料)。清單0描述了ReadWriteLock的API。

清單0 ReadWriteLock 接口

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}           

清單0描述的ReadWriteLock結構,這裡需要說明的是ReadWriteLock并不是Lock的子接口,隻不過ReadWriteLock借助Lock來實作讀寫兩個視角。在ReadWriteLock中每次讀取共享資料就需要讀取鎖,當需要修改共享資料時就需要寫入鎖。看起來好像是兩個鎖,但其實不盡然,下文會指出。

二、ReentrantReadWriteLock的特性

ReentrantReadWriteLock有以下幾個特性:

  • 公平性
    • 非公平鎖(預設) 這個和獨占鎖的非公平性一樣,由于讀線程之間沒有鎖競争,是以讀操作沒有公平性和非公平性,寫操作時,由于寫操作可能立即擷取到鎖,是以會推遲一個或多個讀操作或者寫操作。是以非公平鎖的吞吐量要高于公平鎖。
    • 公平鎖 利用AQS的CLH隊列,釋放目前保持的鎖(讀鎖或者寫鎖)時,優先為等待時間最長的那個寫線程配置設定寫入鎖,目前前提是寫線程的等待時間要比所有讀線程的等待時間要長。同樣一個線程持有寫入鎖或者有一個寫線程已經在等待了,那麼試圖擷取公平鎖的(非重入)所有線程(包括讀寫線程)都将被阻塞,直到最先的寫線程釋放鎖。如果讀線程的等待時間比寫線程的等待時間還有長,那麼一旦上一個寫線程釋放鎖,這一組讀線程将擷取鎖。
  • 重入性
    • 讀寫鎖允許讀線程和寫線程按照請求鎖的順序重新擷取讀取鎖或者寫入鎖。當然了隻有寫線程釋放了鎖,讀線程才能擷取重入鎖。
    • 寫線程擷取寫入鎖後可以再次擷取讀取鎖,但是讀線程擷取讀取鎖後卻不能擷取寫入鎖。
    • 另外讀寫鎖最多支援65535個遞歸寫入鎖和65535個遞歸讀取鎖。
  • 鎖降級
    • 寫線程擷取寫入鎖後可以擷取讀取鎖,然後釋放寫入鎖,這樣就從寫入鎖變成了讀取鎖,進而實作鎖降級的特性。
  • 鎖更新
    • 讀取鎖是不能直接更新為寫入鎖的。因為擷取一個寫入鎖需要釋放所有讀取鎖,是以如果有兩個讀取鎖視圖擷取寫入鎖而都不釋放讀取鎖時就會發生死鎖。
  • 鎖擷取中斷
    • 讀取鎖和寫入鎖都支援擷取鎖期間被中斷。這個和獨占鎖一緻。
  • 條件變量
    • 寫入鎖提供了條件變量(Condition)的支援,這個和獨占鎖一緻,但是讀取鎖卻不允許擷取條件變量,将得到一個

      UnsupportedOperationException

      異常。
  • 重入數
    • 讀取鎖和寫入鎖的數量最大分别隻能是65535(包括重入數)。

三、ReentrantReadWriteLock的内部實作

3.1 讀寫鎖是獨占鎖的兩個不同視圖

ReentrantReadWriteLock裡面的鎖主體就是一個Sync,也就是上面提到的FairSync或者NonfairSync,是以說實際上隻有一個鎖,隻是在擷取讀取鎖和寫入鎖的方式上不一樣,是以前面才有讀寫鎖是獨占鎖的兩個不同視圖一說。

ReentrantReadWriteLock裡面有兩個類:ReadLock/WriteLock,這兩個類都是Lock的實作。

清單1 ReadLock 片段

public static class ReadLock implements Lock, java.io.Serializable  {
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public  boolean tryLock() {
        return sync.tryReadLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public  void unlock() {
        sync.releaseShared(1);
    }

    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

}           

清單2 WriteLock 片段

public static class WriteLock implements Lock, java.io.Serializable  {
    private final Sync sync;
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    public void lock() {
        sync.acquire(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock( ) {
        return sync.tryWriteLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}           

清單1描述的是讀鎖的實作,清單2描述的是寫鎖的實作。顯然WriteLock就是一個獨占鎖,這和ReentrantLock裡面的實作幾乎相同,都是使用了AQS的acquire/release操作。當然了在内部處理方式上與ReentrantLock還是有一點不同的。對比清單1和清單2可以看到,ReadLock擷取的是共享鎖,WriteLock擷取的是獨占鎖。

3.2 ReentrantReadWriteLock中的state

在AQS章節中介紹到AQS中有一個state字段(int類型,32位)用來描述有多少線程獲持有鎖。在獨占鎖的時代這個值通常是0或者1(如果是重入的就是重入的次數),在共享鎖的時代就是持有鎖的數量。在上一節中談到,ReadWriteLock的讀、寫鎖是相關但是又不一緻的,是以需要兩個數來描述讀鎖(共享鎖)和寫鎖(獨占鎖)的數量。顯然現在一個state就不夠用了。于是在ReentrantReadWrilteLock裡面将這個字段一分為二,高位16位表示共享鎖的數量,低位16位表示獨占鎖的數量(或者重入數量)。2^16-1=65536,這就是上節中提到的為什麼共享鎖和獨占鎖的數量最大隻能是65535的原因了。

3.3 讀寫鎖的擷取和釋放

有了上面的知識後再來分析讀寫鎖的擷取和釋放就容易多了。

清單3 寫入鎖擷取片段

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if ((w == 0 && writerShouldBlock(current)) ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}           

清單3 是寫入鎖擷取的邏輯片段,整個工作流程是這樣的:

    1. 持有鎖線程數非0(c=getState()不為0),如果寫線程數(w)為0(那麼讀線程數就不為0)或者獨占鎖線程(持有鎖的線程)不是目前線程就傳回失敗,或者寫入鎖的數量(其實是重入數)大于65535就抛出一個Error異常。否則進行2。
    2. 如果當且寫線程數位0(那麼讀線程也應該為0,因為步驟1已經處理c!=0的情況),并且目前線程需要阻塞那麼就傳回失敗;如果增加寫線程數失敗也傳回失敗。否則進行3。
    3. 設定獨占線程(寫線程)為目前線程,傳回true。

清單3 中 exclusiveCount(c)就是擷取寫線程數(包括重入數),也就是state的低16位值。另外這裡有一段邏輯是目前寫線程是否需要阻塞writerShouldBlock(current)。清單4 和清單5 就是公平鎖和非公平鎖中是否需要阻塞的片段。很顯然對于非公平鎖而言總是不阻塞目前線程,而對于公平鎖而言如果AQS隊列不為空或者目前線程不是在AQS的隊列頭那麼就阻塞線程,直到隊列前面的線程處理完鎖邏輯。

清單4 公平讀寫鎖寫線程是否阻塞

final boolean writerShouldBlock(Thread current) {
    return !isFirst(current);
}           

清單5 非公平讀寫鎖寫線程是否阻塞

final boolean writerShouldBlock(Thread current) {
    return false;
}           

寫入鎖的擷取邏輯清楚後,釋放鎖就比較簡單了。清單6 描述的寫入鎖釋放邏輯片段,其實就是檢測下剩下的寫入鎖數量,如果是0就将獨占鎖線程清空(意味着沒有線程擷取鎖),否則就是說目前是重入鎖的一次釋放,是以不能将獨占鎖線程清空。然後将剩餘線程狀态數寫回AQS。

清單6 寫入鎖釋放邏輯片段

protected final boolean tryRelease(int releases) {
    int nextc = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    if (exclusiveCount(nextc) == 0) {
        setExclusiveOwnerThread(null);
        setState(nextc);
        return true;
    } else {
        setState(nextc);
        return false;
    }
}           

清單3~6 描述的寫入鎖的擷取釋放過程。讀取鎖的擷取和釋放過程要稍微複雜些。 清單7描述的是讀取鎖的擷取過程。

清單7 讀取鎖擷取過程片段

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    if (sharedCount(c) == MAX_COUNT)
        throw new Error("Maximum lock count exceeded");
    if (!readerShouldBlock(current) &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            cachedHoldCounter = rh = readHolds.get();
        rh.count++;
        return 1;
    }
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = cachedHoldCounter;
    if (rh == null || rh.tid != current.getId())
        rh = readHolds.get();
    for (;;) {
        int c = getState();
        int w = exclusiveCount(c);
        if ((w != 0 && getExclusiveOwnerThread() != current) ||
            ((rh.count | w) == 0 && readerShouldBlock(current)))
            return -1;
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            cachedHoldCounter = rh; // cache for release
            rh.count++;
            return 1;
        }
    }
}           

讀取鎖擷取的過程是這樣的:

    1. 如果寫線程持有鎖(也就是獨占鎖數量不為0),并且獨占線程不是目前線程,那麼就傳回失敗。因為允許寫入線程擷取鎖的同時擷取讀取鎖。否則進行2。
    2. 如果讀線程請求鎖數量達到了65535(包括重入鎖),那麼就跑出一個錯誤Error,否則進行3。
    3. 如果讀線程不用等待(實際上是是否需要公平鎖),并且增加讀取鎖狀态數成功,那麼就傳回成功,否則進行4。
    4. 步驟3失敗的原因是CAS操作修改狀态數失敗,那麼就需要循環不斷嘗試去修改狀态直到成功或者鎖被寫入線程占有。實際上是過程3的不斷嘗試直到CAS計數成功或者被寫入線程占有鎖。

3.4 HoldCounter

在清單7 中有一個對象HoldCounter,這裡暫且不提這是什麼結構和為什麼存在這樣一個結構。

接下來根據清單8 我們來看如何釋放一個讀取鎖。同樣先不理HoldCounter,關鍵的在于for循環裡面,其實就是一個不斷嘗試的CAS操作,直到修改狀态成功。前面說過state的高16位描述的共享鎖(讀取鎖)的數量,是以每次都需要減去2^16,這樣就相當于讀取鎖數量減1。實際上SHARED_UNIT=1<<16。

清單8 讀取鎖釋放過程

protected final boolean tryReleaseShared(int unused) {
    HoldCounter rh = cachedHoldCounter;
    Thread current = Thread.currentThread();
    if (rh == null || rh.tid != current.getId())
        rh = readHolds.get();
    if (rh.tryDecrement() <= 0)
        throw new IllegalMonitorStateException();
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}           

好了,現在回頭看HoldCounter到底是一個什麼東西。首先我們可以看到隻有在擷取共享鎖(讀取鎖)的時候加1,也隻有在釋放共享鎖的時候減1有作用,并且在釋放鎖的時候抛出了一個IllegalMonitorStateException異常。而我們知道IllegalMonitorStateException通常描述的是一個線程操作一個不屬于自己的螢幕對象的引發的異常。也就是說這裡的意思是一個線程釋放了一個不屬于自己或者不存在的共享鎖。

前面的章節中一再強調,對于共享鎖,其實并不是鎖的概念,更像是計數器的概念。一個共享鎖就相對于一次計數器操作,一次擷取共享鎖相當于計數器加1,釋放一個共享鎖就相當于計數器減1。顯然隻有線程持有了共享鎖(也就是目前線程攜帶一個計數器,描述自己持有多少個共享鎖或者多重共享鎖),才能釋放一個共享鎖。否則一個沒有擷取共享鎖的線程調用一次釋放操作就會導緻讀寫鎖的state(持有鎖的線程數,包括重入數)錯誤。

明白了HoldCounter的作用後我們就可以猜到它的作用其實就是目前線程持有共享鎖(讀取鎖)的數量,包括重入的數量。那麼這個數量就必須和線程綁定在一起。

在Java裡面将一個對象和線程綁定在一起,就隻有ThreadLocal才能實作了。是以毫無疑問HoldCounter就應該是綁定到線程上的一個計數器。

清單9 線程持有讀取鎖數量的計數器

static final class HoldCounter {
    int count;
    final long tid = Thread.currentThread().getId();
    int tryDecrement() {
        int c = count;
        if (c > 0)
            count = c - 1;
        return c;
    }
}

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}           

清單9 描述的是線程持有讀取鎖數量的計數器。可以看到這裡使用ThreadLocal将HoldCounter綁定到目前線程上,同時HoldCounter也持有線程Id,這樣在釋放鎖的時候才能知道ReadWriteLock裡面緩存的上一個讀取線程(cachedHoldCounter)是否是目前線程。這樣做的好處是可以減少ThreadLocal.get()的次數,因為這也是一個耗時操作。需要說明的是這樣HoldCounter綁定線程id而不綁定線程對象的原因是避免HoldCounter和ThreadLocal互相綁定而GC難以釋放它們(盡管GC能夠智能的發現這種引用而回收它們,但是這需要一定的代價),是以其實這樣做隻是為了幫助GC快速回收對象而已。

除了readLock()和writeLock()外,Lock對象還允許tryLock(),那麼ReadLock和WriteLock的tryLock()不一樣。清單10 和清單11 分别描述了讀取鎖的tryLock()和寫入鎖的tryLock()。

讀取鎖tryLock()也就是tryReadLock()成功的條件是:沒有寫入鎖或者寫入鎖是目前線程,并且讀線程共享鎖數量沒有超過65535個。

寫入鎖tryLock()也就是tryWriteLock()成功的條件是: 沒有寫入鎖或者寫入鎖是目前線程,并且嘗試一次修改state成功。

清單10 讀取鎖的tryLock()

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            rh.count++;
            return true;
        }
    }
}           

清單11 寫入鎖的tryLock()

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 ||current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}           

四、小結

使用ReentrantReadWriteLock可以推廣到大部分讀,少量寫的場景,因為讀線程之間沒有競争,是以比起sychronzied,性能好很多。

如果需要較為精确的控制緩存,使用ReentrantReadWriteLock倒也不失為一個方案。

參考内容來源:

ReentrantReadWriteLock  http://uule.iteye.com/blog/1549707

深入淺出 Java Concurrency (13): 鎖機制 part 8 讀寫鎖 (ReentrantReadWriteLock) (1)

http://www.blogjava.net/xylz/archive/2010/07/14/326080.html

深入淺出 Java Concurrency (14): 鎖機制 part 9 讀寫鎖 (ReentrantReadWriteLock) (2)

http://www.blogjava.net/xylz/archive/2010/07/15/326152.html

高性能鎖ReentrantReadWriteLock

http://jhaij.iteye.com/blog/269656

JDK說明

http://www.cjsdn.net/Doc/JDK60/java/util/concurrent/locks/ReentrantReadWriteLock.html

關于concurrent包 線程池、資源封鎖和隊列、ReentrantReadWriteLock介紹

http://www.oschina.net/question/16_636