簡介
在計算機行業有一個定律叫"摩爾定律",在此定律下,計算機的性能突飛猛進,而且價格也随之越來越便宜,cpu從單核到了多核,緩存性能也得到了很大提升,尤其是多核cpu技術的到來,計算機同一時刻可以處理多個任務。在硬體層面的發展帶來的效率極大提升中,軟體層面的多線程程式設計已經成為必然趨勢,然而多線程程式設計就會引入資料安全性問題,有矛必有盾,于是發明了“鎖”來解決線程安全問題。在這篇文章中,總結了java中幾把經典的JVM級别的鎖。
synchronized
synchronized關鍵字是一把經典的鎖,也是我們平時用得最多的。在jdk1.6之前,syncronized是一把重量級的鎖,不過随着jdk的更新,也在對它進行不斷的優化,如今它變得不那麼重了,甚至在某些場景下,它的性能反而優于輕量級鎖。在加了syncronized關鍵字的方法、代碼塊中,一次隻允許一個線程進入特定代碼段,進而避免多線程同時修改同一資料。
synchronized鎖有如下幾個特點:
a、有鎖更新過程
在jdk1.5(含)之前,synchronized的底層實作是重量級的,是以之前一緻稱呼它為"重量級鎖",在jdk1.5之後,對synchronized進行了各種優化,它變得不那麼重了,實作原理就是鎖更新的過程。我們先聊聊1.5之後的synchronized實作原理是怎樣的。說到synchronized加鎖原理,就不得不先說java對象在記憶體中的布局,java對象記憶體布局如下:

