ReadWriteLock 和 ReentrantReadWriteLock介紹
ReadWriteLock,顧名思義,是讀寫鎖。它維護了一對相關的鎖 — — “讀取鎖”和“寫入鎖”,一個用于讀取操作,另一個用于寫入操作。
“讀取鎖”用于隻讀操作,它是“共享鎖”,能同時被多個線程擷取。
“寫入鎖”用于寫入操作,它是“獨占鎖”,寫入鎖隻能被一個線程鎖擷取。
注意:不能同時存在讀取鎖和寫入鎖!
ReadWriteLock是一個接口。ReentrantReadWriteLock是它的實作類,ReentrantReadWriteLock包括子類ReadLock和WriteLock。
ReadWriteLock 和 ReentrantReadWriteLock函數清單
ReadWriteLock函數清單
// 傳回用于讀取操作的鎖。
Lock readLock()
// 傳回用于寫入操作的鎖。
Lock writeLock()
ReentrantReadWriteLock函數清單
// 建立一個新的 ReentrantReadWriteLock,預設是采用“非公平政策”。
ReentrantReadWriteLock()
// 建立一個新的 ReentrantReadWriteLock,fair是“公平政策”。fair為true,意味着公平政策;否則,意味着非公平政策。
ReentrantReadWriteLock(boolean fair)
// 傳回目前擁有寫入鎖的線程,如果沒有這樣的線程,則傳回 null。
protected Thread getOwner()
// 傳回一個 collection,它包含可能正在等待擷取讀取鎖的線程。
protected Collection<Thread> getQueuedReaderThreads()
// 傳回一個 collection,它包含可能正在等待擷取讀取或寫入鎖的線程。
protected Collection<Thread> getQueuedThreads()
// 傳回一個 collection,它包含可能正在等待擷取寫入鎖的線程。
protected Collection<Thread> getQueuedWriterThreads()
// 傳回等待擷取讀取或寫入鎖的線程估計數目。
int getQueueLength()
// 查詢目前線程在此鎖上保持的重入讀取鎖數量。
int getReadHoldCount()
// 查詢為此鎖保持的讀取鎖數量。
int getReadLockCount()
// 傳回一個 collection,它包含可能正在等待與寫入鎖相關的給定條件的那些線程。
protected Collection<Thread> getWaitingThreads(Condition condition)
// 傳回正等待與寫入鎖相關的給定條件的線程估計數目。
int getWaitQueueLength(Condition condition)
// 查詢目前線程在此鎖上保持的重入寫入鎖數量。
int getWriteHoldCount()
// 查詢是否給定線程正在等待擷取讀取或寫入鎖。
boolean hasQueuedThread(Thread thread)
// 查詢是否所有的線程正在等待擷取讀取或寫入鎖。
boolean hasQueuedThreads()
// 查詢是否有些線程正在等待與寫入鎖有關的給定條件。
boolean hasWaiters(Condition condition)
// 如果此鎖将公平性設定為 ture,則傳回 true。
boolean isFair()
// 查詢是否某個線程保持了寫入鎖。
boolean isWriteLocked()
// 查詢目前線程是否保持了寫入鎖。
boolean isWriteLockedByCurrentThread()
// 傳回用于讀取操作的鎖。
ReentrantReadWriteLock.ReadLock readLock()
// 傳回用于寫入操作的鎖。
ReentrantReadWriteLock.WriteLock writeLock()
ReentrantReadWriteLock資料結構
ReentrantReadWriteLock的UML類圖如下:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISN5YjMzUDMwIzMyQDM1EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
從中可以看出:
(01) ReentrantReadWriteLock實作了ReadWriteLock接口。ReadWriteLock是一個讀寫鎖的接口,提供了”擷取讀鎖的readLock()函數” 和 “擷取寫鎖的writeLock()函數”。
(02) ReentrantReadWriteLock中包含:sync對象,讀鎖readerLock和寫鎖writerLock。讀鎖ReadLock和寫鎖WriteLock都實作了Lock接口。讀鎖ReadLock和寫鎖WriteLock中也都分别包含了”Sync對象”,它們的Sync對象和ReentrantReadWriteLock的Sync對象 是一樣的,就是通過sync,讀鎖和寫鎖實作了對同一個對象的通路。
(03) 和”ReentrantLock”一樣,sync是Sync類型;而且,Sync也是一個繼承于AQS的抽象類。Sync也包括”公平鎖”FairSync和”非公平鎖”NonfairSync。sync對象是”FairSync”和”NonfairSync”中的一個,預設是”NonfairSync”。
參考代碼(基于JDK1.7.0_40)
共享鎖源碼相關的代碼如下:
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -L;
// ReentrantReadWriteLock的AQS對象
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 擷取“共享鎖”
public void lock() {
sync.acquireShared();
}
// 如果線程是中斷狀态,則抛出一場,否則嘗試擷取共享鎖。
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly();
}
// 嘗試擷取“共享鎖”
public boolean tryLock() {
return sync.tryReadLock();
}
// 在指定時間内,嘗試擷取“共享鎖”
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(, unit.toNanos(timeout));
}
// 釋放“共享鎖”
public void unlock() {
sync.releaseShared();
}
// 建立條件
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
說明:
ReadLock中的sync是一個Sync對象,Sync繼承于AQS類,即Sync就是一個鎖。ReentrantReadWriteLock中也有一個Sync對象,而且ReadLock中的sync和ReentrantReadWriteLock中的sync是對應關系。即ReentrantReadWriteLock和ReadLock共享同一個AQS對象,共享同一把鎖。
ReentrantReadWriteLock中Sync的定義如下:
final Sync sync;
下面,分别從“擷取共享鎖”和“釋放共享鎖”兩個方面對共享鎖進行說明。
擷取共享鎖
擷取共享鎖的思想(即lock函數的步驟),是先通過tryAcquireShared()嘗試擷取共享鎖。嘗試成功的話,則直接傳回;嘗試失敗的話,則通過doAcquireShared()不斷的循環并嘗試擷取鎖,若有需要,則阻塞等待。doAcquireShared()在循環中每次嘗試擷取鎖時,都是通過tryAcquireShared()來進行嘗試的。下面看看“擷取共享鎖”的詳細流程。
1. lock()
lock()在ReadLock中,源碼如下:
public void lock() {
sync.acquireShared();
}
2. acquireShared()
Sync繼承于AQS,acquireShared()定義在AQS中。源碼如下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < )
doAcquireShared(arg);
}
說明:acquireShared()首先會通過tryAcquireShared()來嘗試擷取鎖。
嘗試成功的話,則不再做任何動作(因為已經成功擷取到鎖了)。
嘗試失敗的話,則通過doAcquireShared()來擷取鎖。doAcquireShared()會擷取到鎖了才傳回。
3. tryAcquireShared()
tryAcquireShared()定義在ReentrantReadWriteLock.java的Sync中,源碼如下:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 擷取“鎖”的狀态
int c = getState();
// 如果“鎖”是“互斥鎖”,并且擷取鎖的線程不是current線程;則傳回-1。
if (exclusiveCount(c) != &&
getExclusiveOwnerThread() != current)
return -;
// 擷取“讀取鎖”的共享計數
int r = sharedCount(c);
// 如果“不需要阻塞等待”,并且“讀取鎖”的共享計數小于MAX_COUNT;
// 則通過CAS函數更新“鎖的狀态”,将“讀取鎖”的共享計數+1。
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 第1次擷取“讀取鎖”。
if (r == ) {
firstReader = current;
firstReaderHoldCount = ;
// 如果想要擷取鎖的線程(current)是第1個擷取鎖(firstReader)的線程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// HoldCounter是用來統計該線程擷取“讀取鎖”的次數。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == )
readHolds.set(rh);
// 将該線程擷取“讀取鎖”的次數+1。
rh.count++;
}
return ;
}
return fullTryAcquireShared(current);
}
說明:tryAcquireShared()的作用是嘗試擷取“共享鎖”。
如果在嘗試擷取鎖時,“不需要阻塞等待”并且“讀取鎖的共享計數小于MAX_COUNT”,則直接通過CAS函數更新“讀取鎖的共享計數”,以及将“目前線程擷取讀取鎖的次數+1”。
否則,通過fullTryAcquireShared()擷取讀取鎖。
4. fullTryAcquireShared()
fullTryAcquireShared()在ReentrantReadWriteLock中定義,源碼如下:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 擷取“鎖”的狀态
int c = getState();
// 如果“鎖”是“互斥鎖”,并且擷取鎖的線程不是current線程;則傳回-1。
if (exclusiveCount(c) != ) {
if (getExclusiveOwnerThread() != current)
return -;
// 如果“需要阻塞等待”。
// (01) 當“需要阻塞等待”的線程是第1個擷取鎖的線程的話,則繼續往下執行。
// (02) 當“需要阻塞等待”的線程擷取鎖的次數=0時,則傳回-1。
} else if (readerShouldBlock()) {
// 如果想要擷取鎖的線程(current)是第1個擷取鎖(firstReader)的線程
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
rh = readHolds.get();
if (rh.count == )
readHolds.remove();
}
}
// 如果目前線程擷取鎖的計數=0,則傳回-1。
if (rh.count == )
return -;
}
}
// 如果“不需要阻塞等待”,則擷取“讀取鎖”的共享統計數;
// 如果共享統計數超過MAX_COUNT,則抛出異常。
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 将線程擷取“讀取鎖”的次數+1。
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 如果是第1次擷取“讀取鎖”,則更新firstReader和firstReaderHoldCount。
if (sharedCount(c) == ) {
firstReader = current;
firstReaderHoldCount = ;
// 如果想要擷取鎖的線程(current)是第1個擷取鎖(firstReader)的線程,
// 則将firstReaderHoldCount+1。
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == )
readHolds.set(rh);
// 更新線程的擷取“讀取鎖”的共享計數
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return ;
}
}
}
說明:fullTryAcquireShared()會根據“是否需要阻塞等待”,“讀取鎖的共享計數是否超過限制”等等進行處理。如果不需要阻塞等待,并且鎖的共享計數沒有超過限制,則通過CAS嘗試擷取鎖,并傳回1。
5. doAcquireShared()
doAcquireShared()定義在AQS函數中,源碼如下:
private void doAcquireShared(int arg) {
// addWaiter(Node.SHARED)的作用是,建立“目前線程”對應的節點,并将該線程添加到CLH隊列中。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 擷取“node”的前一節點
final Node p = node.predecessor();
// 如果“目前線程”是CLH隊列的表頭,則嘗試擷取共享鎖。
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= ) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果“目前線程”不是CLH隊列的表頭,則通過shouldParkAfterFailedAcquire()判斷是否需要等待,
// 需要的話,則通過parkAndCheckInterrupt()進行阻塞等待。若阻塞等待過程中,線程被中斷過,則設定interrupted為true。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
說明:doAcquireShared()的作用是擷取共享鎖。
它會首先建立線程對應的CLH隊列的節點,然後将該節點添加到CLH隊列中。CLH隊列是管理擷取鎖的等待線程的隊列。
如果“目前線程”是CLH隊列的表頭,則嘗試擷取共享鎖;否則,則需要通過shouldParkAfterFailedAcquire()判斷是否阻塞等待,需要的話,則通過parkAndCheckInterrupt()進行阻塞等待。
doAcquireShared()會通過for循環,不斷的進行上面的操作;目的就是擷取共享鎖。需要注意的是:doAcquireShared()在每一次嘗試擷取鎖時,是通過tryAcquireShared()來執行的!
shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()等函數已經在“Java多線程系列–“JUC鎖”03之 公平鎖(一) ”中詳細介紹過,這裡就不再重複說明了。
釋放共享鎖
釋放共享鎖的思想,是先通過tryReleaseShared()嘗試釋放共享鎖。嘗試成功的話,則通過doReleaseShared()喚醒“其他等待擷取共享鎖的線程”,并傳回true;否則的話,傳回flase。
1. unlock()
public void unlock() {
sync.releaseShared();
}
說明:該函數實際上調用releaseShared(1)釋放共享鎖。
2. releaseShared()
releaseShared()在AQS中實作,源碼如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
說明:releaseShared()的目的是讓目前線程釋放它所持有的共享鎖。
它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接傳回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。
3. tryReleaseShared()
tryReleaseShared()定義在ReentrantReadWriteLock中,源碼如下:
protected final boolean tryReleaseShared(int unused) {
// 擷取目前線程,即釋放共享鎖的線程。
Thread current = Thread.currentThread();
// 如果想要釋放鎖的線程(current)是第1個擷取鎖(firstReader)的線程,
// 并且“第1個擷取鎖的線程擷取鎖的次數”=1,則設定firstReader為null;
// 否則,将“第1個擷取鎖的線程的擷取次數”-1。
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == )
firstReader = null;
else
firstReaderHoldCount--;
// 擷取rh對象,并更新“目前線程擷取鎖的資訊”。
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= ) {
readHolds.remove();
if (count <= )
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 擷取鎖的狀态
int c = getState();
// 将鎖的擷取次數-1。
int nextc = c - SHARED_UNIT;
// 通過CAS更新鎖的狀态。
if (compareAndSetState(c, nextc))
return nextc == ;
}
}
說明:tryReleaseShared()的作用是嘗試釋放共享鎖。
4. doReleaseShared()
doReleaseShared()定義在AQS中,源碼如下:
private void doReleaseShared() {
for (;;) {
// 擷取CLH隊列的頭節點
Node h = head;
// 如果頭節點不為null,并且頭節點不等于tail節點。
if (h != null && h != tail) {
// 擷取頭節點對應的線程的狀态
int ws = h.waitStatus;
// 如果頭節點對應的線程是SIGNAL狀态,則意味着“頭節點的下一個節點所對應的線程”需要被unpark喚醒。
if (ws == Node.SIGNAL) {
// 設定“頭節點對應的線程狀态”為空狀态。失敗的話,則繼續循環。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, ))
continue;
// 喚醒“頭節點的下一個節點所對應的線程”。
unparkSuccessor(h);
}
// 如果頭節點對應的線程是空狀态,則設定“檔案點對應的線程所擁有的共享鎖”為其它線程擷取鎖的空狀态。
else if (ws == &&
!compareAndSetWaitStatus(h, , Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果頭節點發生變化,則繼續循環。否則,退出循環。
if (h == head) // loop if head changed
break;
}
}
說明:doReleaseShared()會釋放“共享鎖”。它會從前往後的周遊CLH隊列,依次“喚醒”然後“執行”隊列中每個節點對應的線程;最終的目的是讓這些線程釋放它們所持有的鎖。
公平共享鎖和非公平共享鎖
和互斥鎖ReentrantLock一樣,ReadLock也分為公平鎖和非公平鎖。
公平鎖和非公平鎖的差別,展現在判斷是否需要阻塞的函數readerShouldBlock()是不同的。
公平鎖的readerShouldBlock()的源碼如下:
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
在公平共享鎖中,如果在目前線程的前面有其他線程在等待擷取共享鎖,則傳回true;否則,傳回false。
非公平鎖的readerShouldBlock()的源碼如下:
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
在非公平共享鎖中,它會無視目前線程的前面是否有其他線程在等待擷取共享鎖。隻要該非公平共享鎖對應的線程不為null,則傳回true。
ReentrantReadWriteLock示例
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockTest1 {
public static void main(String[] args) {
// 建立賬戶
MyCount myCount = new MyCount("4238920615242830", );
// 建立使用者,并指定賬戶
User user = new User("Tommy", myCount);
// 分别啟動3個“讀取賬戶金錢”的線程 和 3個“設定賬戶金錢”的線程
for (int i=; i<; i++) {
user.getCash();
user.setCash((i+)*);
}
}
}
class User {
private String name; //使用者名
private MyCount myCount; //所要操作的賬戶
private ReadWriteLock myLock; //執行操作所需的鎖對象
User(String name, MyCount myCount) {
this.name = name;
this.myCount = myCount;
this.myLock = new ReentrantReadWriteLock();
}
public void getCash() {
new Thread() {
public void run() {
myLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() +" getCash start");
myCount.getCash();
Thread.sleep();
System.out.println(Thread.currentThread().getName() +" getCash end");
} catch (InterruptedException e) {
} finally {
myLock.readLock().unlock();
}
}
}.start();
}
public void setCash(final int cash) {
new Thread() {
public void run() {
myLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() +" setCash start");
myCount.setCash(cash);
Thread.sleep();
System.out.println(Thread.currentThread().getName() +" setCash end");
} catch (InterruptedException e) {
} finally {
myLock.writeLock().unlock();
}
}
}.start();
}
}
class MyCount {
private String id; //賬号
private int cash; //賬戶餘額
MyCount(String id, int cash) {
this.id = id;
this.cash = cash;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getCash() {
System.out.println(Thread.currentThread().getName() +" getCash cash="+ cash);
return cash;
}
public void setCash(int cash) {
System.out.println(Thread.currentThread().getName() +" setCash cash="+ cash);
this.cash = cash;
}
}
運作結果:
Thread- getCash start
Thread- getCash start
Thread- getCash cash=
Thread- getCash cash=
Thread- getCash end
Thread- getCash end
Thread- setCash start
Thread- setCash cash=
Thread- setCash end
Thread- setCash start
Thread- setCash cash=
Thread- setCash end
Thread- getCash start
Thread- getCash cash=
Thread- getCash end
Thread- setCash start
Thread- setCash cash=
Thread- setCash end
結果說明:
(01) 觀察Thread0和Thread-2的運作結果,我們發現,Thread-0啟動并擷取到“讀取鎖”,在它還沒運作完畢的時候,Thread-2也啟動了并且也成功擷取到“讀取鎖”。
是以,“讀取鎖”支援被多個線程同時擷取。
(02) 觀察Thread-1,Thread-3,Thread-5這三個“寫入鎖”的線程。隻要“寫入鎖”被某線程擷取,則該線程運作完畢了,才釋放該鎖。
是以,“寫入鎖”不支援被多個線程同時擷取。
轉載自:http://www.cnblogs.com/skywang12345/p/3505809.html
參考:http://ifeve.com/juc-reentrantreadwritelock/
http://blog.csdn.net/yuhongye111/article/details/39055531