天天看點

全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量

全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量

AbstractQueuedSynchronizer(AQS)源碼解析(三)——條件變量

前兩期我們已經掌握了AQS的基本結構、以及AQS是如何釋放和擷取資源的。其實到這裡,我們已經掌握了AQS作為同步器的全部功能 不過,有些情況使用同步功能不夠靈活,是以AQS又引入了作業系統中的另一個高度相關的概念——條件變量。由于條件變量的使用緊密依賴于AQS提供的釋放、擷取資源功能和同步隊列,是以都放在了AQS源碼中 能堅持看到這裡的同學已經很不容易了,再接再厲,一起沖掉最後一座堡壘吧🦾🦾🦾

條件對象這一概念源自于作業系統,設計它是為了解決等待同步需求,實作線程間協作通信的一種機制。Java其實也已經内置了條件變量,它和螢幕鎖是綁定在一起的,即<code>Object</code>的<code>wait</code>和<code>notify</code>方法,使用這兩個方法就可以實作線程之間的協作

Java中的條件變量直到Java 5才出現,用它來代替傳統的<code>Object</code>的<code>wait</code>和<code>notify</code>方法。相比<code>wait</code>和<code>notify</code>,<code>Condition</code>的<code>await</code>和<code>signal</code>方法更加安全和高效,因為<code>Condition</code>是基于AQS實作的,加鎖、釋放鎖的效率更高

條件變量顧名思義就是表示某種條件的變量。不過需要說明的是,條件變量中的條件并沒有實際含義,僅僅隻是一個标記,條件的含義需要代碼來賦予

<code>Condition</code>是一個接口,條件變量都實作了<code>Condition</code>接口。該接口的基本方法是<code>await</code>、<code>signal</code>、<code>signalAll</code>這些

不過<code>Condition</code>依賴于<code>Lock</code>接口,需要借助<code>Lock.newCondition</code>方法來建立條件變量。是以<code>Condition</code>必然是和某個<code>Lock</code>綁定在一起的,這就和<code>await</code>和<code>signal</code>一定會和<code>Object</code>的螢幕鎖綁定在一起。是以,Java中的條件變量必須配合鎖使用

<code>Condition</code>和Java内置的條件變量方法之間的對應關系如下:

<code>Condition.await</code> 等價于 <code>Object.wait</code>

<code>Condition.signal</code> 等價于 <code>Object.notify</code>

不多BB了,直接看源碼吧~

AQS為條件變量的實作提供了95%以上的功能,<code>Lock</code>接口實作類一般隻需要實作一下<code>newCondition</code>方法,以及AQS的<code>isHeldExclusively</code>方法,就可以直接使用條件變量了,你說方不友善!

AQS實作的方式就是直接提供了<code>Condition</code>的一個實作類——<code>ConditionObject</code>,我接下來就将其稱為條件對象(可能不太嚴謹)。<code>ConditionObject</code>是AQS的内部類,可見性為<code>public</code>

每個<code>ConditionObject</code>都維護了一個條件隊列,首尾節點分别由<code>firstWaiter</code>、<code>lastWaiter</code>兩個域來管理:

每個在條件隊列中等待的線程都由<code>Node</code>類來維護,它們之間通過<code>Node</code>類的<code>nextWaiter</code>相連,形成一個單向連結清單。每個<code>Node</code>的<code>waitStatus</code>都是<code>Node.CONDITION</code>,表明該線程正在某個條件變量的條件隊列中等待ing~

十分的樸實無華且簡單通透,要是所有代碼都這麼簡單,那該有多好啊~

正如前面所說,如果基于AQS實作的<code>Lock</code>接口實作類想要使用AQS提供的條件變量,隻需要實作<code>newCondition</code>方法、<code>isHeldExclusively</code>方法即可。而實作<code>newCondition</code>方法一般隻需要直接調用<code>ConditionObject</code>的構造方法即可,多麼簡單!

<code>isHeldExclusively</code>方法會在條件喚醒(<code>signal</code>、<code>signalAll</code>)中被調用,因為隻有在持有互斥資源的情況下,才允許使用條件變量,在持有共享資源的情況下是不允許使用的!

接下來主要剖析一下<code>ConditionObject</code>是如何實作<code>Condition</code>接口的兩大重要功能——條件等待、條件喚醒

作者:酒冽        出處:https://www.cnblogs.com/frankiedyz/p/15676704.html

版權:本文版權歸作者和部落格園共有

轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接配接;否則必究法律責任

<code>await</code>方法就是最基本的條件等待,它是可以被中斷的。源碼如下:

<code>addConditionWaiter</code>方法會添加一個新節點到等待隊列的隊尾,該節點的<code>waitStatus</code>是<code>CONDITION</code>。源碼如下:

