天天看點

java 共享鎖和ReentrantReadWriteLockReadWriteLock 和 ReentrantReadWriteLock介紹ReadWriteLock 和 ReentrantReadWriteLock函數清單ReentrantReadWriteLock資料結構參考代碼(基于JDK1.7.0_40)擷取共享鎖釋放共享鎖

ReadWriteLock 和 ReentrantReadWriteLock介紹

ReadWriteLock,顧名思義,是讀寫鎖。它維護了一對相關的鎖 — — “讀取鎖”和“寫入鎖”,一個用于讀取操作,另一個用于寫入操作。

“讀取鎖”用于隻讀操作,它是“共享鎖”,能同時被多個線程擷取。

“寫入鎖”用于寫入操作,它是“獨占鎖”,寫入鎖隻能被一個線程鎖擷取。

注意:不能同時存在讀取鎖和寫入鎖!

ReadWriteLock是一個接口。ReentrantReadWriteLock是它的實作類,ReentrantReadWriteLock包括子類ReadLock和WriteLock。

ReadWriteLock 和 ReentrantReadWriteLock函數清單

ReadWriteLock函數清單

// 傳回用于讀取操作的鎖。
Lock readLock()
// 傳回用于寫入操作的鎖。
Lock writeLock()
           

ReentrantReadWriteLock函數清單

// 建立一個新的 ReentrantReadWriteLock,預設是采用“非公平政策”。
ReentrantReadWriteLock()
// 建立一個新的 ReentrantReadWriteLock,fair是“公平政策”。fair為true,意味着公平政策;否則,意味着非公平政策。
ReentrantReadWriteLock(boolean fair)

// 傳回目前擁有寫入鎖的線程,如果沒有這樣的線程,則傳回 null。
protected Thread getOwner()
// 傳回一個 collection,它包含可能正在等待擷取讀取鎖的線程。
protected Collection<Thread> getQueuedReaderThreads()
// 傳回一個 collection,它包含可能正在等待擷取讀取或寫入鎖的線程。
protected Collection<Thread> getQueuedThreads()
// 傳回一個 collection,它包含可能正在等待擷取寫入鎖的線程。
protected Collection<Thread> getQueuedWriterThreads()
// 傳回等待擷取讀取或寫入鎖的線程估計數目。
int getQueueLength()
// 查詢目前線程在此鎖上保持的重入讀取鎖數量。
int getReadHoldCount()
// 查詢為此鎖保持的讀取鎖數量。
int getReadLockCount()
// 傳回一個 collection,它包含可能正在等待與寫入鎖相關的給定條件的那些線程。
protected Collection<Thread> getWaitingThreads(Condition condition)
// 傳回正等待與寫入鎖相關的給定條件的線程估計數目。
int getWaitQueueLength(Condition condition)
// 查詢目前線程在此鎖上保持的重入寫入鎖數量。
int getWriteHoldCount()
// 查詢是否給定線程正在等待擷取讀取或寫入鎖。
boolean hasQueuedThread(Thread thread)
// 查詢是否所有的線程正在等待擷取讀取或寫入鎖。
boolean hasQueuedThreads()
// 查詢是否有些線程正在等待與寫入鎖有關的給定條件。
boolean hasWaiters(Condition condition)
// 如果此鎖将公平性設定為 ture,則傳回 true。
boolean isFair()
// 查詢是否某個線程保持了寫入鎖。
boolean isWriteLocked()
// 查詢目前線程是否保持了寫入鎖。
boolean isWriteLockedByCurrentThread()
// 傳回用于讀取操作的鎖。
ReentrantReadWriteLock.ReadLock readLock()
// 傳回用于寫入操作的鎖。
ReentrantReadWriteLock.WriteLock writeLock()
           

ReentrantReadWriteLock資料結構

ReentrantReadWriteLock的UML類圖如下:

java 共享鎖和ReentrantReadWriteLockReadWriteLock 和 ReentrantReadWriteLock介紹ReadWriteLock 和 ReentrantReadWriteLock函數清單ReentrantReadWriteLock資料結構參考代碼(基于JDK1.7.0_40)擷取共享鎖釋放共享鎖

從中可以看出:

