前言
AQS (AbstractQueuedSynchronizer)成為同步容器,主要用于建構鎖或者其他同步元件的基礎架構。通過維護一個共享狀态(Volatile int state )和一個先進先出的線程等待隊列來來實作一個多線通路共享的資源同步架構。這些同步架構有哪些呢?我們JUC中常用到的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore。在這些同步架構中都定義了一個靜态内部類Sync基礎于AbstractQueuedSynchronizer并實作AQS的抽象方法來管理同步狀态state。
AQS三大要素
- state
- 控制線性搶占鎖的FIFO隊列
- 期望同步元件類實作的擷取、釋放等重要方法
state
AQS提供三種方式通路sate(getState(),setState(int newState) ,compareAndSetState(int expect, int update))
volatile修飾的state保證可見性。
protected final int getState() {
return state;
}
getState():擷取目前同步狀态
protected final void setState(int newState) {
state = newState;
}
setState(int newState):設定目前同步狀态
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
compareAndSetState(int expect, int update) :使用CAS設定目前的同步狀态,底層的CAS保證狀态設定的原子性。
同步器主要重寫的方法
以下為這裡方法在AQS中定義
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire(int arg) :獨占式擷取同步資源,擷取成功傳回true,否則傳回false。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
tryRelease(int arg) :獨占式擷取釋放資源,擷取成功傳回true,否則傳回false。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
tryAcquireShared(int arg):共享時擷取資源,負數表示失敗;0表示成功(但沒有可用資源);正數表示成功,而且有可用資源
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
tryReleaseShared(int arg):共享式釋放資源,如果釋放資源後允許喚醒後續等待線程則傳回true,否則傳回false。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
isHeldExclusively() :判斷目前線程是否獨占資源。
補充:什麼是獨占式和共享式。或者引用獨占鎖和共享鎖更加了解些,獨占鎖:同一個時刻隻允許隻有一個線程能擷取鎖,其他擷取不到鎖的線程在同步隊列中等待。當擷取鎖的線程釋放了鎖,後續的線程才能繼續擷取鎖。ReentrantLock實作的鎖就是一個獨占鎖。同樣的如果一把鎖同一個時刻可以被多個線程持有,那麼該鎖就是共享鎖,常見的共享鎖有ReentrantReadWriteLock中的讀鎖。
選擇那些方法進行重寫?
以上的方法,用protected 修飾,方法中無具體實作。因為具體實作留給子類(模闆方法設計模式)。自定義的同步器一般采用獨占式(重寫tryAcquire()和tryRelease()方法)或者共享式(重寫tryAcquireShared()和tryReleaseShared()方法)。但是也可以也可以同時實作獨占和共享,例如ReentrantReadWriteLock。
同步器
下面結合源碼簡單分析AQS在各個同步元件中是如果使用的
CountDownLatch
CountDownLatch類主要的兩個方法主要是
countDown():對count數值進行減1操作
await():調用該方法的線程會被挂起,直到count為0時,該線程才會繼續執行;
下面CountDownLatch類主要結構
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiclRnblN2XjlGcjAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL90zZOFzYE10dVRkWqx2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLycDN3UDNxEjMzEDNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
CountDownLatch中定義了一個私有靜态内部類Sync,并且該類基礎AbstractQueuedSynchronizer類,以共享的方式重寫了tryAcquireShared()和tryReleaseShared()方法。其他同步器或者我們需要自定義同步器的時候可按照這個套路來。
其實AQS主要是為了維護state,那麼下面我們主要看sate在CountDownLatch中是怎麼使用的。
構造函數
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
//AbstractQueuedSynchronizer類中的SetSate方法
protected final void setState(int newState) {
state = newState;
}
不難發現執行個體一個CountDownLatch對象,其實就是将定義count值設定給底層AQS的state
await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
調用了AQS的acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //嘗試擷取共享鎖 tryAcquireShared
doAcquireSharedInterruptibly(arg); //如果擷取失敗進入等待隊列。
}
tryAcquireShared()
方法由 CountDownLatch 的内部類 Sync 實作
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
如果state為0,即getState() == 0結果true,傳回1。此時上層tryAcquireShared(arg)<0得到的結果false,執行就直接傳回了。但如果不為零,才進入等待隊列。可以留意到調用 tryAcquireShared 僅僅檢查 state值,而不會對其減 1,可以看到傳入的參數 acquires根本沒有用。
countDown ()方法
public void countDown() {
sync.releaseShared(1);
}
調用了AQS的releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //調用 countDown 中的 Sync 類中的方法,如果為ture
doReleaseShared();//釋放調用await()的線程
return true;
}
return false;
}
而在這個方法裡,
tryReleaseShared()
是由 countDown 中的 Sync 類實作,實作代碼如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {//自旋
int c = getState(); //擷取state的值,也即是CountDownLatch初始化時的count
if (c == 0)//如果staet為0說明,已經count已經别的線程倒數到0。直接傳回false
return false;
int nextc = c-1; //否則對state進行減一
if (compareAndSetState(c, nextc)) //CAS方式更新state的值
return nextc == 0;//如果倒數到 0(即最後一次倒數),傳回true。那麼 releaseShared 中會調用 doReleaseShared(),讓 AQS 釋放資源出來。
}
}
小結
在調用CountDownLatch的
await()
方法的時,便會
嘗試擷取共享鎖
,但是剛開始的時候
state是大于0
(執行個體CountDownLatch時的count參數需要大于0)的,此時(tryAcquireShared(arg) < 0為true),于是·線程會被阻塞·。state的初始值為count,當每一個線程調用·countDown()·方法時,就會對state的值進行·減一·。直到
state為0
.前面被阻塞的線程繼續運作。
Semaphore
Semaphore 是信号量或許可證的意思,通過信号量可以對同一資源通路做數量的限制。适合控制有 “池” 概念的資源通路。例如控資料庫連接配接池的并發通路數量。
下面結合源碼簡單分析AQS在Semaphore的使用。
構造函數
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
permits設定信号量的大小;fair設定是否公平性,分别對應這FairSync 和 NonfairSync ,這兩個内部類都繼承于Sync,和CountDownLatch一樣Sync也是繼承于AQS。預設為NonfairSync。
acquire()方法
acquire():申請許可,預設申請一個許可
acquire(int permits):申請指定數量的許可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);//調用了 AQS 中的 acquireSharedInterruptibly 方法
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //嘗試擷取信号量,擷取失敗傳回負數,進入自旋等待隊列
doAcquireSharedInterruptibly(arg); //排隊
}
tryAcquireShared()
方法的具體實作在 Semaphore 内部靜态類
FairSync
或
NonfairSync
中,如下:
Semaphore中的 NonfairSync
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) { //自旋
int available = getState();//查詢剩餘的許可數量
int remaining = available - acquires;//剩餘的許可将去本次線程申請的許可
//1.如果remaining < 0,說明可用許可不足,擷取失敗
//2.如果remaining >= 0,則更新CAS更新許可的數量,即更新state的值,并傳回0或正數
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
Semaphore中的 FairSync
protected int tryAcquireShared(int acquires) {
for (;;) { //自旋
//用下面的if條件來維持公平性
if (hasQueuedPredecessors())//看是否有更早等待的線程,如果有,擷取失敗
return -1;
int available = getState(); //查詢剩餘的許可數量
int remaining = available - acquires; //剩餘的許可将去本次線程申請的許可
//1.如果remaining < 0,說明可用許可不足,擷取失敗
//2.如果remaining >= 0,則更新CAS更新許可的數量,即更新state的值,并傳回0或正數
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
release()方法
release():釋放許可,預設釋放一個許可
release(int permits):釋放指定數量的許可
public void release() {
sync.releaseShared(1); //調用AQS的releaseShared()方法
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//釋放許可
doReleaseShared();//通知下個線程節點來擷取資源
return true;
}
return false;
}
tryReleaseShared()
的實作也在 Semaphore 内部靜态類
Sync
中,如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) { //自旋
int current = getState(); //擷取目前的許可數量
int next = current + releases; //加上釋放的許可
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //CAS更新許可
return true;
}
}
小結
在Semophore中,state表示
剩餘的許可證(信号量)的數量
。對state操作邏輯主要放在了重寫的方法
tryReleaseShared()
和
tryAcquireShared()
中,可看出Semaphore也是共享式的擷取和釋放資源。 擷取許可時,
首先
檢查剩餘的許可數目是否滿足本次線程需要的數目,如果
不夠
則傳回
負數
(擷取許可失敗),如果
夠
則
自旋加compareAndSetState
更新state的數值,知道成功就傳回正數;如果期間被别線程修改了導緻剩餘的許可數量不夠,那麼傳回負數(擷取許可失敗)。