深入學習java源碼之ReadWriteLock.readLock()與ReadWriteLock.writeLock()
假設你的程式中涉及到對一些共享資源的讀和寫操作,且寫操作沒有讀操作那麼頻繁。在沒有寫操作的時候,兩個線程同時讀一個資源沒有任何問題,是以應該允許多個線程能在同時讀取共享資源。但是如果有一個線程想去寫這些共享資源,就不應該再有其它線程對該資源進行讀或寫(譯者注:也就是說:讀-讀能共存,讀-寫不能共存,寫-寫不能共存)。這就需要一個讀/寫鎖來解決這個問題。
對于lock的讀寫鎖,可以通過new ReentrantReadWriteLock()擷取到一個讀寫鎖。所謂讀寫鎖,便是多線程之間讀不互斥,讀寫互斥。讀寫鎖是一種自旋鎖,如果目前沒有讀者,也沒有寫者,那麼寫者可以立刻獲得鎖,否則它必須自旋在那裡,直到沒有任何寫者或讀者。如果目前沒有寫者,那麼讀者可以立即獲得該讀寫鎖,否則讀者必須自旋在那裡,直到寫者釋放該鎖。
我們假設對寫操作的請求比對讀操作的請求更重要,就要提升寫請求的優先級。此外,如果讀操作發生的比較頻繁,我們又沒有提升寫操作的優先級,那麼就會産生“饑餓”現象。請求寫操作的線程會一直阻塞,直到所有的讀線程都從ReadWriteLock上解鎖了。如果一直保證新線程的讀操作權限,那麼等待寫操作的線程就會一直阻塞下去,結果就是發生“饑餓”。是以,隻有當沒有線程正在鎖住ReadWriteLock進行寫操作,且沒有線程請求該鎖準備執行寫操作時,才能保證讀操作繼續。
兩個線程在一起進行讀操作
public class Main {
public static void main(String[] args) {
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
final Main m = new Main();
new Thread(){
public void run() {
m.read(Thread.currentThread());
};
}.start();
new Thread(){
public void run() {
m.read(Thread.currentThread());
};
}.start();
}
public void read(Thread thread) {
readWriteLock.readLock().lock();
try {
long startTime = System.currentTimeMillis();
while(System.currentTimeMillis() - startTime <= 1) {
System.out.println(thread.getName()+"線程在進行讀操作");
}
System.out.println(thread.getName()+"線程完成讀操作");
} finally {
readWriteLock.unlock();
}
}
}
可以看到兩個線程同時進行讀操作,效率大大的提升了。但是要注意的是,如果一個線程擷取了讀鎖,那麼另外的線程想要擷取寫鎖則需要等待釋放;而如果一個線程已經擷取了寫鎖,則另外的線程想擷取讀鎖或寫鎖都需要等待寫鎖被釋放。
當其它線程沒有對共享資源進行讀操作或者寫操作時,某個線程就有可能獲得該共享資源的寫鎖,進而對共享資源進行寫操作。有多少線程請求了寫鎖以及以何種順序請求寫鎖并不重要,除非你想保證寫鎖請求的公平性。
public class Main {
//靜态内部類實作線程共享
static class Example{
//建立lock
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
//讀操作
public void read(){
//擷取讀鎖并上鎖
lock.readLock().lock();
try{
System.out.println("讀線程開始");
Thread.sleep(1000);
System.out.println("讀線程結束");
}catch (Exception e){
e.printStackTrace();
}finally{
//解鎖
lock.readLock().unlock();
}
}
//寫操作
public void write(){
//擷取寫鎖并上鎖
lock.writeLock().lock();
try{
System.out.println("寫線程開始");
Thread.sleep(1000);
System.out.println("寫線程結束");
}catch (Exception e){
e.printStackTrace();
}finally {
//解鎖
lock.writeLock().unlock();
}
}
}
public static void main(String[] args) {
final Example example = new Example();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
example.read();
example.write();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
example.read();
example.write();
}
}
}).start();
}
}
運作結果:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL2EDOxMDM0QTM2AjMwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
根據結果可以發現,多線程下ReentrantReadWriteLock可以多線程同時讀,但寫的話同一時刻隻有一個線程執行。
讀寫鎖的應用場景,我們可以利用讀寫鎖可是實作一個多線程下資料緩存的功能,具體實作思路如下:
class dataCatch{
Object data; //緩存的資料
public volatile Boolean isCatch = false; //是否有緩存
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //生成讀寫鎖
public void process(){
lock.readLock().lock(); //先加讀鎖,此時資料不會被修改
//資料沒有緩存
if(!isCatch){
lock.readLock().unlock(); //解讀鎖
lock.writeLock().lock(); //加寫鎖,此時資料不會被讀到
/********
*
* 執行資料查詢操作并指派給data
*
********/
isCatch = true;
lock.readLock().lock(); //先加讀鎖後解寫鎖
lock.writeLock().lock();
}
/********
*
* 放回data資料給使用者
*
********/
lock.readLock().unlock(); //解讀鎖
}
}
總結:
1.synchronized是Java的關鍵字,是内置特性,而Lock是一個接口,可以用它來實作同步。
2.synchronized同步的時候,其中一條線程用完會自動釋放鎖,而Lock需要手動釋放,如果不手動釋放,可能會造成死鎖。
3.使用synchronized如果其中一個線程不釋放鎖,那麼其他需要擷取鎖的線程會一直等待下去,知道使用完釋放或者出現異常,而Lock可以使用可以響應中斷的鎖或者使用規定等待時間的鎖
4.synchronized無法得知是否擷取到鎖,而Lcok可以做到。
5.用ReadWriteLock可以提高多個線程進行讀操作的效率。
是以綜上所述,在兩種鎖的選擇上,當線程對于資源的競争不激烈的時候,效率差不太多,但是當大量線程同時競争的時候,Lock的性能會遠高于synchronized。
java源碼
A
ReadWriteLock
維護一對關聯的
locks
,一個用于隻讀操作,一個用于寫入。
read lock
可以由多個閱讀器線程同時進行,隻要沒有作者。
write lock
是獨家的。
所有
ReadWriteLock
實作必須保證的存儲器同步效應
writeLock
操作(如在指定
Lock
接口)也保持相對于所述相關聯的
readLock
。 也就是說,一個線程成功擷取讀鎖定将會看到在之前釋出的寫鎖定所做的所有更新。
讀寫鎖允許通路共享資料時的并發性高于互斥鎖所允許的并發性。 它利用了這樣一個事實:一次隻有一個線程( 寫入線程)可以修改共享資料,在許多情況下,任何數量的線程都可以同時讀取資料(是以讀取器線程)。 從理論上講,通過使用讀寫鎖允許的并發性增加将導緻性能改進超過使用互斥鎖。 實際上,并發性的增加隻能在多處理器上完全實作,然後隻有在共享資料的通路模式是合适的時才可以。
讀寫鎖是否會提高使用互斥鎖的性能取決于資料被讀取的頻率與被修改的頻率相比,讀取和寫入操作的持續時間以及資料的争用 - 即是,将嘗試同時讀取或寫入資料的線程數。 例如,最初填充資料的集合,然後經常被修改的頻繁搜尋(例如某種目錄)是使用讀寫鎖的理想候選。 然而,如果更新變得頻繁,那麼資料的大部分時間将被專門鎖定,并且并發性增加很少。 此外,如果讀取操作太短,則讀寫鎖定實作(其本身比互斥鎖更複雜)的開銷可以支配執行成本,特别是因為許多讀寫鎖定實作仍将序列化所有線程通過小部分代碼。 最終,隻有剖析和測量将确定使用讀寫鎖是否适合您的應用程式。
雖然讀寫鎖的基本操作是直接的,但是執行必須做出許多政策決策,這可能會影響給定應用程式中讀寫鎖定的有效性。 這些政策的例子包括:
在評估應用程式的給定實作的适用性時,應考慮所有這些問題。
- 在寫入器釋放寫入鎖定時,确定在讀取器和寫入器都在等待時是否授予讀取鎖定或寫入鎖定。 作家偏好是常見的,因為寫作預計會很短,很少見。 讀者喜好不常見,因為如果讀者經常和長期的預期,寫作可能導緻漫長的延遲。 公平的或“按順序”的實作也是可能的。
- 确定在讀卡器處于活動狀态并且寫入器正在等待時請求讀取鎖定的讀取器是否被授予讀取鎖定。 讀者的偏好可以無限期地拖延作者,而對作者的偏好可以減少并發的潛力。
- 确定鎖是否可重入:一個具有寫鎖的線程是否可以重新擷取? 持有寫鎖可以擷取讀鎖嗎? 讀鎖本身是否可重入?
- 寫入鎖可以降級到讀鎖,而不允許插入寫者? 讀鎖可以更新到寫鎖,優先于其他等待讀者或作者嗎?
Modifier and Type | Method and Description |
---|---|
| 傳回用于閱讀的鎖。 |
| 傳回用于寫入的鎖。 |
package java.util.concurrent.locks;
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
Modifier and Type | Method and Description |
---|---|
| 傳回目前擁有寫鎖的線程,如果不擁有,則傳回 。 |
| 傳回一個包含可能正在等待擷取讀取鎖的線程的集合。 |
| 傳回一個包含可能正在等待擷取讀取或寫入鎖定的線程的集合。 |
| 傳回一個包含可能正在等待擷取寫入鎖的線程的集合。 |
| 傳回等待擷取讀取或寫入鎖定的線程數的估計。 |
| 查詢目前線程對此鎖的可重入讀取保留數。 |
| 查詢為此鎖持有的讀取鎖的數量。 |
| 傳回包含可能在與寫鎖相關聯的給定條件下等待的線程的集合。 |
| 傳回與寫入鎖相關聯的給定條件等待的線程數的估計。 |
| 查詢目前線程對此鎖的可重入寫入數量。 |
| 查詢給定線程是否等待擷取讀取或寫入鎖定。 |
| 查詢是否有任何線程正在等待擷取讀取或寫入鎖定。 |
| 查詢任何線程是否等待與寫鎖相關聯的給定條件。 |
| 如果此鎖的公平設定為true,則傳回 。 |
| 查詢寫鎖是否由任何線程持有。 |
| 查詢寫鎖是否由目前線程持有。 |
| 傳回用于閱讀的鎖。 |
| 傳回一個辨別此鎖的字元串以及其鎖定狀态。 |
| 傳回用于寫入的鎖。 |
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Methods relayed to outer class
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
final int getReadLockCount() {
return sharedCount(getState());
}
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}
final int getCount() { return getState(); }
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
// Instrumentation and status
public final boolean isFair() {
return sync instanceof FairSync;
}
protected Thread getOwner() {
return sync.getOwner();
}
public int getReadLockCount() {
return sync.getReadLockCount();
}
public boolean isWriteLocked() {
return sync.isWriteLocked();
}
public boolean isWriteLockedByCurrentThread() {
return sync.isHeldExclusively();
}
public int getWriteHoldCount() {
return sync.getWriteHoldCount();
}
public int getReadHoldCount() {
return sync.getReadHoldCount();
}
protected Collection<Thread> getQueuedWriterThreads() {
return sync.getExclusiveQueuedThreads();
}
protected Collection<Thread> getQueuedReaderThreads() {
return sync.getSharedQueuedThreads();
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public String toString() {
int c = sync.getCount();
int w = Sync.exclusiveCount(c);
int r = Sync.sharedCount(c);
return super.toString() +
"[Write locks = " + w + ", Read locks = " + r + "]";
}
static final long getThreadId(Thread thread) {
return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long TID_OFFSET;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
TID_OFFSET = UNSAFE.objectFieldOffset
(tk.getDeclaredField("tid"));
} catch (Exception e) {
throw new Error(e);
}
}
}