AQS全稱是AbstractQueuedSynchronizer,是JDK提供的一個同步器設計架構,很多并發資料結構如ReentrantLock、ReentrantReadWriteLock、Semaphore等都是基于AQS來實作的,下面來分析一下AQS的原理。
一、底層資料結構
AQS底層維護了一個state(代表共享資源)和一個CLH隊列(代表線程等待隊列)
state:state是一個volatile int型變量,用以表示共享資源的使用情況。
head、tail:分别表示CLH隊列的頭節點和尾節點。關于CLH隊列原理可以看這篇部落格:
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
AQS定義了兩種資源:
一種是Exclusive(獨占方式,同一時間隻有一個線程能夠通路,比如ReentrantLock);
一種是Share(共享方式,同一時間允許多個線程通路,比如Semaphore)。
二、AQS原理
AQS被定義為一個同步器架構,那麼為什麼說AQS是一個“架構”呢,架構的定義應該是使用者根據架構的要求去實作一些業務邏輯,然後底層由架構自己去完成,比如使用Hadoop大資料架構的時候,我們隻需要定義好Map/Reduce函數就可以了,其他複雜的底層操作就有Hadoop幫我們完成。AQS也應該是這樣一個東西,事實上,AQS也确實是這樣做的,它隻需要開發者實作其中的幾個函數,其他的底層操作比如CLH隊列操作、CAS自旋等等都是由AQS架構來實作。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
上面的幾個方法就是AQS要我們實作的函數,這幾個方法都是AQS提供給開發者編寫代碼邏輯的,是以每個方法裡面都定義為直接抛出UnsupportedOperationException,下面解釋一下這幾個函數的作用。
tryAcquire:嘗試擷取獨占資源。成功傳回true,失敗傳回false。
tryRelease:嘗試釋放獨占資源。成功傳回true,失敗傳回false。
tryAcquireShared:嘗試擷取共享資源。成功傳回一個非負數,代表剩餘資源(0就表示沒有可用資源),負數表示失敗。
tryReleaseShared:嘗試釋放共享資源。成功傳回true,失敗傳回false。
isHeldExclusively:判斷線程是否正在獨占資源。
對于獨占方式的同步器,隻需要實作tryAcquire-tryRelease;而對于共享方式的同步器則隻需要實作tryAcquireShared-tryReleaseShared就行了,當然如果自定義同步器需要同時實作獨占和共享方式的話也可以,比如讀寫鎖ReentrantReadWriteLock,其中讀鎖是共享的,寫鎖是獨占的。
三、源碼解析
AQS中對于資源擷取的頂層接口是acquire-release和acquireShare-releaseShare,下面從這些頂層接口來分析:
1、acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
代碼邏輯:
(1)嘗試擷取資源(tryAcquire),成功則傳回,失敗則跳轉到(2)
(2)請求将該線程加入線程等待隊列的尾部,并标記為獨占模式(addWaiter(Node.EXCLUSIVE)),并不斷請求隊列智鬥擷取到資源,如果該過程被打斷,則跳轉到(3),失敗則傳回。
(3)自我中斷(selfInterrupt)
(4)完成
2、release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
代碼邏輯:
(1)嘗試釋放資源,如果成功則跳轉到(2),失敗則傳回false
(2)擷取線程等待隊列的頭節點,如果頭節點線程不為空且處于等待狀态,則喚醒該線程(unparkSuccessor)
3、acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
代碼邏輯:
(1)嘗試擷取共享資源,如果擷取失敗則調用doAcquireShared自旋來請求共享資源。
4、releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
代碼邏輯:
(1)嘗試釋放共享資源,釋放成功則喚醒等待隊列中的後繼節點來擷取資源。
5、如果需要響應中斷程式的話,AQS還提供了acquireInterruptibly和acquireSharedInterruptibly函數來響應。
4、實際例子:ReentrantLock解析
一般用AQS來實作同步器主要使用内部類的方式來實作的,ReentrantLock裡面就是一個比較經典的實作,看一下這個内部類的實作源碼:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
這裡定義的Sync同步器類仍然是一個抽象類,是因為ReentrantLock同時支援公平鎖和非公平鎖,而這兩種同步方式是不一樣的,是以先定義一個Sync類作為兩種實作的基類,後面會看到具體的公平鎖和非公平鎖的實作。
由于ReentrantLock本身是一個獨享鎖,是以Sync類主要重寫了AQS裡面的tryRelease和isHeldExclusively方法,同時定義了非公平鎖的nonfairTryAcquire方法作為請求資源的方法。
非公平鎖實作:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
其實基類Sync完完全全就是一個非公平鎖的實作,是以NonfairSync并沒有新代碼,tryAcquire直接調用nonfairTryAcquire。
公平鎖實作:
static final class FairSync extends Sync {
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平鎖與非公平鎖的差別也就是資源請求的方式不一樣,是以在基類Sync的基礎上重寫tryAcquire方法,順序喚醒等待隊列中的線程。
基于AQS實作了公平鎖和非公平鎖之後,如何實作上鎖和解鎖的功能也就十分簡單了,源碼直接用一行代碼就實作了:
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}