<code>unlinkCancelledWaiters</code>會将所有被取消的<code>Node</code>從條件隊列中移除,該方法隻會在持有鎖的情況下才會被調用,源碼如下:

<code>unlinkCancelledWaiters</code>相當于是進行了一次正向周遊,将那些<code>waitStatus</code>不為<code>CONDITION</code>(即為<code>CANCELLED</code>)的<code>Node</code>給移除,挺簡單的~

回到<code>await</code>方法,接下來調用<code>fullyRelease</code>方法儲存目前的<code>state</code>,并調用一次<code>release</code>,最後傳回儲存的<code>state</code>。源碼如下:

調用<code>release</code>就是為了完全釋放目前線程所持有的資源,防止死鎖。如果釋放失敗,說明目前線程壓根沒有擷取到資源就調用了<code>await</code>方法,這是不允許的,會抛出<code>IllegalMonitorStateException</code>異常。這也解決了我之前的疑惑——如何保證調用<code>unlinkCancelledWaiters</code>之前必須持有鎖呢? 這裡就給出了答案!隻有在目前線程持有資源的前提下,代碼才能正常執行下去

回到<code>await</code>方法,接下來調用<code>isOnSyncQueue</code>方法來檢查目前線程對應的<code>Node</code>在同步隊列上還是條件隊列上,源碼如下:

該方法會先看目标<code>node</code>的<code>waitStatus</code>是否為<code>CONDITION</code>,或者它的<code>prev</code>域是否為空,如果是,那麼它一定在條件隊列上,傳回false;如果目标<code>node</code>的<code>next</code>域不為空,那麼它一定在同步隊列上,傳回true。如果以上兩種快捷判斷方法無法判定,那麼需要調用<code>findNodeFromTail</code>進行暴力判定。總之,如果目标node在條件隊列上等待,則傳回true,否則說明在同步隊列上,傳回false

<code>await</code>中的<code>while(!isOnSyncQueue)</code>語義:如果目前節點所等待的條件沒有滿足,那麼說明它還在條件隊列上等待,就會一直被困在while循環中不停地被阻塞,不能離開while循環去競争擷取資源

<code>findNodeFromTail</code>方法會從<code>tail</code>開始向前周遊,以确定目标<code>node</code>是否位于同步隊列上,很簡單哦~其源碼如下:

回到<code>await</code>方法,隻有當<code>node</code>在條件隊列上等待,才會執行<code>while</code>循環。在循環中,将目前線程(即<code>node</code>對應的線程)阻塞。如果被喚醒,就調用<code>checkInterruptWhileWaiting</code>方法檢查是否在阻塞過程中被中斷,如果發生了中斷,該方法會傳回非0值。<code>while</code>循環中如果發現被中斷,則退出循環

<code>checkInterruptWhileWaiting</code>方法即其調用到的<code>transferAfterCancelledWait</code>方法,它們的源碼如下:

總之,如果線程在阻塞過程中沒有發生中斷,則<code>checkInterruptWhileWaiting</code>傳回0,否則傳回非0值

具體來說,如果發生中斷,就會調用<code>transferAfterCancelledWait</code>方法将<code>node</code>轉移到同步隊列,并檢測中斷是發生在被<code>signal</code>之前還是之後。如果發生在被<code>signal</code>之前則傳回THROW_IE(-1),如果發生在被<code>signal</code>之後則傳回REINTERRUPT(1)

回到<code>await</code>方法的<code>while</code>循環中,如果沒有被<code>signal</code>或被中斷就會一直阻塞。如果被<code>signal</code>就不滿足<code>while</code>條件,會退出循環;如果被中斷而不是被<code>signal</code>,會直接<code>break</code>離開<code>while</code>循環

退出<code>while</code>循環說明此時已經位于同步隊列(要麼因為被<code>signal</code>而transfer到同步隊列,要麼因為被中斷而調用<code>transferAfterCancelledWait</code>方法移動到同步隊列),可以直接調用<code>acquireQueued</code>方法(調用該方法的前提:位于同步隊列中)排隊擷取鎖。擷取雖然可能導緻再次被阻塞,但這裡是在同步隊列上的阻塞,而不是在條件隊列上的阻塞

退出處理是通過<code>reportInterruptAfterWait</code>方法,具體如何處理,取決于<code>interruptMode</code>的值。源碼如下:

如果<code>interruptMode</code>是THROW_IE,那麼說明該線程并未真正收到<code>signal</code>信号,隻是因為被中斷而被喚醒,所等待的條件并不一定被滿足,于是抛出中斷異常;如果是<code>REINTERRUPT</code>,調用<code>selfInterrupt</code>方法設定線程的中斷狀态

總的來說,<code>await</code>執行分為6步:

1、如果目前線程中斷,那麼抛出中斷異常