如上圖所示,在建立一個對象後,在JVM虛拟機(HotSpot)中,對象在Java記憶體中的存儲布局 可分為三塊:
**(1)對象頭區域
**此處存儲的資訊包括兩部分:
- 對象自身的運作時資料(MarkWord)
存儲hashCode、GC分代年齡、鎖類型标記、偏向鎖線程ID、CAS鎖指向線程LockRecord的指針等,synconized鎖的機制與這個部分(markwork)密切相關,用markword中最低的三位代表鎖的狀态,其中一位是偏向鎖位,另外兩位是普通鎖位
- 對象類型指針(Class Pointer)
對象指向它的類中繼資料的指針、JVM就是通過它來确定是哪個Class的執行個體
(2)執行個體資料區域
此處存儲的是對象真正有效的資訊,比如對象中所有字段的内容
(3)對齊填充區域
JVM的實作HostSpot規定對象的起始位址必須是8位元組的整數倍,換句話來說,現在64位的OS往外讀取資料的時候一次性讀取64bit整數倍的資料,也就是8個位元組,是以HotSpot為了高效讀取對象,就做了"對齊",如果一個對象實際占的記憶體大小不是8byte的整數倍時,就"補位"到8byte的整數倍。是以對齊填充區域的大小不是固定的。
當線程進入到synchronized處嘗試擷取該鎖時,synchronized鎖更新流程如下:
如上圖所示,synchronized鎖更新的順序為: 偏向鎖->輕量級鎖->重量級鎖,每一步觸發鎖更新的情況如下:
偏向鎖
在JDK1.8中,其實預設是輕量級鎖,但如果設定了-XX:BiasedLockingStartupDelay = 0,那在對一個Object做syncronized的時候,會立即上一把偏向鎖。當處于偏向鎖狀态時,markwork會記錄目前線程ID
更新到輕量級鎖
當下一個線程參與到偏向鎖競争時,會先判斷markword中儲存的線程ID是否與這個線程ID相等,如果不相等,會立即撤銷偏向鎖,更新為輕量級鎖。每個線程在自己的線程棧中生成一個LockRecord(LR),然後每個線程通過CAS(自旋)的操作将鎖對象頭中的markwork設定為指向自己的LR的指針,哪個線程設定成功,就意味着獲得鎖。關于synchronized中此時執行的CAS操作是通過native的調用HotSpot中bytecodeInterpreter.cpp檔案C++代碼實作的,有興趣的可以繼續深挖
更新到重量級鎖
如果鎖競争加劇(如線程自旋次數或者自旋的線程數超過某門檻值,JDK1.6之後,由JVM自己控制改規則),就會更新為重量級鎖。此時就會向作業系統申請資源,線程挂起,進入到作業系統核心态的等待隊列中,等待作業系統排程,然後映射回使用者态。在重量級鎖中,由于需要做核心态到使用者态的轉換,而這個過程中需要消耗較多時間,也就是"重"的原因之一。
b、可重入
synchronized擁有強制原子性的内部鎖機制,是一把可重入鎖。是以,在一個線程使用synchronized方法時調用該對象另一個synchronized方法,即一個線程得到一個對象鎖後再次請求該對象鎖,是永遠可以拿到鎖的。在Java中線程獲得對象鎖的操作是以線程為機關的,而不是以調用為機關的。synchronized鎖的對象頭的markwork中會記錄該鎖的線程持有者和計數器,當一個線程請求成功後,JVM會記下持有鎖的線程,并将計數器計為1。此時其他線程請求該鎖,則必須等待。而該持有鎖的線程如果再次請求這個鎖,就可以再次拿到這個鎖,同時計數器會遞增。當線程退出一個synchronized方法/塊時,計數器會遞減,如果計數器為0則釋放該鎖鎖。
c、悲觀鎖(互斥鎖、排他鎖)
synchronized是一把悲觀鎖(獨占鎖),目前線程如果擷取到鎖,會導緻其它所有需要鎖該的線程等待,一直等待持有鎖的線程釋放鎖才繼續進行鎖的争搶
ReentrantLock**ReentrantLock從字面可以看出是一把可重入鎖,這點和synchronized一樣,但實作原理也與syncronized有很大差别,它是基于經典的AQS(AbstractQueueSyncronized)實作的,AQS是基于volitale和CAS實作的,其中AQS中維護一個valitale類型的變量state來做一個可重入鎖的重入次數,加鎖和釋放鎖也是圍繞這個變量來進行的。ReentrantLock也提供了一些synchronized沒有的特點,是以比synchronized好用
AQS模型如下圖:
ReentrantLock有如下特點:
a、可重入
ReentrantLock和syncronized關鍵字一樣,都是可重入鎖,不過兩者實作原理稍有差别,RetrantLock利用AQS的的state狀态來判斷資源是否已鎖,同一線程重入加鎖,state的狀态+1; 同一線程重入解鎖,state狀态-1(解鎖必須為目前獨占線程,否則異常); 當state為0時解鎖成功。
b、需要手動加鎖、解鎖
synchronized關鍵字是自動進行加鎖、解鎖的,而ReentrantLock需要lock()和unlock()方法配合try/finally語句塊來完成,來手動加鎖、解鎖
c、支援設定鎖的逾時時間
synchronized關鍵字無法設定鎖的逾時時間,如果一個獲得鎖的線程内部發生死鎖,那麼其他線程就會一直進入阻塞狀态,而ReentrantLock提供tryLock方法,允許設定線程擷取鎖的逾時時間,如果逾時,則跳過,不進行任何操作,避免死鎖的發生
d、支援公平/非公平鎖
synchronized關鍵字是一種非公平鎖,先搶到鎖的線程先執行。而ReentrantLock的構造方法中允許設定true/false來實作公平、非公平鎖,如果設定為true,則線程擷取鎖要遵循"先來後到"的規則,每次都會構造一個線程Node,然後到雙向連結清單的"尾巴"後面排隊,等待前面的Node釋放鎖資源。
e、可中斷鎖
ReentrantLock中的lockInterruptibly()方法使得線程可以在被阻塞時響應中斷,比如一個線程t1通過lockInterruptibly()方法擷取到一個可重入鎖,并執行一個長時間的任務,另一個線程通過interrupt()方法就可以立刻打斷t1線程的執行,來擷取t1持有的那個可重入鎖。而通過ReentrantLock的lock()方法或者Synchronized持有鎖的線程是不會響應其他線程的interrupt()方法的,直到該方法主動釋放鎖之後才會響應interrupt()方法。
ReentrantReadWriteLock
ReentrantReadWriteLock(讀寫鎖)其實是兩把鎖,一把是WriteLock(寫鎖),一把是讀鎖,ReadLock。讀寫鎖的規則是:讀讀不互斥、讀寫互斥、寫寫互斥。在一些實際的場景中,讀操作的頻率遠遠高于寫操作,如果直接用一般的鎖進行并發控制的話,就會讀讀互斥、讀寫互斥、寫寫互斥,效率低下,讀寫鎖的産生就是為了優化這種場景的操作效率。一般情況下獨占鎖的效率低來源于高并發下對臨界區的激烈競争導緻線程上下文切換。是以當并發不是很高的情況下,讀寫鎖由于需要額外維護讀鎖的狀态,可能還不如獨占鎖的效率高。是以需要根據實際情況選擇使用。ReentrantReadWriteLock的原理也是基于AQS進行實作的,與ReentrantLock的差别在于ReentrantReadWriteLock鎖擁有共享鎖、排他鎖屬性。讀寫鎖中的加鎖、釋放鎖也是基于Sync(繼承于AQS),并且主要使用AQS中的state和node中的waitState變量進行實作的。實作讀寫鎖與實作普通互斥鎖的主要差別在于需要分别記錄讀鎖狀态及寫鎖狀态,并且等待隊列中需要差別處理兩種加鎖操作。ReentrantReadWriteLock中将AQS中的int類型的state分為高16位與第16位分别記錄讀鎖和寫鎖的狀态,如下圖所示:
a、WriteLock(寫鎖)是悲觀鎖(排他鎖、互斥鎖)
通過計算 state&((1<<16)-1),将state的高16位全部抹去,是以state的低位記錄着寫鎖的重入計數
擷取寫鎖源碼:
/**
* 擷取寫鎖
Acquires the write lock.
* 如果此時沒有任何線程持有寫鎖或者讀鎖,那麼目前線程執行CAS操作更新status,
* 若更新成功,則設定讀鎖重入次數為1,并立即傳回
* <p>Acquires the write lock if neither the read nor write lock
* are held by another thread
* and returns immediately, setting the write lock hold count to
* one.
* 如果目前線程已經持有該寫鎖,那麼将寫鎖持有次數設定為1,并立即傳回
* <p>If the current thread already holds the write lock then the
* hold count is incremented by one and the method returns
* immediately.
* 如果該鎖已經被另外一個線程持有,那麼停止該線程的CPU排程并進入休眠狀态,
* 直到該寫鎖被釋放,且成功将寫鎖持有次數設定為1才表示擷取寫鎖成功
* <p>If the lock is held by another thread then the current
* thread becomes disabled for thread scheduling purposes and
* lies dormant until the write lock has been acquired, at which
* time the write lock hold count is set to one.
*/
public void lock() {
sync.acquire(1);
}
/**
* 該方法為以獨占模式擷取鎖,忽略中斷
* 如果調用一次該“tryAcquire”方法更新status成功,則直接傳回,代表搶鎖成功
* 否則,将會進入同步隊列等待,不斷執行“tryAcquire”方法嘗試CAS更新status狀态,直到成功搶到鎖
* 其中“tryAcquire”方法在NonfairSync(公平鎖)中和FairSync(非公平鎖)中都有各自的實作
*
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1、如果讀寫鎖的計數不為0,且持有鎖的線程不是目前線程,則傳回false
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2、如果持有鎖的計數不為0且計數總數超過限定的最大值,也傳回false
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3、如果該鎖是可重入或該線程在隊列中的政策是允許它嘗試搶鎖,那麼該線程就能擷取鎖
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
//擷取讀寫鎖的狀态
int c = getState();
//擷取該寫鎖重入的次數
int w = exclusiveCount(c);
//如果讀寫鎖狀态不為0,說明已經有其他線程擷取了讀鎖或寫鎖
if (c != 0) {
//如果寫鎖重入次數為0,說明有線程擷取到讀鎖,根據“讀寫鎖互斥”原則,傳回false
//或者如果寫鎖重入次數不為0,且擷取寫鎖的線程不是目前線程,根據"寫鎖獨占"原則,傳回false
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果寫鎖可重入次數超過最大次數(65535),則抛異常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//到這裡說明該線程是重入寫鎖,更新重入寫鎖的計數(+1),傳回true
// Reentrant acquire
setState(c + acquires);
return true;
}
//如果讀寫鎖狀态為0,說明讀鎖和寫鎖都沒有被擷取,會走下面兩個分支:
//如果要阻塞或者執行CAS操作更新讀寫鎖的狀态失敗,則傳回false
//如果不需要阻塞且CAS操作成功,則目前線程成功拿到鎖,設定鎖的owner為目前線程,傳回true
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
釋放寫鎖源碼:
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/
protected final boolean tryRelease(int releases) {
//若鎖的持有者不是目前線程,抛出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//寫鎖的可重入計數減掉releases個
int nextc = getState() - releases;
//如果寫鎖重入計數為0了,則說明寫鎖被釋放了
boolean free = exclusiveCount(nextc) == 0;
if (free)
//若寫鎖被釋放,則将鎖的持有者設定為null,進行GC
setExclusiveOwnerThread(null);
//更新寫鎖的重入計數
setState(nextc);
return free;
}
b、ReadLock(讀鎖)是共享鎖(樂觀鎖)
通過計算 state>>>16 進行無符号補0,右移16位,是以state的高位記錄着寫鎖的重入計數
讀鎖擷取鎖的過程比寫鎖稍微複雜些,首先判斷寫鎖是否為0并且目前線程不占有獨占鎖,直接傳回;否則,判斷讀線程是否需要被阻塞并且讀鎖數量是否小于最大值并且比較設定狀态成功,若目前沒有讀鎖,則設定第一個讀線程firstReader和firstReaderHoldCount;若目前線程線程為第一個讀線程,則增加firstReaderHoldCount;否則,将設定目前線程對應的HoldCounter對象的值,更新成功後會在firstReaderHoldCount中readHolds(ThreadLocal類型的)的本線程副本中記錄目前線程重入數,這是為了實作jdk1.6中加入的getReadHoldCount()方法的,這個方法能擷取目前線程重入共享鎖的次數(state中記錄的是多個線程的總重入次數),加入了這個方法讓代碼複雜了不少,但是其原理還是很簡單的:如果目前隻有一個線程的話,還不需要動用ThreadLocal,直接往firstReaderHoldCount這個成員變量裡存重入數,當有第二個線程來的時候,就要動用ThreadLocal變量readHolds了,每個線程擁有自己的副本,用來儲存自己的重入數。
擷取讀鎖源碼:
/**
* 擷取讀鎖
* Acquires the read lock.
* 如果寫鎖未被其他線程持有,執行CAS操作更新status值,擷取讀鎖後立即傳回
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* 如果寫鎖被其他線程持有,那麼停止該線程的CPU排程并進入休眠狀态,直到該讀鎖被釋放
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
sync.acquireShared(1);
}
/**
* 該方法為以共享模式擷取讀鎖,忽略中斷
* 如果調用一次該“tryAcquireShared”方法更新status成功,則直接傳回,代表搶鎖成功
* 否則,将會進入同步隊列等待,不斷執行“tryAcquireShared”方法嘗試CAS更新status狀态,直到成功搶到鎖
* 其中“tryAcquireShared”方法在NonfairSync(公平鎖)中和FairSync(非公平鎖)中都有各自的實作
* (看這注釋是不是和寫鎖很對稱)
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1、如果已經有其他線程擷取到了寫鎖,根據“讀寫互斥”原則,搶鎖失敗,傳回-1
* 1.If write lock held by another thread, fail.
* 2、如果該線程本身持有寫鎖,那麼看一下是否要readerShouldBlock,如果不需要阻塞,
* 則執行CAS操作更新state和重入計數。
* 這裡要注意的是,上面的步驟不檢查是否可重入(因為讀鎖屬于共享鎖,天生支援可重入)
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3、如果因為CAS更新status失敗或者重入計數超過最大值導緻步驟2執行失敗
* 那就進入到fullTryAcquireShared方法進行死循環,直到搶鎖成功
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
//目前嘗試擷取讀鎖的線程
Thread current = Thread.currentThread();
//擷取該讀寫鎖狀态
int c = getState();
//如果有線程擷取到了寫鎖 ,且擷取寫鎖的不是目前線程則傳回失敗
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//擷取讀鎖的重入計數
int r = sharedCount(c);
//如果讀線程不應該被阻塞,且重入計數小于最大值,且CAS執行讀鎖重入計數+1成功,則執行線程重入的計數加1操作,傳回成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//如果還未有線程擷取到讀鎖,則将firstReader設定為目前線程,firstReaderHoldCount設定為1
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果firstReader是目前線程,則将firstReader的重入計數變量firstReaderHoldCount加1
firstReaderHoldCount++;
} else {
//否則說明有至少兩個線程共享讀鎖,擷取共享鎖重入計數器HoldCounter
//從HoldCounter中拿到目前線程的線程變量cachedHoldCounter,将此線程的重入計數count加1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//如果上面的if條件有一個都不滿足,則進入到這個方法裡進行死循環重新擷取
return fullTryAcquireShared(current);
}
/**
* 用于處理CAS操作state失敗和tryAcquireShared中未執行擷取可重入鎖動作的full方法(補償方法?)
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* 此代碼與tryAcquireShared中的代碼有部分相似的地方,
* 但總體上更簡單,因為不會使tryAcquireShared與重試和延遲讀取保持計數之間的複雜判斷
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
//死循環
for (;;) {
//擷取讀寫鎖狀态
int c = getState();
//如果有線程擷取到了寫鎖
if (exclusiveCount(c) != 0) {
//如果擷取寫鎖的線程不是目前線程,傳回失敗
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//如果沒有線程擷取到寫鎖,且讀線程要阻塞
// Make sure we're not acquiring read lock reentrantly
//如果目前線程為第一個擷取到讀鎖的線程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else { //如果目前線程不是第一個擷取到讀鎖的線程(也就是說至少有有一個線程擷取到了讀鎖)
//
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
/**
*下面是既沒有線程擷取寫鎖,目前線程又不需要阻塞的情況
*/
//重入次數等于最大重入次數,抛異常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//如果執行CAS操作成功将讀寫鎖的重入計數加1,則對目前持有這個共享讀鎖的線程的重入計數加1,然後傳回成功
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
釋放讀鎖源碼:
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//嘗試釋放一次共享鎖計數
doReleaseShared();//真正釋放鎖
return true;
}
return false;
}
/**
*此方法表示讀鎖線程釋放鎖。
*首先判斷目前線程是否為第一個讀線程firstReader,
*若是,則判斷第一個讀線程占有的資源數firstReaderHoldCount是否為1,
若是,則設定第一個讀線程firstReader為空,否則,将第一個讀線程占有的資源數firstReaderHoldCount減1;
若目前線程不是第一個讀線程,
那麼首先會擷取緩存計數器(上一個讀鎖線程對應的計數器 ),
若計數器為空或者tid不等于目前線程的tid值,則擷取目前線程的計數器,
如果計數器的計數count小于等于1,則移除目前線程對應的計數器,
如果計數器的計數count小于等于0,則抛出異常,之後再減少計數即可。
無論何種情況,都會進入死循環,該循環可以確定成功設定狀态state
*/
protected final boolean tryReleaseShared(int unused) {
// 擷取目前線程
Thread current = Thread.currentThread();
if (firstReader == current) { // 目前線程為第一個讀線程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) // 讀線程占用的資源數為1
firstReader = null;
else // 減少占用的資源
firstReaderHoldCount--;
} else { // 目前線程不為第一個讀線程
// 擷取緩存的計數器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 計數器為空或者計數器的tid不為目前正在運作的線程的tid
// 擷取目前線程對應的計數器
rh = readHolds.get();
// 擷取計數
int count = rh.count;
if (count <= 1) { // 計數小于等于1
// 移除
readHolds.remove();
if (count <= 0) // 計數小于等于0,抛出異常
throw unmatchedUnlockException();
}
// 減少計數
--rh.count;
}
for (;;) { // 死循環
// 擷取狀态
int c = getState();
// 擷取狀态
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) // 比較并進行設定
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
/**真正釋放鎖
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
通過分析可以看出:
線上程持有讀鎖的情況下,該線程不能取得寫鎖(因為擷取寫鎖的時候,如果發現目前的讀鎖被占用,就馬上擷取失敗,不管讀鎖是不是被目前線程持有)。
線上程持有寫鎖的情況下,該線程可以繼續擷取讀鎖(擷取讀鎖時如果發現寫鎖被占用,隻有寫鎖沒有被目前線程占用的情況才會擷取失敗)。
LongAdder
在高并發的情況下,我們對一個Integer類型的整數直接進行i++的時候,無法保證操作的原子性,會出現線程安全的問題。為此我們會用juc下的AtomicInteger,它是一個提供原子操作的Interger類,内部也是通過CAS實作線程安全的。但當大量線程同時去通路時,就會因為大量線程執行CAS操作失敗而進行空旋轉,導緻CPU資源消耗過多,而且執行效率也不高。Doug Lea大神應該也不滿意,于是在JDK1.8中對CAS進行了優化,提供了LongAdder,它是基于了CAS分段鎖的思想實作的。
線程去讀寫一個LongAdder類型的變量時,流程如下:
LongAdder也是基于Unsafe提供的CAS操作+valitale去實作的。在LongAdder的父類Striped64中維護着一個base變量和一個cell數組,當多個線程操作一個變量的時候,先會在這個base變量上進行cas操作,當它發現線程增多的時候,就會使用cell數組。比如當base将要更新的時候發現線程增多(也就是調用casBase方法更新base值失敗),那麼它會自動使用cell數組,每一個線程對應于一個cell,在每一個線程中對該cell進行cas操作,這樣就可以将單一value的更新壓力分擔到多個value中去,降低單個value的 “熱度”,同時也減少了線程大量線程的空轉,提高并發效率,分散并發壓力。這種分段鎖需要額外維護一個記憶體空間cells,不過在高并發場景下,這點成本幾乎可以忽略。分段鎖是一種優秀的優化思想,juc中提供的的ConcurrentHashMap也是基于分段鎖保證讀寫操作的線程安全。
作者資訊:
夏傑 ,花名楚昭,現就職于阿裡巴巴企業智能事業部 BUC&ACL&SSO 團隊,面向阿裡巴巴集團提供人員賬号的權限管控、應用資料安全通路治理,并通過現有的技術沉澱與領域模型,緻力于打造 To B、To G 領域的應用資訊化架構的基礎設施 SAAS 産品 MOZI 。