AQS 原理
AbstractQueuedSynchronizer
抽象同步隊列簡稱
AQS
,它是實作同步器的基礎元件,類圖如下:
由上圖可知,
AQS
是一個
FIFO
的雙向隊列,其内部通過節點
head
和
tail
記錄的隊首和隊尾元素,隊列元素的類型為
Node
。其中
Node
的
thread
變量來存放進入AQS 隊列裡面的線程。
Node
内部的
SHARED
用來标記該線程是擷取共享資源時被阻塞挂起後放入
AQS
隊列中,
EXCLUSIVE
是用來标記線程是擷取獨占資源時被挂起後放入
AQS
隊列的;
waitStatus
記錄目前線程等待狀态,可以為
CANCELLED
(線程被取消了),
SIGNAL
(線程需要被喚醒),
CONDITION
(線程在條件隊列裡面等待),
PROPAGATE
(釋放共享資源時需要通知其他節點);
prev
記錄目前節點的前驅節點,
next
記錄單前節點的後繼節點。
state
作用
state
在
AQS
中維持了一個單一的狀态資訊
state
可以通過
getState() , setState(int newState),compareAndSetState(int expect, int update)
修改其值。
- 對于
來說,ReentrantLock
可以用來表示目前線程擷取鎖的可重入次數state
- 對于
來說,ReentrantReadWriteLock
的高16為表示讀狀态,也就是擷取該讀鎖的次數,低16位表示擷取到寫鎖的線程的可重入次數。state
- 對于
來說,Semaphore
是用來表示可用信号的個數。state
- 對于
來說,CountDownLatch
用來表示計數器目前的值。state
ConditionObject
作用
ConditionObject
AQS
有個内部類
ConditionObject
, 用來結合鎖實作線程同步。
ConditionObject
可以直接通路
AQS
對象内部的變量,比如 state 狀态值和
AQS
隊列。
ConditionObject
是條件變量,每個條件變量對應一個條件隊列(單向連結清單隊列),其用來存放調用條件變量的
await
方法後被阻塞的線程。如類圖所示,這個條件度列的頭、尾元素分别為
firstWaiter
和
lastWaiter
。
對于
AQS
來說,線程同步的關鍵是對狀态值
state
進行操作。根據
state
是否屬于一個線程,操作
state
的方式分為獨占方式和共享方式。
獨占方式,擷取與釋放資源的流程
使用獨占方式擷取的資源時與具體線程綁定的,如果一個線程擷取到了資源,就會标記是這線程擷取到了,其它線程再嘗試操作
state
擷取資源時,會發現目前資源不是自己持有,就會在擷取失敗後被阻塞。如:
ReentrantLock
的實作,當一個線程擷取了
ReentrantLock
的鎖後,在
AQS
内部會首先使用
CAS
操作将
state
狀态值從0 改為1,然後設定目前鎖的持有者為目前線程,目前線程再次擷取鎖時發現它就是鎖的持有者,則會把 state 狀态值從1改為2,也就是設定鎖的重入次數。而當另外一個線程擷取鎖時發現自己并不是該鎖的持有者就會被放入
AQS
阻塞隊列後挂起
acquire(int arg)
擷取資源
acquire(int arg)
當一個資源調用 acquire(int arg) 方法擷取獨占資源時,首先使用 tryAcquire(arg) 方法嘗試擷取資源。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
release(int arg)
釋放資源
release(int arg)
當一個線程調用 release(int arg) 方法時,首先使用 tryRelease(arg) 操作釋放資源。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
獨占鎖總結
繼承自AQS 實作的獨占鎖
ReentrantLock
, 定義當
state
為 0 時表示鎖空閑,為1時表示鎖已經被占用。在重寫
tryAcquire(int acquires)
時,在内部需要使用CAS 算法檢視目前
state
是否為0,如果為0則使用CAS 設定為1,并設定目前鎖的持有者為目前線程,而後傳回true,如果CAS 失敗則傳回false。在重寫
tryRelease(int releases)
時,在内部需要使用CAS 算法把
state
的值從1改為0,并設定目前鎖的持有者為
null
,然後傳回true,CAS 操作失敗則傳回false。
共享方式,擷取與釋放資源的流程
共享方式的資源與具體線程是不相關的,當多個線程去請求資源時通過CAS 方式去競争擷取資源,當一個線程擷取到了資源後,另外一個線程再次去擷取時如果目前資源還能滿足它的需要,則目前線程隻需要使用CAS 方式進行擷取。比如
Semaphore
信号量,當一個線程通過
acquire(int arg)
擷取信号量時,會首先看目前信号量個數是否滿足需要,不滿足則把目前線程放入阻塞隊列,如果滿足則通過自旋 CAS 擷取信号量。
acquireShared(int arg)
擷取資源
acquireShared(int arg)
當線程調用
acquireShared(int arg)
擷取共享資源時,會首先使用
tryAcquireShared(int arg)
嘗試擷取資源,設定state的值,成功則直接傳回,失敗則将目前線程封裝類型為
Node.SHARED
的
Node
節點後插入到
AQS
阻塞隊列的尾部,并使用
LockSupport.park(this)
方法挂起自己。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
releaseShared(int arg) 釋放資源
當一個線程調用
releaseShared(int arg)
時會嘗試使用
tryReleaseShared
操作釋放資源,這裡設定狀态變量
state
的值,然後使用
LockSupport.unpark(thread)
激活
AQS
隊列裡面被阻塞的一個線程。被激活的線程則使用
tryReleaseShared
檢視目前狀态變量
state
的值是否能滿足自己的需要,滿足則該線程被激活,然後繼續向下運作,否則還是會被放入
AQS
隊列并被挂起。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
注意點
AQS 沒有提供可用的
tryAcquireShared(int arg)
和
tryReleaseShared(int arg)
方法,正如 AQS 是鎖阻塞和同步器一樣,
tryAcquireShared
和
tryReleaseShared
需要由具體的子類來實作。子類在實作
tryAcquireShared
和
tryReleaseShared
是要根據具體場景使用 CAS 算法嘗試修改 state 狀态值,成功則傳回true,否則傳回false。如繼承自AQS 實作的讀寫鎖
ReentrantReadWriteLock
裡面的讀鎖在重寫
tryAcquireShared
時,首先檢視寫鎖是否被其他線程持有,如果是則直接返false,否則使用CAS 遞增 state 的高16位,讀鎖在重寫
tryReleaseShared
時,在内部需要使用CAS 算法把目前state 的值高16位減1,成功則傳回true,失敗則傳回false。