2、為目前線程建立<code>Node</code>,并加入該條件變量的條件隊列隊尾

3、擷取并儲存下目前<code>state</code>,并使用儲存的<code>state</code>來調用<code>release</code>方法,如果失敗則抛出<code>IllegalMonitorStateException</code>異常

4、阻塞直到被<code>signal</code>或被中斷

5、利用儲存的<code>state</code>,調用<code>acquireQueued</code>方法重新擷取狀态

6、如果第四步期間被中斷,則抛出中斷異常

<code>await</code>方法有一個重載版本,可以設定逾時參數,如果逾時也會導緻等待停止。其源碼如下:

帶**逾時參數**的`await`和無參數`await`基本差不多,執行一共分6步,不過最後一步會**傳回是否逾時**:

2、為目前線程建立`Node`,并加入該條件變量的條件隊列隊尾

3、擷取并儲存下目前`state`,并使用儲存的`state`來調用`release`方法,如果失敗則抛出`IllegalMonitorStateException`異常

4、阻塞直到被`signal`或被中斷,或**逾時**

5、利用儲存的`state`,調用`acquireQueued`方法重新擷取狀态

6、如果第四步期間被中斷,則抛出中斷異常;如果第四步是**因為逾時而停止阻塞**,則傳回false,否則傳回true

調用<code>awaitUninterruptibly</code>方法在等待過程中,不會因為中斷而停止等待。源碼如下:

`awaitUninterruptibly`執行分4步:

1、為目前線程建立`Node`,并加入該條件變量的條件隊列隊尾

2、擷取并儲存下目前`state`,并使用儲存的`state`來調用`release`方法,如果失敗則抛出`IllegalMonitorStateException`異常

3、阻塞直到被`signal`

4、利用儲存的`state`,調用`acquireQueued`方法重新擷取狀态

<code>awaitNanos</code>最多等待一定時間,可以被中斷。其源碼如下:

`awaitNanos`執行分6步:

4、阻塞直到被`signal`或被中斷或**逾時**

6、如果第四步期間被中斷,則抛出中斷異常;最後**傳回距離逾時還剩多少納秒**

類似于<code>awaitNanos</code>,會傳回是否是因為逾時而終止等待。源碼如下:

`awaitUntil`和`awaitNanos`基本差不多,執行一共分6步,不過最後一步會傳回是否逾時:

4、阻塞直到被`signal`或被中斷或逾時

6、如果第四步期間被中斷,則抛出中斷異常;如果第四步是因為逾時而停止阻塞,則傳回false,否則傳回true

<code>signal</code>用于喚醒某個等待在條件隊列的線程。源碼如下:

首先調用<code>isHeldExclusively</code>方法來判斷目前線程是否持有互斥資源(<code>isHeldExclusively</code>方法隻會在<code>signal</code>或<code>signalAll</code>中才會被調用),如果沒有則抛出<code>IllegalMonitorStateException</code>異常。接下來調用<code>doSignal</code>方法将等待最久(隊首)的線程從條件隊列轉移到同步隊列

這裡說明一下,隻有當線程持有互斥資源時,才支援使用條件變量。關于這點,可以參考全網最詳細的ReentrantReadWriteLock源碼剖析(萬字長文) 其中,寫鎖支援條件變量,而讀鎖不支援。因為寫鎖是互斥資源,而讀鎖是共享資源。原因也可以參見這篇部落格裡的解析,這裡不再贅述

<code>doSignal</code>方法會從目标<code>node</code>(一般都是隊首)開始,将遇到的第一個未被取消的線程從條件隊列移除,并調用<code>transferForSignal</code>方法将其放到同步隊列隊尾去等待擷取資源。源碼如下:

transferForSignal方法不僅會判斷目标node是否被取消,也會将目标node從條件隊列移動到同步隊列。源碼如下:

總之,`signal`方法會将最早的(非取消)線程移動到同步隊列去排隊擷取資源

<code>signalAll</code>方法會将條件隊列中所有的線程都移動到同步隊列,讓它們去擷取資源。其源碼如下:

該方法會調用<code>doSignalAll</code>方法将條件隊列中所有的線程都移除,并放入同步隊列。源碼如下:

總之,`signalAll`方法會将條件隊列中所有的線程都移動到同步隊列去排隊擷取資源

使用上面的非可重入鎖<code>NonReentrantLock</code>的條件變量來實作簡單的生産者-消費者模型(單生産者、單消費者),實作代碼如下:

點選檢視代碼

好了,AQS系列共三期,到此為止,Bye~

全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎

全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(二)資源的擷取和釋放

全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量

願歸來仍是少年!

    作者:酒冽

    出處:https://www.cnblogs.com/frankiedyz/p/15676704.html

    版權:本文版權歸作者和部落格園共有

    轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接配接;否則必究法律責任