(01) ReentrantReadWriteLock實作了ReadWriteLock接口。ReadWriteLock是一個讀寫鎖的接口,提供了”擷取讀鎖的readLock()函數” 和 “擷取寫鎖的writeLock()函數”。

(02) ReentrantReadWriteLock中包含:sync對象,讀鎖readerLock和寫鎖writerLock。讀鎖ReadLock和寫鎖WriteLock都實作了Lock接口。讀鎖ReadLock和寫鎖WriteLock中也都分别包含了”Sync對象”,它們的Sync對象和ReentrantReadWriteLock的Sync對象 是一樣的,就是通過sync,讀鎖和寫鎖實作了對同一個對象的通路。

(03) 和”ReentrantLock”一樣,sync是Sync類型;而且,Sync也是一個繼承于AQS的抽象類。Sync也包括”公平鎖”FairSync和”非公平鎖”NonfairSync。sync對象是”FairSync”和”NonfairSync”中的一個,預設是”NonfairSync”。

參考代碼(基于JDK1.7.0_40)

共享鎖源碼相關的代碼如下:

public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -L;
    // ReentrantReadWriteLock的AQS對象
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    // 擷取“共享鎖”
    public void lock() {
        sync.acquireShared();
    }

    // 如果線程是中斷狀态,則抛出一場,否則嘗試擷取共享鎖。
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly();
    }

    // 嘗試擷取“共享鎖”
    public  boolean tryLock() {
        return sync.tryReadLock();
    }

    // 在指定時間内,嘗試擷取“共享鎖”
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(, unit.toNanos(timeout));
    }

    // 釋放“共享鎖”
    public  void unlock() {
        sync.releaseShared();
    }

    // 建立條件
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    public String toString() {
        int r = sync.getReadLockCount();
        return super.toString() +
            "[Read locks = " + r + "]";
    }
}
           

說明:

ReadLock中的sync是一個Sync對象,Sync繼承于AQS類,即Sync就是一個鎖。ReentrantReadWriteLock中也有一個Sync對象,而且ReadLock中的sync和ReentrantReadWriteLock中的sync是對應關系。即ReentrantReadWriteLock和ReadLock共享同一個AQS對象,共享同一把鎖。

ReentrantReadWriteLock中Sync的定義如下:

final Sync sync;
           

下面,分别從“擷取共享鎖”和“釋放共享鎖”兩個方面對共享鎖進行說明。

擷取共享鎖

擷取共享鎖的思想(即lock函數的步驟),是先通過tryAcquireShared()嘗試擷取共享鎖。嘗試成功的話,則直接傳回;嘗試失敗的話,則通過doAcquireShared()不斷的循環并嘗試擷取鎖,若有需要,則阻塞等待。doAcquireShared()在循環中每次嘗試擷取鎖時,都是通過tryAcquireShared()來進行嘗試的。下面看看“擷取共享鎖”的詳細流程。

1. lock()

lock()在ReadLock中,源碼如下:

public void lock() {
    sync.acquireShared();
}
           

2. acquireShared()

Sync繼承于AQS,acquireShared()定義在AQS中。源碼如下:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < )
        doAcquireShared(arg);
}
           

說明:acquireShared()首先會通過tryAcquireShared()來嘗試擷取鎖。

嘗試成功的話,則不再做任何動作(因為已經成功擷取到鎖了)。

嘗試失敗的話,則通過doAcquireShared()來擷取鎖。doAcquireShared()會擷取到鎖了才傳回。

3. tryAcquireShared()

tryAcquireShared()定義在ReentrantReadWriteLock.java的Sync中,源碼如下:

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 擷取“鎖”的狀态
    int c = getState();
    // 如果“鎖”是“互斥鎖”,并且擷取鎖的線程不是current線程;則傳回-1。
    if (exclusiveCount(c) !=  &&
        getExclusiveOwnerThread() != current)
        return -;
    // 擷取“讀取鎖”的共享計數
    int r = sharedCount(c);
    // 如果“不需要阻塞等待”,并且“讀取鎖”的共享計數小于MAX_COUNT;
    // 則通過CAS函數更新“鎖的狀态”,将“讀取鎖”的共享計數+1。
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 第1次擷取“讀取鎖”。
        if (r == ) { 
            firstReader = current;
            firstReaderHoldCount = ;
        // 如果想要擷取鎖的線程(current)是第1個擷取鎖(firstReader)的線程
        } else if (firstReader == current) { 
            firstReaderHoldCount++;
        } else {
            // HoldCounter是用來統計該線程擷取“讀取鎖”的次數。
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == )
                readHolds.set(rh);
            // 将該線程擷取“讀取鎖”的次數+1。
            rh.count++;
        }
        return ;
    }
    return fullTryAcquireShared(current);
}
           

