詳細介紹了AQS中的同步隊列以及同步狀态的獨占式擷取、釋放的原理。
AQS相關文章:
AQS(AbstractQueuedSynchronizer)源碼深度解析(1)—AQS的設計與總體結構
AQS(AbstractQueuedSynchronizer)源碼深度解析(2)—Lock接口以及自定義鎖的實作
AQS(AbstractQueuedSynchronizer)源碼深度解析(3)—同步隊列以及獨占式擷取鎖、釋放鎖的原理【一萬字】
AQS(AbstractQueuedSynchronizer)源碼深度解析(4)—共享式擷取鎖、釋放鎖的原理【一萬字】
AQS(AbstractQueuedSynchronizer)源碼深度解析(5)—條件隊列的等待、通知的實作以及AQS的總結【一萬字】
AQS中的同步隊列與同步狀态的擷取、釋放、阻塞有緊密的關聯關系,這兩個知識點必須要連起來學習。
文章目錄
- 1 同步隊列的結構
- 2 鎖的擷取與釋放
- 3 acquire獨占式擷取鎖
-
- 3.1 tryAcquire嘗試擷取獨占鎖
- 3.2 addWaiter加入到同步隊列
-
- 3.2.1 enq保證結點入隊
- 3.3 acquireQueued結點自旋擷取鎖
-
- 3.3.1 shouldParkAfterFailedAcquire結點是否應該挂起
- 3.3.2 parkAndCheckInterrupt挂起線程&判斷中斷狀态
- 3.3.3 finally代碼塊
- 3.4 selfInterrupt安全中斷
- 4 release獨占式鎖釋放
-
- 4.1 unparkSuccessor喚醒後繼結點
- 5 acquirelnterruptibly獨占式可中斷擷取鎖
-
- 5.1 doAcquireInterruptibly獨占式可中斷擷取鎖
- 5.2 finally代碼塊
-
- 5.2.1 cancelAcquire取消擷取鎖請求
- 5.2.2 cancelAcquire案例示範
- 6 tryAcquireNanos獨占式逾時擷取鎖
-
- 6.1 doAcquireNanos獨占式逾時擷取鎖
- 6.2 finally代碼塊
- 7 獨占式擷取/釋放鎖總結
-
- 7.1 acquire/release流程圖
- 7.2 acquire一般流程
- 7.3 release一般流程
1 同步隊列的結構
public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* 目前擷取鎖的線程,該變量定義在父類中,AQS直接繼承。在獨占鎖的擷取時,如果是重入鎖,那麼需要知道到底是哪個線程獲得了鎖。沒有就是null
*/
private transient Thread exclusiveOwnerThread;
/**
* AQS中保持的對同步隊列的引用
* 隊列頭結點,實際上是一個哨兵結點,不代表任何線程,head所指向的Node的thread屬性永遠是null。
*/
private transient volatile Node head;
/**
* 隊列尾結點,後續的結點都加入到隊列尾部
*/
private transient volatile Node tail;
/**
* 同步狀态
*/
private volatile int state;
/**
* Node内部類,同步隊列的結點類型
*/
static final class Node {
/*AQS支援共享模式和獨占模式兩種類型,下面表示構造的結點類型标記*/
/**
* 共享模式下構造的結點,用來标記該線程是擷取共享資源時被阻塞挂起後放入AQS 隊列的
*/
static final Node SHARED = new Node();
/**
* 獨占模式下構造的結點,用來标記該線程是擷取獨占資源時被阻塞挂起後放入AQS 隊列的
*/
static final Node EXCLUSIVE = null;
/*線程結點的等待狀态,用來表示該線程所處的等待鎖的狀态*/
/**
* 訓示目前結點(線程)需要取消等待
* 由于在同步隊列中等待的線程發生等待逾時、中斷、異常,即放棄擷取鎖,需要從同步隊列中取消等待,就會變成這個狀态
* 如果結點進入該狀态,那麼不會再變成其他狀态
*/
static final int CANCELLED = 1;
/**
* 訓示目前結點(線程)的後續結點(線程)需要取消等待(被喚醒)
* 如果一個結點狀态被設定為SIGNAL,那麼後繼結點的線程處于挂起或者即将挂起的狀态
* 目前結點的線程如果釋放了鎖或者放棄擷取鎖并且結點狀态為SIGNAL,那麼将會嘗試喚醒後繼結點的線程以運作
* 這個狀态通常是由後繼結點給前驅結點設定的。一個結點的線程将被挂起時,會嘗試設定前驅結點的狀态為SIGNAL
*/
static final int SIGNAL = -1;
/**
* 線程在等待隊列裡面等待,waitStatus值表示線程正在等待條件
* 原本結點在等待隊列中,結點線程等待在Condition上,當其他線程對Condition調用了signal()方法之後
* 該結點會從從等待隊列中轉移到同步隊列中,進行同步狀态的擷取
*/
static final int CONDITION = -2;
/**
* 釋放共享資源時需要通知其他結點,waitStatus值表示下一個共享式同步狀态的擷取應該無條件傳播下去
*/
static final int PROPAGATE = -3;
/**
* 記錄目前線程等待狀态值,包括以上4中的狀态,還有0,表示初始化狀态
*/
volatile int waitStatus;
/**
* 前驅結點,當結點加入同步隊列将會被設定前驅結點資訊
*/
volatile Node prev;
/**
* 後繼結點
*/
volatile Node next;
/**
* 目前擷取到同步狀态的線程
*/
volatile Thread thread;
/**
* 等待隊列中的後繼結點,如果目前結點是共享模式的,那麼這個字段是一個SHARED常量
* 在獨占鎖模式下永遠為null,僅僅起到一個标記作用,沒有實際意義。
*/
Node nextWaiter;
/**
* 如果是共享模式下等待,那麼傳回true(因為上面的Node nextWaiter字段在共享模式下是一個SHARED常量)
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 用于建立初始頭結點或SHARED标記
*/
Node() {
}
/**
* 用于添加到等待隊列
*
* @param thread
* @param mode
*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//......
}
}
}
由上面的源碼可知,同步隊列的基本結構如下圖:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPR5EeBpnTyMmeNBDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL0MTOxMDOxMjMyAzNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
在AQS内部Node的源碼中我們能看到,同步隊列是"CLH" (Craig, Landin, andHagersten) 鎖隊列的變體,它的head引用指向的頭結點作為哨兵結點,不存儲任何與等待線程相關的資訊,或者可以看成已經獲得鎖的結點。第二個結點開始才是真正的等待線程建構的結點,後續的結點會加入到連結清單尾部。
将新結點添加到連結清單尾部的方法是compareAndSetTail(Node expect,Node update)方法,該方法是一個CAS方法,能夠保證線程安全。
最終擷取鎖的線程所在的結點,會被設定成為頭結點(setHead方法),該設定步驟是通過擷取鎖成功的線程來完成的,由于隻有一個線程能夠成功擷取到鎖,是以設定的方法并不需要使用CAS來保證。
同步隊列遵循先進先出(FIFO),頭結點的next結點是将要擷取到鎖的結點,線程在釋放鎖的時候将會喚醒後繼結點,然後後繼結點會嘗試擷取鎖。
2 鎖的擷取與釋放
Lock中“鎖”的狀态使用state變量來表示,一般來說0表示鎖沒被占用,大于0表示所已經被占用了。
AQS提供的鎖的擷取和釋放分為獨占式的和共享式的:
- 獨占式:顧名思義就是同一時刻隻能有一個線程擷取到鎖,其他擷取鎖線程隻能處于同步隊列中等待,隻有擷取鎖的線程釋放了鎖,後繼的線程才能夠擷取到鎖。
- 共享式:同一時刻能夠有多個線程擷取到鎖。
對于AQS 來說,線程同步的關鍵是對同步狀态state的操作:
- 在獨占式下擷取和釋放鎖使用的方法為:
。void acquire( int arg) 、void acquirelnterruptibly(int arg) 、boolean release( int arg)
- 在共享式下擷取和釋放鎖的方法為:
。void acquireShared(int arg) 、void acquireSharedInterruptibly(int arg)、 boolean reaseShared(int arg)
擷取鎖的大概通用流程如下:
線程會首先嘗試擷取鎖,如果失敗,則将目前線程以及等待狀态等資訊包成一個Node結點加到同步隊列裡。接着會不斷循環嘗試擷取鎖(擷取鎖的條件是目前結點為head的直接後繼才會嘗試),如果失敗則會嘗試阻塞自己(阻塞的條件是目前節結點的前驅結點是SIGNAL狀态),阻塞後将不會執行後續代碼,直至被喚醒;當持有鎖的線程釋放鎖時,會喚醒隊列中的後繼線程,或者阻塞的線程被中斷或者時間到了,那麼阻塞的線程也會被喚醒。
如果分獨占式和共享式,那麼在上面的通用步驟之下有這些差別:
- 獨占式擷取的鎖是與具體線程綁定的,就是說如果一個線程擷取到了鎖,exclusiveOwnerThread字段就會記錄這個線程,其他線程再嘗試操作state 擷取鎖時會發現目前該鎖不是自己持有的,就會在擷取失敗後被放入AQS 同步隊列。比如獨占鎖ReentrantLock 的實作, 當一個線程擷取了ReentrantLock 的鎖後,在AQS 内部會首先使用CAS操作把state 狀态值從0變為1 ,然後設定目前鎖的持有者為目前線程,當該線程再次擷取鎖時發現它就是鎖的持有者,則會把狀态值從1變為2,也就是設定可重入次數,而當另外一個線程擷取鎖時發現自己并不是該鎖的持有者就會被放入AQS 同步隊列後挂起。
- 共享式擷取的鎖與具體線程是不相關的,當多個線程去請求鎖時通過CAS 方式競争擷取鎖,當一個線程擷取到了鎖後,另外一個線程再次去擷取時如果目前鎖還能滿足它的需要,則目前線程隻需要使用CAS 方式進行擷取即可。比如Semaphore 信号量, 當一個線程通過acquire()方法擷取信号量時,會首先看目前信号量個數是否滿足需要,不滿足則把目前線程放入同步隊列,如果滿足則通過自旋CAS 擷取信号量,相應的信号量個數減少對應的值。
實際上,具體的步驟更加複雜,下面講解源碼的時候會提到!
3 acquire獨占式擷取鎖
通過調用AQS的acquire模版方法可以獨占式的擷取鎖,該方法不會響應中斷,也就是由于線程擷取同步狀态失敗後進入同步隊列中,後續對線程進行中斷操作時,線程不會從同步隊列中移出。基于獨占式實作的元件有ReentrantLock等。
該方法大概步驟如下:
- 首先調用tryAcquire方法嘗試擷取鎖,如果擷取鎖成功會傳回true,方法結束;否則擷取鎖失敗傳回false,然後進行下一步的操作。
- 通過addWaiter方法将線程按照獨占模式Node.EXCLUSIVE構造同步結點,并添加到同步隊列的尾部。
- 然後通過acquireQueued(Node node,int arg)方法繼續自旋擷取鎖。
- 一次自旋中如果擷取不到鎖,那麼判斷是否可以挂起并嘗試挂起結點中的線程(調用LockSupport.park(this)方法挂起自己,注意這裡的線程狀态是WAITING)。而挂起線程的喚醒主要依靠前驅結點或線程被中斷來實作,注意喚醒之後會繼續自旋嘗試獲得鎖。
- 最終隻有獲得鎖的線程才能從acquireQueued方法傳回,然後根據傳回值判斷是否調用selfInterrupt設定中斷标志位,但此時線程處于運作态,即使設定中斷标志位也不會抛出異常(即acquire(lock)方法不會響應中斷)。
- 線程獲得鎖,acquire方法結束,從lock方法中傳回,繼續後續執行同步代碼!
/**
* 獨占式的嘗試擷取鎖,一直擷取不成功就進入同步隊列等待
*/
public final void acquire(int arg) {
//内部是由4個方法的調用組成的
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
3.1 tryAcquire嘗試擷取獨占鎖
熟悉的tryAcquire方法,這個方法我們在最開頭講“AQS的設計”時就提到過,該方法是AQS的子類即我們自己實作的,用于首次嘗試擷取獨占鎖,一般來說就是對state的改變、或者重入鎖的檢查、設定目前獲得鎖的線程等等,不同的鎖有自己相應的邏輯判斷,這裡不多講,後面講具體鎖的實作的時候(比如ReentrantLock)會講到。總之,擷取成功該方法就傳回true,失敗就傳回false。
在AQS的中tryAcquire的實作為抛出異常,是以需要子類重寫:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
3.2 addWaiter加入到同步隊列
addWaiter方法是AQS提供的,也不需要我們重寫,或者說是鎖的通用方法!
addWaiter方法用于将按照獨占模式構造的同步結點Node.EXCLUSIVE添加到同步隊列的尾部。大概步驟為:
- 按照給定模式,建構新結點。
- 如果同步隊列不為null,則嘗試将新結點添加到隊列尾部(隻嘗試一次),如果添加成功則傳回新結點,方法結束。
- 如果隊列為null或者添加失敗,則調用enq方法循環嘗試添加,直到成功,傳回新結點,方法結束。
/**
* addWaiter(Node node)方法将擷取鎖失敗的線程構造成結點加入到同步隊列的尾部
*
* @param mode 模式。獨占模式傳入的是一個Node.EXCLUSIVE,即null;共享模式傳入的是一個Node.SHARED,即一個靜态結點對象(共享的、同一個)
* @return 傳回構造的結點
*/
private Node addWaiter(Node mode) {
/*1 首先構造結點*/
Node node = new Node(Thread.currentThread(), mode);
/*2 嘗試将結點直接放在隊尾*/
//直接擷取同步器的tail結點,使用pred來儲存
Node pred = tail;
/*如果pred不為null,實際上就是隊列不為null
* 那麼使用CAS方式将目前結點設為尾結點
* */
if (pred != null) {
node.prev = pred;
//通過使用compareAndSetTail的CAS方法來確定結點能夠被線程安全的添加,雖然不一定能成功。
if (compareAndSetTail(pred, node)) {
//将新構造的結點置為原隊尾結點的後繼
pred.next = node;
//傳回新結點
return node;
}
}
/*
* 3 走到這裡,可能是:
* (1) 由于可能是并發條件,并且上面的CAS操作并沒有循環嘗試,是以可能添加失敗
* (2) 隊列可能為null
* 調用enq方法,采用自旋方式保證構造的新結點成功添加到同步隊列中
* */
enq(node);
return node;
}
/**
* addWaiter方法中使用到的Node構造器
*
* @param thread 目前線程
* @param mode 模式
*/
Node(Thread thread, AbstractQueuedSynchronizer.Node mode) {
//等待隊列中的後繼結點 就等于該結點的模式
//由此可知,共享模式該值為Node.SHARED結點常量,獨占模式該值為null
this.nextWaiter = mode;
//目前線程
this.thread = thread;
}
3.2.1 enq保證結點入隊
enq方法用在同步隊列為null或者一次CAS添加失敗的時候,enq要保證結點最終必定添加成功。大概步驟為:
- 開啟一個死循環,在死循環中進行如下操作;
- 如果隊列為空,那麼初始化隊列,添加一個哨兵結點,結束本次循環,繼續下一次循環;
- 如果隊列不為空,那麼向前面的方法一樣,則嘗試将新結點添加到隊列尾部,如果添加成功則傳回新結點的前驅,循環結束;如果不成功,結束本次循環,繼續下一次循環。
enq方法傳回的是新結點的前驅,當然在addWaiter方法中沒有用到。
另外,添加頭結點使用的compareAndSetHead方法和添加尾結點使用的compareAndSetTail方法都是CAS方法,并且都是調用Unsafe類中的本地方法,因為線程挂機、恢複、CAS操作等最終會通過作業系統中實作,Unsafe類就提供了Java與底層作業系統進行互動的直接接口,這個類的裡面的許多操作類似于C的指針操作,通過找到對某個屬性的偏移量,直接對該屬性指派,因為與Java本地方法對接都是Hospot源碼中的方法,而這些的方法都是采用C++寫的,必須使用指針!
也可以說Unsafe是AQS的實作并發控制機制基石。是以在學習AQS的時候,可以先了解Unsafe:Java中的Unsafe類的原理詳解與使用案例。
/**
* 循環,直到尾結點添加成功
*/
private Node enq(final Node node) {
/*死循環操作,直到添加成功*/
for (; ; ) {
//擷取尾結點t
Node t = tail;
/*如果隊列為null,則初始化同步隊列*/
if (t == null) {
/*調用compareAndSetHead方法,初始化同步隊列
* 注意:這裡是建立了一個空白結點,這就是傳說中的哨兵結點
* CAS成功之後,head将指向該哨兵結點,傳回true
* */
if (compareAndSetHead(new Node()))
//尾結點指向頭結點(哨兵結點)
tail = head;
/*之後并沒有結束,而是繼續循環,此時隊列已經不為空了,是以會進行下面的邏輯*/
}
/*如果隊列不為null,則和外面的的方法類似,調用compareAndSetTail方法,建立新結點到同步隊列尾部*/
else {
/*1 首先修改新結點前驅的指向,這一步不是安全的
但是沒關系,因為這一步如果發生了沖突,那麼下面的CAS操作必然之後有一條線程會成功
其他線程将會重新循環嘗試*/
node.prev = t;
/*
* 2 調用compareAndSetTail方法通過CAS方式嘗試将結點添加到同步隊列尾部
* 如果添加成功,那麼才能繼續下一步,結束這個死循環,否則就會不斷循環嘗試添加
* */
if (compareAndSetTail(t, node)) {
//3 修改原尾結點後繼結點的指向
t.next = node;
//傳回新結點,結束死循環
return t;
}
}
}
}
/**
* CAS添加頭結點. 僅僅在enq方法中用到
*
* @param update 頭結點
* @return true 成功;false 失敗
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS添加尾結點. 僅僅在enq方法中用到
*
* @param expect 預期原尾結點
* @param update 新尾結點
* @return true 成功;false 失敗
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
在addWaiter和enq方法中,成為尾結點需要三步:
- 設定前驅prev
- 設定tail
- 設定後繼next
由于第二步設定tail是CAS操作,那麼隻能保證node的前驅prev一定是正确的,但是此後設定後繼的操作卻不一定能夠馬上成功就切換到了其他線程,此時next可能為null,但實際他的後繼并不一定真的為null。
是以同步隊列隻能保證前驅prev一定是可靠的,但是next卻不一定可靠,是以後面的源碼的周遊操作基本上都是從後向前通過前驅prev進行周遊的。
3.3 acquireQueued結點自旋擷取鎖
能夠走到該方法,那麼說明通過了tryAcquire()和addWaiter()方法,表示該線程擷取鎖已經失敗并且被放入同步隊列尾部了。
acquireQueued方法表示結點進入同步隊列之後的動作,實際上就進入了一個自旋的過程,自旋過程中,當條件滿足,擷取到了鎖,就可以從這個自旋中退出并傳回,否則可能會阻塞該結點的線程,後續即使阻塞被喚醒,還是會自旋嘗試擷取鎖,直到成功或者而抛出異常。
最終如果該方法會因為擷取到鎖而退出,則會傳回否被中斷标志的标志位 或者 因為異常而退出,則會抛出異常!大概步驟為:
- 同樣開啟一個死循環,在死循環中進行下面的操作;
- 如果目前結點的前驅是head結點,那麼嘗試擷取鎖,如果擷取鎖成功,那麼目前結點設定為頭結點head,目前結點線程出隊,表示目前線程已經擷取到了鎖,然後傳回是否被中斷标志,結束循環,進入finally;
- 如果目前結點的前驅不是head結點或者嘗試擷取鎖失敗,那麼判斷目前線程是否應該被挂起,如果傳回true,那麼調用parkAndCheckInterrupt挂起目前結點的線程(LockSupport.park 方法挂起線程,線程出于WAITING),此時不再執行後續的步驟、代碼。
- 如果目前線程不應該被挂起,即傳回false,那本次循環結束,繼續下一次循環。
- 如果線程被其他線程喚醒,那麼判斷是否是因為中斷而被喚醒并修改标志位,同時繼續循環,直到在步驟2獲得鎖,才能跳出循環!(這也是acquire方法不會響應中斷的原理—park方法被中斷時不會抛出異常,僅僅是從挂起狀态傳回,然後需要繼續嘗試擷取鎖)
- 最終,線程獲得了鎖跳出循環,或者發生異常跳出循環,那麼會執行finally語句塊,finally中判斷線程是否是因為發生異常而跳出循環,如果是,那麼執行cancelAcquire方法取消該結點擷取鎖的請求;如果不是,即因為獲得鎖跳出循環,則finally中什麼也不幹!
/**
* @param node 新結點
* @param arg 參數
* @return 如果在等待時中斷,則傳回true
*/
final boolean acquireQueued(final Node node, int arg) {
//failed表示擷取鎖是否失敗标志
boolean failed = true;
try {
//interrupted表示是否被中斷标志
boolean interrupted = false;
/*死循環*/
for (; ; ) {
//擷取新結點的前驅結點
final Node p = node.predecessor();
/*隻有前驅結點是頭結點的時候才能嘗試擷取鎖
* 同樣調用tryAcquire方法擷取鎖
* */
if (p == head && tryAcquire(arg)) {
//擷取到鎖之後,就将自己設定為頭結點(哨兵結點),線程出隊列
setHead(node);
//前驅結點(原哨兵結點)的連結置空,由JVM回收
p.next = null;
//擷取鎖是否失敗改成false,表示成功擷取到了鎖
failed = false;
//傳回interrupted,即傳回線程是否被中斷
return interrupted;
}
/*前驅結點不是頭結點或者擷取同步狀态失敗*/
/*shouldParkAfterFailedAcquire檢測線程是否應該被挂起,如果傳回true
* 則調用parkAndCheckInterrupt用于将線程挂起
* 否則重新開始循環
* */
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/*到這一步,說明是目前結點(線程)因為被中斷而喚醒,那就改變自己的中斷标志位狀态資訊為true
* 然後又從新開始循環,直到擷取到鎖,才能傳回
* */
interrupted = true;
}
}
/*線程擷取到鎖或者發生異常之後都會執行的finally語句塊*/ finally {
/*如果failed為true,表示擷取鎖失敗,即對應發生異常的情況,
這裡發生異常的情況隻有在tryAcquire方法和predecessor方法中可能會抛出異常,此時還沒有獲得鎖,failed=true
那麼執行cancelAcquire方法,該方法用于取消該線程擷取鎖的請求,将該結點的線程狀态改為CANCELLED,并嘗試移除結點(如果是尾結點)
另外,在逾時等待擷取鎖的的方法中,如果超過時間沒有擷取到鎖,也會調用該方法
如果failed為false,表示擷取到了鎖,那麼該方法直接結束,繼續往下執行;*/
if (failed)
//取消擷取鎖請求,将目前結點從隊列中移除,
cancelAcquire(node);
}
}
/**
* 位于Node結點類中的方法
* 傳回上一個結點,或在 null 時引發 NullPointerException。 目前置不能為空時使用。 空檢查可以取消,表示此異常無代碼層面的意義,但可以幫助 VM?是以這個異常到底有啥用?
*
* @return 此結點的前驅
*/
final Node predecessor() throws NullPointerException {
//擷取前驅
Node p = prev;
//如果為null,則抛出異常
if (p == null)
throw new NullPointerException();
else
//傳回前驅
return p;
}
/**
* head指向node新結點,該方法是在tryAcquire擷取鎖之後調用,不會産生線程安全問題
*
* @param node 新結點
*/
private void setHead(Node node) {
head = node;
//新結點的thread和prev屬性置空
//即丢棄原來的頭結點,新結點成為哨兵結點,内部線程出隊
//設定裡雖然線程引用置空了,但是一般在tryAcquire方法中軌記錄擷取到鎖的線程,是以不擔心找不到是哪個線程擷取到了鎖
//這裡也能看出,哨兵結點或許也可以叫做"擷取到鎖的結點"
node.thread = null;
node.prev = null;
}
3.3.1 shouldParkAfterFailedAcquire結點是否應該挂起
shouldParkAfterFailedAcquire方法在沒有擷取到鎖之後調用,用于判斷目前結點是否需要被挂起。大概步驟如下:
- 如果前驅結點已經是SIGNAL(-1)狀态,即表示目前結點可以挂起,傳回true,方法結束;
- 否則,如果前驅結點狀态大于0,即 Node.CANCELLED,表示前驅結點放棄了鎖的等待,那麼由該前驅向前查找,直到找到一個狀态小于等于0的結點,目前結點排在該結點後面,傳回false,方法結束;
- 否則,前驅結點的狀态既不是SIGNAL(-1),也不是CANCELLED(1),嘗試CAS設定前驅結點的狀态為SIGNAL(-1),傳回false,方法結束!
隻有前驅結點狀态為SIGNAL時,目前結點才能安心挂起,否則一直自旋!
從這裡能看出來,一個結點的SIGNAL狀态一般都是由它的後繼結點設定的,但是這個狀态卻是表示後繼結點的狀态,表示的意思就是前驅結點如果釋放了鎖,那麼就有義務喚醒後繼結點!
/**
* 檢測目前結點(線程)是否應該被挂起
*
* @param pred 該結點的前驅
* @param node 該結點
* @return 如果前驅結點已經是SIGNAL狀态,目前結點才能挂起,傳回true;否則,可能會查找新的前驅結點或者嘗試将前驅結點設定為SIGNAL狀态,傳回false
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//擷取 前取的waitStatus_等待狀态
//回顧建立結點時候,并沒有給waitStatus指派,是以每一個結點最開始的時候waitStatus的值都為0
int ws = pred.waitStatus;
/*如果前驅結點已經是SIGNAL狀态,即表示目前結點可以挂起*/
if (ws == Node.SIGNAL)
return true;
/*如果前驅結點狀态大于0,即 Node.CANCELLED 表示前驅結點放棄了鎖的等待*/
if (ws > 0) {
/*由該前驅向前查找,直到找到一個狀态小于等于0的結點(即沒有被取消的結點),目前結點成為該結點的後驅,這一步很重要,可能會清理一段被取消了的結點,并且如果該前驅釋放了鎖,還會喚醒它的後繼,保持隊列活性*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
/*否則,前驅結點的狀态既不是SIGNAL(-1),也不是CANCELLED(1)*/
else {
/*前驅結點的狀态CAS設定為SIGNAL(-1),可能失敗,但沒關系,因為失敗之後會一直循環*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//傳回false,表示目前結點不能挂起
return false;
}
3.3.2 parkAndCheckInterrupt挂起線程&判斷中斷狀态
shouldParkAfterFailedAcquire方法傳回true之後,将會調用parkAndCheckInterrupt方法挂起線程并且後續判斷中斷狀态,分兩步:
- 使用LockSupport.park(this)挂起該線程,不再執行後續的步驟、代碼。直到該線程被中斷或者被喚醒(unpark)!
- 如果該線程被中斷或者喚醒,那麼傳回Thread.interrupted()方法的傳回值,該方法用于判斷前線程的中斷狀态,并且清除該中斷狀态,即,如果該線程因為被中斷而喚醒,則中斷狀态為true,将中斷狀态重置為false,并傳回true,如果該線程不是因為中斷被喚醒,則中斷狀态為false,并傳回false。
/**
* 挂起線程,線上程傳回後傳回中斷狀态
*
* @return 如果因為線程中斷而傳回,而傳回true,否則傳回false
*/
private final boolean parkAndCheckInterrupt() {
/*1)使用LockSupport.park(this)挂起該線程,不再執行後續的步驟、代碼。直到該線程被中斷或者被喚醒(unpark)*/
LockSupport.park(this);
/*2)如果該線程被中斷或者喚醒,那麼傳回Thread.interrupted()方法的傳回值,
該方法用于判斷前線程的中斷狀态,并且清除該中斷狀态,即,如果該線程因為被中斷而喚醒,則中斷狀态為true,将中斷狀态重置為false,并傳回true,注意park方法被中斷時不會抛出異常!
如果該線程不是因為中斷被喚醒,則中斷狀态為false,并傳回false*/
return Thread.interrupted();
}
3.3.3 finally代碼塊
在acquireQueued方法中,具有一個finally代碼塊,那麼無論try中發生了什麼,finally代碼塊都會執行的。在acquire獨占式不可中斷擷取鎖的方法中,執行finally的隻有兩種情況:
- 目前結點(線程)最終擷取到了鎖,此時會進入finally,而在擷取到鎖之後會設定failed = false。
- 在try中發生了異常,此時直接跳到finally中。這裡發生異常的情況隻可能在tryAcquire或predecessor方法中發生,然後直接進入finally代碼塊中,此時還沒有獲得鎖,failed=true!
- tryAcquire方法是我們自己實作的,抛出什麼異常由我們來定,就算抛出異常一般也不會在acquireQueued中抛出,可能在最開始調用tryAcquire時就抛出了。
- predecessor方法中,會檢查如果前驅結點為null則抛出NullPointerException。但是注釋中又說這個檢查無代碼層面的意義,或許是這個異常永遠不會抛出?
finally代碼塊中的邏輯為:
- 如果failed = true,表示沒有擷取鎖而進行finally,即發生了異常。那麼執行cancelAcquire方法取消目前結點線程擷取鎖的請求,acquireQueued方法結束,然後抛出異常。
- 如果failed = false,表示已經擷取到了鎖,那麼實際上finally中什麼都不會執行。acquireQueued方法結束,傳回interrupted—是否被中斷标志。
綜上所述,在acquire獨占式不可中斷擷取鎖的方法中,大部分情況在finally中都是什麼也不幹就傳回了,或者說抛出異常的情況基本沒有,是以cancelAcquire方法基本不考慮。
但是在可中斷擷取鎖或者逾時擷取鎖的方法中,執行到cancelAcquire方法的情況還是比較常見的。是以将cancelAcquire方法的源碼分析放到可中斷擷取鎖方法的源碼分析部分!
3.4 selfInterrupt安全中斷
selfInterrupt是acquire中最後可能調用的一個方法,顧名思義,用于安全的中斷,什麼意思呢,就是根據!tryAcquire和acquireQueued傳回值判斷是否需要設定中斷标志位。
隻有tryAcquire嘗試失敗,并且acquireQueued方法true時,才表示該線程是被中斷過了的,但是在parkAndCheckInterrupt裡面判斷中斷标志位之後又重置的中斷标志位(interrupted方法會重置中斷标志位)。
雖然看起來沒啥用,但是本着負責的态度,還是将中斷标志位記錄下來。那麼此時重新設定該線程的中斷标志位為true。
/**
* 中斷目前線程,由于此時目前線程出于運作态,是以隻會設定中斷标志位,并不會抛出異常
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
4 release獨占式鎖釋放
目前線程擷取到鎖并執行了相應邏輯之後,就需要釋放鎖,使得後續結點能夠繼續擷取鎖。通過調用AQS的release(int arg)模版方法可以獨占式的釋放鎖,在該方法大概步驟如下:
- 嘗試使用tryRelease(arg)釋放鎖,該方法在最開始我們就講過,是自己實作的方法,通常來說就是将state值為0或者減少、清除目前獲得鎖的線程等等,如果符合自己的邏輯,鎖釋放成功則傳回true,否則傳回false;
- 如果tryRelease釋放成功傳回true,判斷如果head不為null且head的狀态不為0,那麼嘗試調用unparkSuccessor方法喚醒頭結點之後的一個非取消狀态(非CANCELLED狀态)的後繼結點,讓其可以進行鎖擷取。傳回true,方法結束;
- 如果tryRelease釋放失敗,那麼傳回false,方法結束。
/**
* 獨占式的釋放同步狀态
*
* @param arg 參數
* @return 釋放成功傳回true, 否則傳回false
*/
public final boolean release(int arg) {
/*tryRelease釋放同步狀态,該方法是自己重寫實作的方法
釋放成功将傳回true,否則傳回false或者自己實作的邏輯*/
if (tryRelease(arg)) {
//擷取頭結點
Node h = head;
//如果頭結點不為null并且狀态不等于0
if (h != null && h.waitStatus != 0)
/*那麼喚醒頭結點的一個出于等待鎖狀态的後繼結點
* 該方法在acquire中已經講過了
* */
unparkSuccessor(h);
return true;
}
return false;
}
4.1 unparkSuccessor喚醒後繼結點
unparkSuccessor用于喚醒參數結點的某個非取消的後繼結點,該方法在很多地方法都被調用,大概步驟:
- 如果目前結點的狀态小于0,那麼CAS設定為0,表示後繼結點可以繼續嘗試擷取鎖。
- 如果目前結點的後繼s為null或者狀态為取消CANCELLED,則将s先指向null;然後從tail開始到node之間倒序向前查找,找到離tail最近的非取消結點賦給s。需要從後向前周遊,因為同步隊列隻保證結點前驅關系的正确性。
- 如果s不為null,那麼狀态肯定不是取消CANCELLED,則直接喚醒s的線程,調用LockSupport.unpark方法喚醒,被喚醒的結點将從被park的位置繼續執行!
/**
* 喚醒指定結點的後繼結點
*
* @param node 指定結點
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
/*
* 1) 如果目前結點的狀态小于0,那麼CAS設定為0,表示後繼結點線程可以先嘗試獲鎖,而不是直接挂起。
* */
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//先擷取node的直接後繼
Node s = node.next;
/*
* 2) 如果s為null或者狀态為取消CANCELLED,則從tail開始到node之間倒序向前查找,找到離tail最近的非取消結點賦給s。
* */
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
/*
* 3)如果s不為null,那麼狀态肯定不是取消CANCELLED,則直接喚醒s的線程,調用LockSupport.unpark方法喚醒,被喚醒的結點将從被park的位置向後執行!
* */
if (s != null)
LockSupport.unpark(s.thread);
}
5 acquirelnterruptibly獨占式可中斷擷取鎖
在JDK1.5之前,當一個線程擷取不到鎖而被阻塞在synchronized之外時,如果對該線程進行中斷操作,此時該線程的中斷标志位會被修改,但線程依舊會阻塞在synchronized上,等待着擷取鎖,即無法響應中斷。
上面分析的獨占式擷取鎖的方法acquire,同樣是不會響應中斷的。但是AQS提供了另外一個acquireInterruptibly模版方法,調用該方法的線程在等待擷取鎖時,如果目前線程被中斷,會立刻傳回,并抛出InterruptedException。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//如果目前線程被中斷,直接抛出異常
if (Thread.interrupted())
throw new InterruptedException();
//嘗試擷取鎖
if (!tryAcquire(arg))
//如果沒擷取到,那麼調用AQS 可被中斷的方法
doAcquireInterruptibly(arg);
}
5.1 doAcquireInterruptibly獨占式可中斷擷取鎖
doAcquireInterruptibly會首先判斷線程是否是中斷狀态,如果是則直接傳回并抛出異常其他不步驟和獨占式不可中斷擷取鎖基本原理一緻。還有一點的差別就是在後續挂起的線程因為線程被中斷而傳回時的處理方式不一樣:
- 獨占式不可中斷擷取鎖僅僅是記錄該狀态,interrupted = true,緊接着又繼續循環擷取鎖;
- 獨占式可中斷擷取鎖則直接抛出異常,是以會直接跳出循環去執行finally代碼塊。
/**
* 獨占可中斷式的鎖擷取
*
* @param arg 參數
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//同樣調用addWaiter将目前線程構造成結點加入到同步隊列尾部
final Node node = addWaiter(Node.EXCLUSIVE);
//擷取鎖失敗标志,預設為true
boolean failed = true;
try {
/*和獨占式不可中斷方法acquireQueued一樣,循環擷取鎖*/
for (; ; ) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/*
* 這裡就是差別所在,獨占不可中斷式方法acquireQueued中
* 如果線程被中斷,此處僅僅會記錄該狀态,interrupted = true,緊接着又繼續循環擷取鎖
*
* 但是在該獨占可中斷式的鎖擷取方法中
* 如果線程被中斷,此處直接抛出異常,是以會直接跳出循環去執行finally代碼塊
* */
throw new InterruptedException();
}
}
/*擷取到鎖或者抛出異常都會執行finally代碼塊*/
finally {
/*如果擷取鎖失敗。可能就是線程被中斷了,那麼執行cancelAcquire方法取消該結點對鎖的請求,該線程結束*/
if (failed)
cancelAcquire(node);
}
}
5.2 finally代碼塊
在doAcquireInterruptibly方法中,具有一個finally代碼塊,那麼無論try中發生了什麼,finally代碼塊都會執行的。在acquireInterruptibly獨占式可中斷擷取鎖的方法中,執行finally的隻有兩種情況:
- 目前結點(線程)最終擷取到了鎖,此時會進入finally,而在擷取到鎖之後會設定failed = false。
- 在try中發生了異常,此時直接跳到finally中,這裡發生異常的情況可能在tryAcquire、predecessor方法中,更加有可能的原因是因為線程被中斷而抛出InterruptedException異常,然後直接進入finally代碼塊中,此時還沒有獲得鎖,failed=true!
- tryAcquire方法是我們自己實作的,抛出什麼異常由我們來定,就算抛出異常一般也不會在doAcquireInterruptibly中抛出,可能在最開始調用tryAcquire時就抛出了。
- predecessor方法中,會檢查如果前驅結點為null則抛出NullPointerException。但是注釋中又說這個檢查無代碼層面的意義,或許是這個異常永遠不會抛出?
- 根據doAcquireInterruptibly邏輯,如果線程在挂起過程中被中斷,那麼将主動抛出InterruptedException異常,這也是被稱為“可中斷”的邏輯
finally代碼塊中的邏輯為:
- 如果failed = true,表示沒有擷取鎖而進行finally,即發生了異常。那麼執行cancelAcquire方法取消目前結點線程擷取鎖的請求,doAcquireInterruptibly方法結束,抛出異常!
- 如果failed = false,表示已經擷取到了鎖,那麼實際上finally中什麼都不會執行,doAcquireInterruptibly方法結束。
5.2.1 cancelAcquire取消擷取鎖請求
由于獨占式可中斷擷取鎖的方法中,線程被中斷而抛出異常的情況比較常見,是以這裡分析finally中cancelAcquire的源碼。cancelAcquire方法用于取消結點擷取鎖的請求,參數為需要取消的結點node,大概步驟為:
- node記錄的線程thread置為null
- 跳過已取消的前置結點。由node向前查找,直到找到一個狀态小于等于0的結點pred (即找一個沒有取消的結點),更新node.prev為找到的pred。
- node的等待狀态waitStatus置為CANCELLED,即取消請求鎖。
- 如果node是尾結點,那麼嘗試CAS更新tail指向pred,成功之後繼續CAS設定pred.next為null。
- 否則,說明node不是尾結點或者CAS失敗(可能存在對尾結點的并發操作):
- 如果node不是head的後繼 并且 (pred的狀态為SIGNAL或者将pred的waitStatus置為SIGNAL成功) 并且 pred記錄的線程不為null。那麼設定pred.next指向node.next。最後node.next指向node自己。
- 否則,說明node是head的後繼 或者pred狀态設定失敗 或者 pred記錄的線程為null。那麼調用unparkSuccessor喚醒node的一個沒取消的後繼結點。最後node.next指向node自己。
/**
* 取消指定結點擷取鎖的請求
*
* @param node 指定結點
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
/*1 node記錄的線程thread置為null*/
node.thread = null;
/*2 類似于shouldParkAfterFailedAcquire方法中查找有效前驅的代碼:
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
這裡同樣由node向前查找,直到找到一個狀态小于等于0的結點(即沒有被取消的結點),作為前驅
但是這裡隻更新了node.prev,沒有更新pred.next*/
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//predNext記錄pred的後繼,後續CAS會用到。
Node predNext = pred.next;
/*3 node的等待狀态設定為CANCELLED,即取消請求鎖*/
node.waitStatus = Node.CANCELLED;
/*4 如果目前結點是尾結點,那麼嘗試CAS更新tail指向pred,成功之後繼續CAS設定pred.next為null。*/
if (node == tail && compareAndSetTail(node, pred)) {
//新尾結點pred的next結點設定為null,即使失敗了也沒關系,說明有其它新入隊線程或者其它取消線程更新掉了。
compareAndSetNext(pred, predNext, null);
}
/*5 否則,說明node不是尾結點或者CAS失敗(可能存在對尾結點的并發操作),這種情況要做的事情是把pred和node的後繼非取消結點拼起來。*/
else {
int ws;
/*5.1 如果node不是head的後繼 并且 (pred的狀态為SIGNAL或者将pred的waitStatus置為SIGNAL成功) 并且 pred記錄的線程不為null。
那麼設定pred.next指向node.next。這裡沒有設定prev,但是沒關系。
此時pred的後繼變成了node的後繼—next,後續next結點如果擷取到鎖,那麼在shouldParkAfterFailedAcquire方法中查找有效前驅時,
也會找到這個沒取消的pred,同時将next.prev指向pred,也就設定了prev關系了。
*/
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//擷取next結點
Node next = node.next;
//如果next結點存在且未被取消
if (next != null && next.waitStatus <= 0)
//那麼CAS設定perd.next指向node.next
compareAndSetNext(pred, predNext, next);
}
/*5.2 否則,說明node是head的後繼 或者pred狀态設定失敗 或者 pred記錄的線程為null。
*
* 此時需要調用unparkSuccessor方法嘗試喚醒node結點的後繼結點,因為node作為head的後繼結點是唯一有資格取嘗試擷取鎖的結點。
* 如果外部線程A釋放鎖,但是還沒有調用unpark喚醒node的時候,此時node被中斷或者發生異常,這時node将會調用cancelAcquire取消,結點内部的記錄線程變成null,
* 此時就是算A線程的unpark方法執行,也隻是LockSupport.unpark(null)而已,也就不會喚醒任何結點了
* 那麼node後面的結點也不會被喚醒了,隊列就失活了;如果在這種情況下,在node将會調用cancelAcquire取消的代碼中
* 調用一次unparkSuccessor,那麼将喚醒被取消結點的後繼結點,讓後繼結點可以嘗試擷取鎖,進而保證隊列活性!
*
* 前面對node進行取消的代碼中,并沒有将node徹底移除隊列,
* 而被喚醒的結點會嘗試擷取鎖,而在在擷取到鎖之後,在
* setHead(node);
* p.next = null; // help GC
* 部分,可能将這些被取消的結點清除
* */
else {
unparkSuccessor(node);
}
/*最後node.next指向node自身,友善後續GC時直接銷毀無效結點
同時也是為了Condition的isOnSyncQueue方法,判斷一個原先屬于條件隊列的結點是否轉移到了同步隊列。
因為同步隊列中會用到結點的next域,取消結點的next也有值的話,可以斷言next域有值的結點一定在同步隊列上。
這裡也能看出來,周遊的時候應該采用倒序周遊,否則采用正序周遊可能出現死循環*/
node.next = node;
}
}
5.2.2 cancelAcquire案例示範
設一個同步隊列結構如下,有ABCDE五個線程調用acquireInterruptibly方法争奪鎖,并且BCDE線程都是因為擷取不到鎖而導緻的阻塞。
我們來看看幾種情況下cancelAcquire方法怎麼處理的:
如果此時線程D被中斷,那麼抛出異常進入finally代碼塊,屬于node不是尾結點,node不是head的後繼的情況,如下圖:
在cancelAcquire方法之後的結構如下:
如果此時線程E被中斷,那麼抛出異常進入finally代碼塊,屬于node是尾結點的情況,如下圖:
在cancelAcquire方法之後的結構如下:
如果此時進來了兩個新線程F、G,并且又都被挂起了,那麼此時同步隊列結構如下圖:
可以看到,實際上該隊列出現了分叉,這種情況在同步隊列中是很常見的,因為被取消的結點并沒有主動去除自己的prev引用。那麼這部分被取消的結點無法被删除嗎,其實是可以的,隻不過需要滿足一定的條件結構!
如果此時線程B被中斷,那麼抛出異常進入finally代碼塊,屬于node不是尾結點,node是head的後繼的情況,如下圖:
在cancelAcquire方法之後的結構如下:
注意在這種情況下,node還會調用unparkSuccessor方法喚醒後繼結點C,讓C嘗試擷取鎖,如果假設此時線程A的鎖還沒有使用完畢,那麼此時C肯定不能擷取到鎖。
但是C也不是什麼都沒做,C在被喚醒之後獲得CPU執行權的那段時間裡,在doAcquireInterruptibly方法的for循環中,改變了一些引用關系。
它會判斷自己是否可以被挂起,此時它的前驅被取消了waitStatus=1,明顯不能,是以會繼續向前尋找有效的前驅,具體的過程在前面的“acquire- acquireQueued”部分有詳解,最終C被挂起之後的結構如下:
可以看到C最終和head結點直接連結了起來,但是此時被取消的B由于具有prev引用,是以還沒有被GC,不要急,這是因為還沒到指定結構,到了就自然會被GC了。
如果此時線程A的資源使用完畢,那麼首先釋放鎖,然後會嘗試喚醒一個沒有取消的後繼線程,明顯選擇C。
如果在A釋放鎖之後,調用LockSupport.unpark方法喚醒C之前,C被先一步因中斷而喚醒了。此時C抛出異常,不會再去獲得鎖,而是去finally執行cancelAcquire方法去了,此時還是屬于node不是尾結點,node是head的後繼的情況,如下圖:
那麼在C執行完cancelAcquire方法之後的結構如下:
如果此時線程A又擷取到了CPU的執行權,執行LockSupport.unpark,但此時結點C因為被中斷而取消,其内部記錄的線程變量變成了null,LockSupport.unpark(null),将會什麼也不做。那麼這時隊列豈不是失活了?其實并沒有!
此時,cancelAcquire方法中的“node不是尾結點,node是head的後繼”這種情況下的unparkSuccessor方法就非常關鍵了。該方法用于喚醒被取消結點C的一個沒被取消的後繼結點F,讓其嘗試擷取鎖,這樣就能保證隊列不失活。
F被喚醒之後,會判斷是否能夠休眠,明顯不能,因為前驅node的狀态為1,此時經過循環中一系列方法的操作,會變成如下結構:
明顯結點F是head的直接後繼,可以擷取鎖。
在擷取鎖成功之後,F會将自己設定為新的head,此時又會改變一些引用關系,即将F與前驅結點的prev和next關系都移除:
setHead(node);
p.next = null; // help GC
引用關系改變之後的結構下:
可以看到,到這一步,才會真正的将哪些無效結點删除,被GC回收。那麼,需要真正删除一個結點需要有什麼條件?條件就是:如果某個結點擷取到了鎖,那麼該結點的前驅以及和該結點前驅相關的結點都将會被删除!
但是,在上面的分析中,我們認為隻要有節點引用關聯就不會被GC回收,然而實際上現代Java虛拟機采用可達性分析算法來分析垃圾,是以,在上面的隊列中,對于那些“分叉”,即那些被取消的、隻剩下prev引用的、最重要的是不能通過head和tail的引用鍊到達也沒有外部引用可達的節點,将會在可達性分析算法中被标記為垃圾并在一次GC中被直接回收!
比如此時F線程執行完了,下一個就是G,那麼G獲得鎖之後,F将會被删除,最終結構如下:
6 tryAcquireNanos獨占式逾時擷取鎖
獨占式逾時擷取鎖tryAcquireNanos模版方法可以被視作響應中斷擷取鎖acquireInterruptibly方法的“增強版”,支援中斷,支援逾時時間!
/**
* 獨占式逾時擷取鎖,支援中斷
*
* @param arg 參數
* @param nanosTimeout 逾時時間,納秒
* @return 是否擷取鎖成功
* @throws InterruptedException 如果被中斷,則抛出InterruptedException異常
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//如果目前線程被中斷,直接抛出異常
if (Thread.interrupted())
throw new InterruptedException();
//同樣調用tryAcquire嘗試擷取鎖,如果擷取成功則直接傳回true
//否則調用doAcquireNanos方法挂起指定一段時間,該短時間内擷取到了鎖則傳回true,逾時還未擷取到鎖則傳回false
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
6.1 doAcquireNanos獨占式逾時擷取鎖
doAcquireNanos(int arg,long nanosTimeout)方法在支援響應中斷的基礎上, 增加了逾時擷取的特性。
該方法在自旋過程中,當結點的前驅結點為頭結點時嘗試擷取鎖,如果擷取成功則從該方法傳回,這個過程和獨占式同步擷取的過程類似,但是在鎖擷取失敗的處理上有所不同。
如果目前線程擷取鎖失敗,則判斷是否逾時(nanosTimeout小于等于0表示已經逾時),如果沒有逾時,重新計算逾時間隔nanosTimeout,然後使目前線程等待nanosTimeout納秒(當已到設定的逾時時間,該線程會從LockSupport.parkNanos(Objectblocker,long nanos)方法傳回)。
如果nanosTimeout小于等于spinForTimeoutThreshold(1000納秒)時,将不會使該線程進行逾時等待,而是進入快速的自旋過程。原因在于,非常短的逾時等待無法做到十分精确,如果這時再進行逾時等待,相反會讓nanosTimeout的逾時從整體上表現得反而不精确。
是以,在逾時非常短的場景下,AQS會進入無條件的快速自旋而不是挂起線程。
static final long spinForTimeoutThreshold = 1000L;
/**
* 獨占式逾時擷取鎖
*
* @param arg 參數
* @param nanosTimeout 剩餘逾時時間,納秒
* @return true 成功 ;false 失敗
* @throws InterruptedException 如果被中斷,則抛出InterruptedException異常
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//擷取目前的納秒時間
long lastTime = System.nanoTime();
//同樣調用addWaiter将目前線程構造成結點加入到同步隊列尾部
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
/*和獨占式不可中斷方法acquireQueued一樣,循環擷取鎖*/
for (; ; ) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
/*這裡就是差別所在*/
//如果剩餘逾時時間小于0,則退出循環,傳回false,表示沒擷取到鎖
if (nanosTimeout <= 0)
return false;
//如果需要挂起 并且 剩餘nanosTimeout大于spinForTimeoutThreshold,即大于1000納秒
if (shouldParkAfterFailedAcquire(p, node)
&& nanosTimeout > spinForTimeoutThreshold)
//那麼調用LockSupport.parkNanos方法将目前線程挂起nanosTimeout
LockSupport.parkNanos(this, nanosTimeout);
//擷取目前納秒,走到這一步可能是線程中途被喚醒了
long now = System.nanoTime();
//計算 新的剩餘逾時時間:原剩餘逾時時間 - (目前時間now - 上一次計算時的時間lastTime)
nanosTimeout -= now - lastTime;
//lastIme指派為本次計算時的時間
lastTime = now;
//如果線程被中斷了,那麼直接抛出異常
if (Thread.interrupted())
throw new InterruptedException();
}
}
/*擷取到鎖、逾時時間到了、抛出異常都會執行finally代碼塊*/
finally {
/*如果擷取鎖失敗。可能就是線程被中斷了,那麼執行cancelAcquire方法取消該結點對鎖的請求,該線程結束
* 或者是逾時時間到了,那麼執行cancelAcquire方法取消該結點對鎖的請求,将傳回false
* */
if (failed)
cancelAcquire(node);
}
}
6.2 finally代碼塊
在doAcquireNanos方法中,具有一個finally代碼塊,那麼無論try中發生了什麼,finally代碼塊都會執行的。在tryAcquireNanos獨占式逾時擷取鎖的方法中,執行finally的隻有三種情況:
- 目前結點(線程)最終擷取到了鎖,此時會進入finally,而在擷取到鎖之後會設定failed = false。
- 在try中發生了異常,此時直接跳到finally中,這裡發生異常的情況可能在tryAcquire、predecessor方法中,更加有可能的原因是因為線程被中斷而抛出InterruptedException異常,然後直接進入finally代碼塊中,此時還沒有獲得鎖,failed=true!
- tryAcquire方法是我們自己實作的,據抛出什麼異常由我們來定,就算抛出異常一般也不會在doAcquireNanos中抛出,可能在最開始調用tryAcquire時就抛出了。
- predecessor方法中,會檢查如果前驅結點為null則抛出NullPointerException。但是注釋中又說這個檢查無代碼層面的意義,或許是這個異常永遠不會抛出?
- 根據doAcquireNanos邏輯,如果線程在挂起過程中被中斷,那麼将主動抛出InterruptedException異常,這也是被稱為“可中斷”的邏輯。
- 方法的逾時時間到了,目前線程還沒有擷取到鎖,那麼,将會跳出循環,直接進入finally代碼塊中,此時還沒有獲得鎖,failed=true!
finally代碼塊中的邏輯為:
- 如果failed = true,表示沒有擷取鎖而進行finally,可能發生了異常 或者 逾時時間到了。那麼執行cancelAcquire方法取消目前結點線程擷取鎖的請求,doAcquireNanos方法結束,抛出異常 或者傳回 false。
- 如果failed = false,表示已經擷取到了鎖,那麼實際上finally中什麼都不會執行,doAcquireNanos方法結束,傳回true。
7 獨占式擷取/釋放鎖總結
獨占式的擷取鎖和釋放鎖的方法中,我們需要重寫tryAcquire 和tryRelease 方法。
獨占式的擷取鎖和釋放鎖時,需要在tryAcquire方法中記錄到底是哪一個線程擷取了鎖。一般使用exclusiveOwnerThread字段(setExclusiveOwnerThread方法)記錄,在tryRelease 方法釋放鎖成功之後清楚該字段的值。
7.1 acquire/release流程圖
acquire流程:
release流程:
7.2 acquire一般流程
根據在上面的源碼,我們嘗試總結出acquire方法(獨占式擷取鎖)建構同步隊列的一般流程為。
首先第一個線程A調用lock方法,此時還沒有線程擷取鎖,那麼線程A在acquire的tryAcquire方法中即獲得了鎖,此時同步隊列還沒有初始化,head和tail都是null。
此時第二個線程B進來了,由于A已經擷取了鎖,此時該線程将會被構造成結點添加到隊列中,enq方法中,第一次循環時,由于tail為null,是以将會構造一個空結點作為同步隊列的頭結點和尾結點:
第二次循環時,該結點将會添加到結點尾部,tail指向該結點!
然後在acquireQueued方法中,假設結點自旋沒有獲得鎖,那麼在shouldParkAfterFailedAcquire方法中将會設定前驅結點的waitStatus=-1,然後該結點的線程B将會被挂起:
接下來,如果線程C也嘗試擷取鎖,假設沒有擷取到,那麼此時C也将會被挂起:
從這裡能夠看出來,一個結點的SIGNAL狀态(-1)是它的後繼子結點給它設定的,那多條線程情況下,最有可能的情況為:
到此acquire一般流程分析完畢!
7.3 release一般流程
根據在上面的源碼以上面的圖為基礎,我們嘗試總結出release方法(獨占式鎖釋放)的一般流程為:
假如線程A共享資源使用完畢,調用unlock方法,内部調用了release方法,此時先調用tryRelease 釋放鎖,釋放成功之後調用unparkSuccessor方法,設定head結點狀态為0,并喚醒head結點的沒有取消的後繼結點(waitStatus不大于0),這裡明顯是B線程結點。resize方法到這裡其實已經結束了,下面就是被喚醒結點的操作。
調用unpark喚醒線程B之後,線程B在parkAndCheckInterrupt方法中繼續執行,首先判斷中斷狀态,記錄是因為什麼原因被喚醒的,這裡不是因為中斷而被喚醒,是以傳回false,那麼acquireQueued的interrupted字段為false。
然後線程B在acquireQueued方法中繼續自旋,假設此時B擷取到了鎖,那麼調用setHead方法清除線程記錄,并将B結點設定為頭結點。這裡清除了結點内部的線程記錄也沒關系,因為在我們實作tryAcquire方法中一般會記錄是哪個線程擷取了鎖。
當最後一個阻塞結點被喚醒,并且線程E擷取鎖之後,同步隊列的結構如下:
當最後一個線程E共享資源使用完畢調用unlock時,在release中釋放鎖之後,再嘗試利用head喚醒後繼結點時,判斷此時head結點的waitStatus還是等于0,是以不會再調用unparkSuccessor方法。
到此release一般流程分析完畢!
如果有什麼不懂或者需要交流,可以留言。另外希望點贊、收藏、關注,我将不間斷更新各種Java學習部落格!