AQS源碼詳解
好了,我們來開始今天的内容,首先我們來看下
AQS
是什麼,全稱是
AbstractQueuedSynchronizer
翻譯過來就是【抽象隊列同步】對吧。通過名字我們也能看出這是個抽象類

而且裡面定義了很多的方法
裡面這麼多方法,咱們當然不是一個個去翻。裡面還有很多的抽象方法,咱們還得找它的實作多麻煩對不對。是以我們換個方式來探索。
我們先來看下這樣一個場景
在這裡我們有一個能被多個線程共享操作的資源,在這個場景中應該能看出我們的資料是不安全的,因為我們并不能保證我們的操作是原子操作對吧。基于這個場景我們通過代碼來看看效果
package com.example.demo;
public class AtomicDemo {
// 共享變量
private static int count = 0;
// 操作共享變量的方法
public static void incr(){
// 為了示範效果 休眠一下子
try {
Thread.sleep(1);
count ++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000 ; i++) {
new Thread(()->AtomicDemo.incr()).start();
}
Thread.sleep(4000);
System.out.println("result:" + count);
}
}
通過執行發現,執行的結果是一個不确定的值,但總是會小于等于1000,至于原因,是因為incr() 方法不是一個原子操作。為什麼不是原子操作這個咱們今天就不深究此處了.
迎合今天的主題,我們通過Lock來解決
package com.example.demo;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class AtomicDemo {
// 共享變量
private static int count = 0;
private static Lock lock = new ReentrantLock();
// 操作共享變量的方法
public static void incr(){
// 為了示範效果 休眠一下子
try {
lock.lock();
Thread.sleep(1);
count ++;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000 ; i++) {
new Thread(()->AtomicDemo.incr()).start();
}
Thread.sleep(4000);
System.out.println("result:" + count);
}
}
然後我們運作發現結果都是 1000了,這也就是1000個線程都去操作這個 count 變量,結果符合我們的預期了。那lock到底是怎麼實作的呢?
我們先來分析分析
這樣的圖檔看着比較複雜,咱們簡化下。
我們自己假設下,如果要你去設計這樣的方法,你應該要怎麼設計,他們需要實作哪些功能,
首先是lock方法,它是不是要滿足這幾個功能。
需求清楚了,那我們怎麼設計呢?
第一個互斥怎麼做,也就是多個線程隻有一個線程能搶占到資源,這個時候我們可以這樣設定
// 給一個共享資源
Int state = 0 ; // 0表示資源沒有被占用,可以搶占
if(state == 0 ){
// 表示可以擷取鎖
}else{
// 表示鎖被搶占 需要阻塞等待
}
然後就是沒有搶占到鎖的線程的存儲,我們可以通過一個隊列,利用FIFO來實作存儲。
最後就是線程的阻塞和喚醒。大家說說有哪些阻塞線程的方式呀?
- wait/notify: 不合适,不能喚醒指定的線程
- Sleep:休眠,類似于定時器
- Condition:可以喚醒特定線程
-
LockSupport:
LockSupport.park():阻塞目前線程
LockSupport.unpark(Thread t):喚醒特定線程
結合今天的主題,我們選擇LockSupport來實作阻塞和喚醒。
好了,到這兒我們已經猜想到了Lock中的實作邏輯,但是在探究源碼之前我們還有個概念需要先和大家講下,因為這個是我們源碼中會接觸到的一個,先講了,看的時候就比較輕松了對吧。
我們先來看看重入鎖的場景代碼
package com.example.demo;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class AtomicDemo {
// 共享變量
private static int count = 0;
private static Lock lock = new ReentrantLock();
// 操作共享變量的方法
public static void incr(){
// 為了示範效果 休眠一下子
try {
lock.lock();
Thread.sleep(1);
count ++;
// 調用了另外一個方法。
decr();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void decr(){
try {
// 重入鎖
lock.lock();
count--;
}catch(Exception e){
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000 ; i++) {
new Thread(()->AtomicDemo.incr()).start();
}
Thread.sleep(4000);
System.out.println("result:" + count);
}
}
首先大家考慮這段代碼會死鎖嗎? 大家給我個回複,我看看大家的了解的怎麼樣
好了,有說會死鎖的,有說不會,其實這兒是不會死鎖的,而且結果就是0.為什麼呢?
這個其實是鎖的一個嵌套,因為這兩把鎖都是同一個 線程對象,我們講共享變量的設計是
當state=0;線程可以搶占到資源 state =1; 如果進去嵌套通路 共享資源,這時 state = 2 如果有多個嵌套 state會一直累加,釋放資源的時候, state–,直到所有重入的鎖都釋放掉 state=0,那麼其他線程才能繼續搶占資源,說白了重入鎖的設計目的就是為了防止
死鎖
!
通過類圖我們可以發現右車的業務應用其實内在都有相識的設計,這裡我們隻需要搞清楚其中的一個,其他的你自己應該就可以看懂~,好了我們就具體結合前面的案例代碼,以ReentrantLock為例來介紹AQS的代碼實作。
在看源碼之前先回顧下這個圖,帶着問題去看,會更輕松
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
這個方法邏輯比較簡單,if條件成立說明 搶占鎖成功并設定 目前線程為獨占鎖
else 表示搶占失敗,acquire(1) 方法我們後面具體介紹
compareAndSetState(0, 1):用到了CAS 是一個原子操作方法,底層是UnSafe.作用就是設定 共享操作的 state 由0到1. 如果state的值是0就修改為1
setExclusiveOwnerThread:代碼很簡單,進去看一眼即可
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回(這裡展現了非公平鎖,每個線程擷取鎖時會嘗試直接搶占加塞一次,而CLH隊列中可能還有别的線程在等待);
- addWaiter()将該線程加入等待隊列的尾部,并标記為獨占模式;
- acquireQueued()使線程阻塞在等待隊列中擷取資源,一直擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。
當然這裡代碼的作用我是提前研究過的,對于大家肯定不是很清楚,我們繼續裡面去看,最後大家可以回到這兒再論證。
再次嘗試搶占鎖
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//再次嘗試搶占鎖
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入鎖的情況
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// false 表示搶占失敗
return false;
}
将阻塞的線程添加到雙向連結清單的結尾
private Node addWaiter(Node mode) {
//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗則通過enq入隊。
enq(node);
return node;
}
private Node enq(final Node node) {
//CAS"自旋",直到成功加入隊尾
for (;;) {
Node t = tail;
if (t == null) { // 隊列為空,建立一個空的标志結點作為head結點,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一個if語句
else語句
線程3進來會執行如下代碼
那麼效果圖
OK,通過tryAcquire()和addWaiter(),該線程擷取資源失敗,已經被放入等待隊列尾部了。聰明的你立刻應該能想到該線程下一部該幹什麼了吧:進入等待狀态休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去幹自己想幹的事了。沒錯,就是這樣!是不是跟醫院排隊拿号有點相似~~acquireQueued()就是幹這件事:在等待隊列中排隊拿号(中間沒其它事幹可以休息),直到拿到号後再傳回。這個函數非常關鍵,還是上源碼吧:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标記是否成功拿到資源
try {
boolean interrupted = false;//标記等待過程中是否被中斷過
//又是一個“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驅
//如果前驅是head,即該結點已成老二,那麼便有資格去嘗試擷取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到資源後,将head指向該結點。是以head所指的标杆結點,就是目前擷取到資源的那個結點或null。
p.next = null; // setHead中node.prev已置為null,此處再将head.next置為null,就是為了友善GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了!
failed = false; // 成功擷取資源
return interrupted;//傳回等待過程中是否被中斷過
}
//如果自己可以休息了,就通過park()進入waiting狀态,直到被unpark()。如果不可中斷的情況下被中斷了,那麼會從park()中醒過來,發現拿不到資源,進而繼續進入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待過程中被中斷過,哪怕隻有那麼一次,就将interrupted标記為true
}
} finally {
if (failed) // 如果等待過程中沒有成功擷取資源(如timeout,或者可中斷的情況下被中斷了),那麼取消結點在隊列中的等待。
cancelAcquire(node);
}
}
到這裡了,我們先不急着總結acquireQueued()的函數流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驅的狀态
if (ws == Node.SIGNAL)
//如果已經告訴前驅拿完号後通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀态,并排在它的後邊。
* 注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鍊,稍後就會被保安大叔趕走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驅正常,那就把前驅的狀态設定成SIGNAL,告訴它拿完号後通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整個流程中,如果前驅結點的狀态不是SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿号。
如果線程找好安全休息點後,那就可以安心去休息了。此方法就是讓線程去休息,真正進入等待狀态。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//調用park()使線程進入waiting狀态
return Thread.interrupted();//如果被喚醒,檢視自己是不是被中斷的。
}
好了,我們可以小結下了。
看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),現在讓我們再回到acquireQueued(),總結下該函數的具體流程:
- 結點進入隊尾後,檢查狀态,找到安全休息點;
- 調用park()進入waiting狀态,等待unpark()或interrupt()喚醒自己;
- 被喚醒後,看自己是不是有資格能拿到号。如果拿到,head指向目前結點,并傳回從入隊到拿到号的整個過程中是否被中斷過;如果沒拿到,繼續流程1。
最後我們再回到前面的acquire方法來總結下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
總結下它的流程吧
- 調用自定義同步器的tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回;
- 沒成功,則addWaiter()将該線程加入等待隊列的尾部,并标記為獨占模式;
- acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試擷取資源。擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。
- 如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。
好了,lock方法看完後,我們再來看下unlock方法
它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列裡的其他線程來擷取資源。這也正是unlock()的語義,當然不僅僅隻限于unlock()
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到頭結點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//喚醒等待隊列裡的下一個線程
return true;
}
return false;
}
此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:
public final boolean release(int arg) {
if (tryRelease(arg)) {//這裡是先嘗試釋放一下資源,一般都可以釋放成功,除了多次重入但隻釋放一次的情況。
Node h = head;
//這裡判斷的是 阻塞隊列是否還存在和head節點是否是tail節點,因為之前說過,隊列的尾節點的waitStatus是為0的
if (h != null && h.waitStatus != 0)
//到這裡就說明head節點已經釋放成功啦,就先去叫醒後面的直接節點去搶資源吧
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//這裡,node一般為目前線程所在的結點。
int ws = node.waitStatus;
if (ws < 0)//置零目前線程所在的結點狀态,允許失敗。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一個需要喚醒的結點s
if (s == null || s.waitStatus > 0) {//如果為空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 從後向前找。
if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}
這個函數并不複雜。一句話概括:用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這裡我們也用s來表示吧。此時,再和acquireQueued()聯系起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關系,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裡既然s已經是等待隊列中最前邊的那個未放棄線程了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),然後s把自己設定成head标杆結點,表示自己已經擷取到資源了,acquire()也傳回了
好了,到這我們就因為把源碼看完了,再回頭來看下這張圖
是不是就清楚了AQS到底是怎麼實作的我們上面的猜想的了吧。那麼對應的下課後讓你自己去看
這幾個的源碼,你是不是就應該能看懂了,好了本文就介紹到此,本文對你有幫助的歡迎關注點贊,謝謝【歡迎加群交流哦:463257262】