說明:tryAcquireShared()的作用是嘗試擷取“共享鎖”。

如果在嘗試擷取鎖時,“不需要阻塞等待”并且“讀取鎖的共享計數小于MAX_COUNT”,則直接通過CAS函數更新“讀取鎖的共享計數”,以及将“目前線程擷取讀取鎖的次數+1”。

否則,通過fullTryAcquireShared()擷取讀取鎖。

4. fullTryAcquireShared()

fullTryAcquireShared()在ReentrantReadWriteLock中定義,源碼如下:

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        // 擷取“鎖”的狀态
        int c = getState();
        // 如果“鎖”是“互斥鎖”,并且擷取鎖的線程不是current線程;則傳回-1。
        if (exclusiveCount(c) != ) {
            if (getExclusiveOwnerThread() != current)
                return -;
        // 如果“需要阻塞等待”。
        // (01) 當“需要阻塞等待”的線程是第1個擷取鎖的線程的話,則繼續往下執行。
        // (02) 當“需要阻塞等待”的線程擷取鎖的次數=0時,則傳回-1。
        } else if (readerShouldBlock()) {
            // 如果想要擷取鎖的線程(current)是第1個擷取鎖(firstReader)的線程
            if (firstReader == current) {
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId()) {
                        rh = readHolds.get();
                        if (rh.count == )
                            readHolds.remove();
                    }
                }
                // 如果目前線程擷取鎖的計數=0,則傳回-1。
                if (rh.count == )
                    return -;
            }
        }
        // 如果“不需要阻塞等待”,則擷取“讀取鎖”的共享統計數;
        // 如果共享統計數超過MAX_COUNT,則抛出異常。
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 将線程擷取“讀取鎖”的次數+1。
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 如果是第1次擷取“讀取鎖”,則更新firstReader和firstReaderHoldCount。
            if (sharedCount(c) == ) {
                firstReader = current;
                firstReaderHoldCount = ;
            // 如果想要擷取鎖的線程(current)是第1個擷取鎖(firstReader)的線程,
            // 則将firstReaderHoldCount+1。
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    rh = readHolds.get();
                else if (rh.count == )
                    readHolds.set(rh);
                // 更新線程的擷取“讀取鎖”的共享計數
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return ;
        }
    }
}
           

說明:fullTryAcquireShared()會根據“是否需要阻塞等待”,“讀取鎖的共享計數是否超過限制”等等進行處理。如果不需要阻塞等待,并且鎖的共享計數沒有超過限制,則通過CAS嘗試擷取鎖,并傳回1。

5. doAcquireShared()

doAcquireShared()定義在AQS函數中,源碼如下:

private void doAcquireShared(int arg) {
    // addWaiter(Node.SHARED)的作用是,建立“目前線程”對應的節點,并将該線程添加到CLH隊列中。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 擷取“node”的前一節點
            final Node p = node.predecessor();
            // 如果“目前線程”是CLH隊列的表頭,則嘗試擷取共享鎖。
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= ) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 如果“目前線程”不是CLH隊列的表頭,則通過shouldParkAfterFailedAcquire()判斷是否需要等待,
            // 需要的話,則通過parkAndCheckInterrupt()進行阻塞等待。若阻塞等待過程中,線程被中斷過,則設定interrupted為true。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

說明:doAcquireShared()的作用是擷取共享鎖。

它會首先建立線程對應的CLH隊列的節點,然後将該節點添加到CLH隊列中。CLH隊列是管理擷取鎖的等待線程的隊列。

