思維導圖

文章已收錄Github精選,歡迎Star:https://github.com/yehongzhi/learningSummary
一、什麼是AQS
談到并發程式設計,不得不說AQS(AbstractQueuedSynchronizer),這可謂是Doug Lea老爺子的大作之一。AQS即是抽象隊列同步器,是用來建構Lock鎖和同步元件的基礎架構,很多我們熟知的鎖和同步元件都是基于AQS建構,比如ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore。
實際上AQS是一個抽象類,我們不妨先看一下源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
//頭結點
private transient volatile Node head;
//尾節點
private transient volatile Node tail;
//共享狀态
private volatile int state;
//内部類,建構連結清單的Node節點
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
}
//AbstractQueuedSynchronizer的父類
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
//占用鎖的線程
private transient Thread exclusiveOwnerThread;
}
由源碼可以看出AQS是有以下幾個部分組成的:
1.1 state共享變量
AQS中裡一個很重要的字段state,表示同步狀态,是由
volatile
修飾的,用于展示目前臨界資源的獲鎖情況。通過getState(),setState(),compareAndSetState()三個方法進行維護。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//CAS操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
關于state的幾個要點:
- 使用volatile修飾,保證多線程間的可見性。
- getState()、setState()、compareAndSetState()使用final修飾,限制子類不能對其重寫。
- compareAndSetState()采用樂觀鎖思想的CAS算法,保證原子性操作。
1.2 CLH隊列
AQS裡另一個重要的概念就是CLH隊列,它是一個雙向連結清單隊列,其内部由head和tail分别記錄頭結點和尾結點,隊列的元素類型是Node。
簡單講一下這個隊列的作用,就是當一個線程擷取同步狀态(state)失敗時,AQS會将此線程以及等待的狀态等資訊封裝成Node加入到隊列中,同時阻塞該線程,等待後續的被喚醒。
隊列的元素就是一個個的Node節點,下面講一下Node節點的組成:
static final class Node {
//共享模式下的等待标記
static final Node SHARED = new Node();
//獨占模式下的等待标記
static final Node EXCLUSIVE = null;
//表示目前節點的線程因為逾時或者中斷被取消
static final int CANCELLED = 1;
//表示目前節點的後續節點的線程需要運作,也就是通過unpark操作
static final int SIGNAL = -1;
//表示目前節點在condition隊列中
static final int CONDITION = -2;
//共享模式下起作用,表示後續的節點會傳播喚醒的操作
static final int PROPAGATE = -3;
//狀态,包括上面的四種狀态值,初始值為0,一般是節點的初始狀态
volatile int waitStatus;
//上一個節點的引用
volatile Node prev;
//下一個節點的引用
volatile Node next;
//儲存在目前節點的線程引用
volatile Thread thread;
//condition隊列的後續節點
Node nextWaiter;
}
1.3 exclusiveOwnerThread
AQS通過繼承AbstractOwnableSynchronizer類,擁有的屬性。表示獨占模式下同步器的持有者。
二、AQS的實作原理
AQS有兩種模式,分别是獨占式和共享式。
2.1 獨占式
同一時刻僅有一個線程持有同步狀态,也就是其他線程隻有在占有的線程釋放後才能競争,比如ReentrantLock。下面從源碼切入,梳理獨占式的實作思路。
首先看acquire()方法,這是AQS在獨占模式下擷取同步狀态的方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先講這個方法的總體思路:
- tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回。
- 如果失敗則調用addWaiter()方法把目前線程包裝成Node(狀态為EXCLUSIVE,标記為獨占模式)插入到CLH隊列末尾。
- 然後acquireQueued()方法使線程阻塞在等待隊列中擷取資源,一直擷取到資源後才傳回,如果在整個等待過程中被中斷過,則傳回true,否則傳回false。
- 線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。
我們展開來分析,看tryAcquire()方法,嘗試擷取資源,成功傳回true,失敗傳回false。
//直接抛出異常,這是由子類進行實作的方法,展現了模闆模式的思想
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
為什麼沒有具體實作呢,其實這是模闆模式的思想。這個方法是嘗試擷取資源,但是擷取資源的方式有很多種實作,比如公平鎖有公平鎖的擷取方式,非公平鎖有非公平鎖的擷取方式(後面會講,别急)。是以這裡是一個沒有具體實作的方法,需要由子類去實作。
接着看addWaiter()方法,這個方法的作用是把目前線程包裝成Node添加到隊列中。
private Node addWaiter(Node mode) {
//把目前線程包裝成Node節點
Node node = new Node(Thread.currentThread(), mode);
//擷取到尾結點
Node pred = tail;
//判斷尾結點是否為null,如果不為空,那就證明隊列已經初始化了
if (pred != null) {
//已經初始化了,就直接把Node節點添加到隊列的末尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
//傳回包含目前線程的節點Node
return node;
}
}
//如果隊列沒有初始化,那就調用enq()方法
enq(node);
return node;
}
接着我們看enq()方法,就是一個自旋的操作,把傳進來的node添加到隊列最後,如果隊列沒有初始化則進行初始化。
private Node enq(final Node node) {
//自旋操作,也就是死循環,隻有加入隊列成功才會return
for (;;) {
//把尾結點指派給t
Node t = tail;
//如果為空,證明沒有初始化,進行初始化
if (t == null) { // Must initialize
//建立一個空的Node節點,并且設定為頭結點
if (compareAndSetHead(new Node()))
//然後把頭結點指派給尾結點
tail = head;
} else {
//如果是第一次循環為空,就已經建立了一個一個Node,那麼第二次循環就不會為空了
//如果尾結點不為空,就把傳進來的node節點的前驅節點指向尾結點
node.prev = t;
//cas原子性操作,把傳進來的node節點設定為尾結點
if (compareAndSetTail(t, node)) {
//把原來的尾結點的後驅節點指向傳進來的node節點
t.next = node;
return t;
}
}
}
}
接着我們再把思路跳回去頂層的方法,看acquireQueued()方法。
//在隊列中的節點node通過acquireQueued()方法擷取資源,忽略中斷。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋的操作,一個死循環
for (;;) {
//擷取傳進來的node節點的前驅節點,指派給p
final Node p = node.predecessor();
//如果p是頭結點,node節點就是第二個節點,則再次去嘗試擷取資源
if (p == head && tryAcquire(arg)) {
//tryAcquire(arg)擷取資源成功的話,則把node節點設定為頭結點
setHead(node);
//把原來的頭結點p的後驅節點設定為null,等待GC垃圾回收
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果p不是頭結點,或者tryAcquire()擷取資源失敗,判斷是否可以被park,也就是把線程阻塞起來
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//&&前面如果傳回true,将目前線程阻塞并檢查是否被中斷
//如果阻塞過程中被中斷,則置interrupted标志位為true。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
最後是selfInterrupt()方法,自我中斷。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
過程記不住沒關系,下面畫張圖來總結一下,其實很簡單。
2.2 共享式
即共享資源可以被多個線程同時占有,直到共享資源被占用完畢。比如ReadWriteLock和CountdownLatch。下面我們從源碼去分析其實作原理。
首先還是看最頂層的acquireShared()方法。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
這段代碼很簡單,首先調用tryAcquireShared()方法,tryAcquireShared傳回是一個int數值,當傳回值大于等于0的時候,說明獲得成功擷取鎖,方法結束,否則傳回負數,表示擷取同步狀态失敗,執行doAcquireShared方法。
tryAcquireShared()方法是一個模闆方法由子類去重寫,意思是需要如何擷取同步資源由實作類去定義,AQS隻是一個架構。
那麼就看如果擷取資源失敗,執行的doAcquireShared()方法。
private void doAcquireShared(int arg) {
//調用addWaiter()方法,把目前線程包裝成Node,标志為共享式,插入到隊列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//擷取目前節點node的前驅節點
final Node p = node.predecessor();
//前驅節點是否是頭結點
if (p == head) {
//如果前驅節點是頭結點,則調用tryAcquireShared()擷取同步資源
int r = tryAcquireShared(arg);
//r>=0表示擷取同步資源成功,隻有擷取成功,才會執行到return退出for循環
if (r >= 0) {
//設定node為頭結點
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//判斷是否可以被park,跟獨占式的邏輯一樣傳回true,則進行park操作,阻塞線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這段邏輯基本上跟獨占式的邏輯差不多,不同的地方在于入隊的Node是标志為SHARED共享式的,擷取同步資源的方式是tryAcquireShared()方法。
三、AQS的模闆模式
模闆模式在AQS中的應用可謂是一大精髓,在上文中有提到的tryAcquireShared()和tryAcquire()都是很重要的模闆方法。一般使用AQS往往都是使用一個内部類繼承AQS,然後重寫相應的模闆方法。
AQS已經把一些常用的,比如入隊,出隊,CAS操作等等建構了一個架構,使用者隻需要實作擷取資源,釋放資源的,因為很多鎖,還有同步器,其實就是擷取資源和釋放資源的方式有比較大的差別。
那麼我們看一下模闆方法有哪些。
3.1 tryAcquire()
tryAcquire()方法,獨占式擷取同步資源,傳回true表示擷取同步資源成功,false表示擷取失敗。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
3.2 tryRelease()
tryRelease()方法,獨占式使用,tryRelease()的傳回值來判斷該線程是否已經完成釋放資源,子類來決定是否能成功釋放鎖。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
3.3 tryAcquireShared()
tryAcquireShared()方法,共享式擷取同步資源,傳回大于等于0表示擷取資源成功,傳回小于0表示失敗。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
3.4 tryReleaseShared()
tryReleaseShared()方法,共享式嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
3.5 isHeldExclusively()
isHeldExclusively()方法,該線程是否正在獨占資源。隻有用到condition才需要去實作它。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
四、認識ReentrantLock
ReentrantLock是一個很經典的使用AQS的案例,不妨以此為切入點來繼續深入。ReentrantLock的特性有很多,首先它是一個悲觀鎖,其次有兩種模式分别是公平鎖和非公平鎖,最後它是重入鎖,也就是能夠對共享資源重複加鎖。
AQS通常是使用内部類實作,是以不難想象在ReentrantLock類裡有兩個内部類,我們看一張類圖。
FairSync是公平鎖的實作,NonfairSync則是非公平鎖的實作。通過構造器傳入的boolean值進行判斷。
public ReentrantLock(boolean fair) {
//true則使用公平鎖,false則使用非公平鎖
sync = fair ? new FairSync() : new NonfairSync();
}
//預設是非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
公平鎖是遵循FIFO(先進先出)原則的,先到的線程會優先擷取資源,後到的線程會進行排隊等待,能保證每個線程都能拿到鎖,不會存在有線程餓死的情況。
非公平鎖是則不遵守先進先出的原則,會出現有線程插隊的情況,不能保證每個線程都能拿到鎖,會存在有線程餓死的情況。
下面我們從源碼分析去找出這兩種鎖的差別。
五、源碼分析ReentrantLock
5.1 上鎖
ReentrantLock是通過lock()方法上鎖,是以看lock()方法。
public void lock() {
sync.lock();
}
sync就是NonfairSync或者FairSync。
//這裡就是調用AQS的acquire()方法,擷取同步資源
final void lock() {
acquire(1);
}
acquire()方法前面已經解析過了,主要看FairSync的tryAcquire()方法。
protected final boolean tryAcquire(int acquires) {
//擷取目前線程
final Thread current = Thread.currentThread();
//擷取同步狀态
int c = getState();
//判斷同步狀态是否為0
if (c == 0) {
//關鍵在這裡,公平鎖會判斷是否需要排隊
if (!hasQueuedPredecessors() &&
//如果不需要排隊,則直接cas操作更新同步狀态為1
compareAndSetState(0, acquires)) {
//設定占用鎖的線程為目前線程
setExclusiveOwnerThread(current);
//傳回true,表示上鎖成功
return true;
}
}
//判斷目前線程是否是擁有鎖的線程,主要是可重入鎖的邏輯
else if (current == getExclusiveOwnerThread()) {
//如果是目前線程,則同步狀态+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//設定同步狀态
setState(nextc);
return true;
}
//以上情況都不是,則傳回false,表示上鎖失敗。上鎖失敗根據AQS的架構設計,會入隊排隊
return false;
}
如果是非公平鎖NonfairSync的tryAcquire(),我們繼續分析。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//非公平式擷取鎖
final boolean nonfairTryAcquire(int acquires) {
//這段跟公平鎖是一樣的操作
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//關鍵在這裡,不再判斷是否需要排隊,而是直接去更新同步狀态,通俗點講就是插隊
if (compareAndSetState(0, acquires)) {
//如果擷取同步狀态成功,則設定占用鎖的線程為目前線程
setExclusiveOwnerThread(current);
//傳回true表示擷取鎖成功
return true;
}
}
//以下邏輯跟公平鎖的邏輯一樣
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
其實很明顯了,關鍵的差別就在于嘗試擷取鎖的時候,公平鎖會判斷是否需要排隊再去更新同步狀态,非公平鎖是直接就更新同步,不判斷是否需要排隊。
從性能上來說,公平鎖的性能是比非公平鎖要差的,因為公平鎖要遵守FIFO(先進先出)的原則,這就會增加了上下文切換與等待線程的狀态變換時間。
非公平鎖的缺點也是很明顯的,因為允許插隊,這就會存在有線程餓死的情況。
5.2 解鎖
解鎖對應的方法就是unlock()。
public void unlock() {
//調用AQS中的release()方法
sync.release(1);
}
//這是AQS架構定義的release()方法
public final boolean release(int arg) {
//目前鎖是不是沒有被線程持有,傳回true表示該鎖沒有被任何線程持有
if (tryRelease(arg)) {
//擷取頭結點h
Node h = head;
//判斷頭結點是否為null并且waitStatus不是初始化節點狀态,解除線程挂起狀态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
關鍵在于tryRelease(),這就不需要分公平鎖和非公平鎖的情況,隻需要考慮可重入的邏輯。
protected final boolean tryRelease(int releases) {
//減少可重入的次數
int c = getState() - releases;
//如果目前線程不是持有鎖的線程,抛出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有線程全部釋放,将目前獨占鎖所有線程設定為null,并更新state
if (c == 0) {
//狀态為0,表示持有線程被全部釋放,設定為true
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
總結
JUC可謂是學習java的一個難點,而學習AQS其實關鍵在于并發的思維,因為需要考慮的情況很多,其次需要了解模闆模式的思想,這才能了解為什麼AQS作為一個架構的作用。ReentrantLock這個類我覺得是了解AQS一個很好的切入點,看懂了之後再去看AQS的其他應用類應該會輕松很多。
那麼這篇文章就講到這裡了,希望看完能有所收獲,感謝你的閱讀。
覺得有用就點個贊吧,你的點贊是我創作的最大動力~
我是一個努力讓大家記住的程式員。我們下期再見!!!
能力有限,如果有什麼錯誤或者不當之處,請大家批評指正,一起學習交流!