如果“目前線程”是CLH隊列的表頭,則嘗試擷取共享鎖;否則,則需要通過shouldParkAfterFailedAcquire()判斷是否阻塞等待,需要的話,則通過parkAndCheckInterrupt()進行阻塞等待。

doAcquireShared()會通過for循環,不斷的進行上面的操作;目的就是擷取共享鎖。需要注意的是:doAcquireShared()在每一次嘗試擷取鎖時,是通過tryAcquireShared()來執行的!

shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()等函數已經在“Java多線程系列–“JUC鎖”03之 公平鎖(一) ”中詳細介紹過,這裡就不再重複說明了。

釋放共享鎖

釋放共享鎖的思想,是先通過tryReleaseShared()嘗試釋放共享鎖。嘗試成功的話,則通過doReleaseShared()喚醒“其他等待擷取共享鎖的線程”,并傳回true;否則的話,傳回flase。

1. unlock()

public  void unlock() {
    sync.releaseShared();
}
           

說明:該函數實際上調用releaseShared(1)釋放共享鎖。

2. releaseShared()

releaseShared()在AQS中實作,源碼如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
           

說明:releaseShared()的目的是讓目前線程釋放它所持有的共享鎖。

它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接傳回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。

3. tryReleaseShared()

tryReleaseShared()定義在ReentrantReadWriteLock中,源碼如下:

protected final boolean tryReleaseShared(int unused) {
    // 擷取目前線程,即釋放共享鎖的線程。
    Thread current = Thread.currentThread();
    // 如果想要釋放鎖的線程(current)是第1個擷取鎖(firstReader)的線程,
    // 并且“第1個擷取鎖的線程擷取鎖的次數”=1,則設定firstReader為null;
    // 否則,将“第1個擷取鎖的線程的擷取次數”-1。
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == )
            firstReader = null;
        else
            firstReaderHoldCount--;
    // 擷取rh對象,并更新“目前線程擷取鎖的資訊”。
    } else {

        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        int count = rh.count;
        if (count <= ) {
            readHolds.remove();
            if (count <= )
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        // 擷取鎖的狀态
        int c = getState();
        // 将鎖的擷取次數-1。
        int nextc = c - SHARED_UNIT;
        // 通過CAS更新鎖的狀态。
        if (compareAndSetState(c, nextc))
            return nextc == ;
    }
}
           

說明:tryReleaseShared()的作用是嘗試釋放共享鎖。

4. doReleaseShared()

doReleaseShared()定義在AQS中,源碼如下:

private void doReleaseShared() {
    for (;;) {
        // 擷取CLH隊列的頭節點
        Node h = head;
        // 如果頭節點不為null,并且頭節點不等于tail節點。
        if (h != null && h != tail) {
            // 擷取頭節點對應的線程的狀态
            int ws = h.waitStatus;
            // 如果頭節點對應的線程是SIGNAL狀态,則意味着“頭節點的下一個節點所對應的線程”需要被unpark喚醒。
            if (ws == Node.SIGNAL) {
                // 設定“頭節點對應的線程狀态”為空狀态。失敗的話,則繼續循環。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, ))
                    continue;
                // 喚醒“頭節點的下一個節點所對應的線程”。
                unparkSuccessor(h);
            }
            // 如果頭節點對應的線程是空狀态,則設定“檔案點對應的線程所擁有的共享鎖”為其它線程擷取鎖的空狀态。
            else if (ws ==  &&
                     !compareAndSetWaitStatus(h, , Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果頭節點發生變化,則繼續循環。否則,退出循環。
        if (h == head)                   // loop if head changed
            break;
    }
}
           

說明:doReleaseShared()會釋放“共享鎖”。它會從前往後的周遊CLH隊列,依次“喚醒”然後“執行”隊列中每個節點對應的線程;最終的目的是讓這些線程釋放它們所持有的鎖。

公平共享鎖和非公平共享鎖

和互斥鎖ReentrantLock一樣,ReadLock也分為公平鎖和非公平鎖。

公平鎖和非公平鎖的差別,展現在判斷是否需要阻塞的函數readerShouldBlock()是不同的。

公平鎖的readerShouldBlock()的源碼如下:

final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}
           

在公平共享鎖中,如果在目前線程的前面有其他線程在等待擷取共享鎖,則傳回true;否則,傳回false。

非公平鎖的readerShouldBlock()的源碼如下:

final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
           

在非公平共享鎖中,它會無視目前線程的前面是否有其他線程在等待擷取共享鎖。隻要該非公平共享鎖對應的線程不為null,則傳回true。

ReentrantReadWriteLock示例

import java.util.concurrent.locks.ReadWriteLock; 
import java.util.concurrent.locks.ReentrantReadWriteLock; 

public class ReadWriteLockTest1 { 

    public static void main(String[] args) { 
        // 建立賬戶
        MyCount myCount = new MyCount("4238920615242830", ); 
        // 建立使用者,并指定賬戶
        User user = new User("Tommy", myCount); 

        // 分别啟動3個“讀取賬戶金錢”的線程 和 3個“設定賬戶金錢”的線程
        for (int i=; i<; i++) {
            user.getCash();
            user.setCash((i+)*);
        }
    } 
} 

class User {
    private String name;            //使用者名 
    private MyCount myCount;        //所要操作的賬戶 
    private ReadWriteLock myLock;   //執行操作所需的鎖對象 

    User(String name, MyCount myCount) {
        this.name = name; 
        this.myCount = myCount; 
        this.myLock = new ReentrantReadWriteLock();
    }

    public void getCash() {
        new Thread() {
            public void run() {
                myLock.readLock().lock(); 
                try {
                    System.out.println(Thread.currentThread().getName() +" getCash start"); 
                    myCount.getCash();
                    Thread.sleep();
                    System.out.println(Thread.currentThread().getName() +" getCash end"); 
                } catch (InterruptedException e) {
                } finally {
                    myLock.readLock().unlock(); 
                }
            }
        }.start();
    }

    public void setCash(final int cash) {
        new Thread() {
            public void run() {
                myLock.writeLock().lock(); 
                try {
                    System.out.println(Thread.currentThread().getName() +" setCash start"); 
                    myCount.setCash(cash);
                    Thread.sleep();
                    System.out.println(Thread.currentThread().getName() +" setCash end"); 
                } catch (InterruptedException e) {
                } finally {
                    myLock.writeLock().unlock(); 
                }
            }
        }.start();
    }
}

class MyCount {
    private String id;         //賬号 
    private int    cash;       //賬戶餘額 

    MyCount(String id, int cash) { 
        this.id = id; 
        this.cash = cash; 
    } 

    public String getId() { 
        return id; 
    } 

    public void setId(String id) { 
        this.id = id; 
    } 

    public int getCash() { 
        System.out.println(Thread.currentThread().getName() +" getCash cash="+ cash); 
        return cash; 
    } 

    public void setCash(int cash) { 
        System.out.println(Thread.currentThread().getName() +" setCash cash="+ cash); 
        this.cash = cash; 
    } 
}
           

運作結果:

Thread- getCash start
Thread- getCash start
Thread- getCash cash=
Thread- getCash cash=
Thread- getCash end
Thread- getCash end
Thread- setCash start
Thread- setCash cash=
Thread- setCash end
Thread- setCash start
Thread- setCash cash=
Thread- setCash end
Thread- getCash start
Thread- getCash cash=
Thread- getCash end
Thread- setCash start
Thread- setCash cash=
Thread- setCash end
           

結果說明:

(01) 觀察Thread0和Thread-2的運作結果,我們發現,Thread-0啟動并擷取到“讀取鎖”,在它還沒運作完畢的時候,Thread-2也啟動了并且也成功擷取到“讀取鎖”。

是以,“讀取鎖”支援被多個線程同時擷取。

(02) 觀察Thread-1,Thread-3,Thread-5這三個“寫入鎖”的線程。隻要“寫入鎖”被某線程擷取,則該線程運作完畢了,才釋放該鎖。

是以,“寫入鎖”不支援被多個線程同時擷取。

轉載自:http://www.cnblogs.com/skywang12345/p/3505809.html

參考:http://ifeve.com/juc-reentrantreadwritelock/

http://blog.csdn.net/yuhongye111/article/details/39055531