共享模式acquire實作流程
上文我們講解了AbstractQueuedSynchronizer獨占模式的acquire實作流程,本文趁熱打鐵繼續看一下AbstractQueuedSynchronizer共享模式acquire的實作流程。連續兩篇文章的學習,也可以對比獨占模式acquire和共享模式acquire的差別,加深對于AbstractQueuedSynchronizer的了解。
先看一下共享模式acquire的實作,方法為acquireShared和acquireSharedInterruptibly,兩者差别不大,差別就在于後者有中斷處理,以acquireShared為例:
1 2 3 4 | |
這裡就能看出第一個差别來了:獨占模式acquire的時候子類重寫的方法tryAcquire傳回的是boolean,即是否tryAcquire成功;共享模式acquire的時候,傳回的是一個int型變量,判斷是否<0。doAcquireShared方法的實作為:
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | |
我們來分析一下這段代碼做了什麼:
- addWaiter,把所有tryAcquireShared<0的線程執行個體化出一個Node,建構為一個FIFO隊列,這和獨占鎖是一樣的
- 拿目前節點的前驅節點,隻有前驅節點是head的節點才能tryAcquireShared,這和獨占鎖也是一樣的
- 前驅節點不是head的,執行”shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()”,for(;;)循環,”shouldParkAfterFailedAcquire()”方法執行2次,目前線程阻塞,這和獨占鎖也是一樣的
确實,共享模式下的acquire和獨占模式下的acquire大部分邏輯差不多,最大的差别在于tryAcquireShared成功之後,獨占模式的acquire是直接将目前節點設定為head節點即可,共享模式會執行setHeadAndPropagate方法,顧名思義,即在設定head之後多執行了一步propagate操作。setHeadAndPropagate方法源碼為:
|
第3行的代碼設定重設head,第2行的代碼由于第3行的代碼要重設head,是以先定義一個Node型變量h獲得原head的位址,這兩行代碼很簡單。
第19行~第23行的代碼是獨占鎖和共享鎖最不一樣的一個地方,我們再看獨占鎖acquireQueued的代碼:
|
這意味着獨占鎖某個節點被喚醒之後,它隻需要将這個節點設定成head就完事了,而共享鎖不一樣,某個節點被設定為head之後,如果它的後繼節點是SHARED狀态的,那麼将繼續通過doReleaseShared方法嘗試往後喚醒節點,實作了共享狀态的向後傳播。
共享模式release實作流程
上面講了共享模式下acquire是如何實作的,下面再看一下release的實作流程,方法為releaseShared:
|
tryReleaseShared方法是子類實作的,如果tryReleaseShared成功,那麼執行doReleaseShared()方法:
28 29 | |
主要是兩層邏輯:
- 頭結點本身的waitStatus是SIGNAL且能通過CAS算法将頭結點的waitStatus從SIGNAL設定為0,喚醒頭結點的後繼節點
- 頭結點本身的waitStatus是0的話,嘗試将其設定為PROPAGATE狀态的,意味着共享狀态可以向後傳播
Condition的await()方法實作原理—-建構等待隊列
我們知道,Condition是用于實作通知/等待機制的,和Object的wait()/notify()一樣,由于本文之前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很長,加之Condition也是AbstractQueuedSynchronizer的一部分,是以将Condition也放在這裡寫了。
Condition分為await()和signal()兩部分,前者用于等待、後者用于喚醒,首先看一下await()是如何實作的。Condition本身是一個接口,其在AbstractQueuedSynchronizer中的實作為ConditionObject:
|
這裡貼了一些字段定義,後面都是方法就不貼了,會對重點方法進行分析的。從字段定義我們可以看到,ConditionObject全局性地記錄了第一個等待的節點與最後一個等待的節點。
像ReentrantLock每次要使用ConditionObject,直接new一個ConditionObject出來即可。我們關注一下await()方法的實作:
|
第2行~第3行的代碼用于進行中斷,第4行代碼比較關鍵,添加Condition的等待者,看一下實作:
|
首先拿到隊列(注意資料結構,Condition建構出來的也是一個隊列)中最後一個等待者,緊接着第4行的的判斷,判斷最後一個等待者的waitStatus不是CONDITION的話,執行第5行的代碼,解綁取消的等待者,因為通過第8行的代碼,我們看到,new出來的Node的狀态都是CONDITION的。
那麼unlinkCancelledWaiters做了什麼?裡面的流程就不看了,就是一些指針周遊并判斷狀态的操作,總結一下就是:從頭到尾周遊每一個Node,遇到Node的waitStatus不是CONDITION的就從隊列中踢掉,該節點的前後節點相連。
接着第8行的代碼前面說過了,new出來了一個Node,存儲了目前線程,waitStatus是CONDITION,接着第9行~第13行的操作很好了解:
- 如果lastWaiter是null,說明FIFO隊列中沒有任何Node,firstWaiter=Node
- 如果lastWaiter不是null,說明FIFO隊列中有Node,原lastWaiter的next指向Node
- 無論如何,新加入的Node程式設計lastWaiter,即新加入的Node一定是在最後面
用一張圖表示一下建構的資料結構就是:
對比學習,我們總結一下Condition建構出來的隊列和AbstractQueuedSynchronizer建構出來的隊列的差别,主要展現在2點上:
- AbstractQueuedSynchronizer建構出來的隊列,頭節點是一個沒有Thread的空節點,其辨別作用,而Condition建構出來的隊列,頭節點就是真正等待的節點
- AbstractQueuedSynchronizer建構出來的隊列,節點之間有next與pred互相辨別該節點的前一個節點與後一個節點的位址,而Condition建構出來的隊列,隻使用了nextWaiter辨別下一個等待節點的位址
整個過程中,我們看到沒有使用任何CAS操作,firstWaiter和lastWaiter也沒有用volatile修飾,其實原因很簡單:要await()必然要先lock(),既然lock()了就表示沒有競争,沒有競争自然也沒必要使用volatile+CAS的機制去保證什麼。
Condition的await()方法實作原理—-線程等待
前面我們看了Condition建構等待隊列的過程,接下來我們看一下等待的過程,await()方法的代碼比較短,再貼一下:
|
建構完畢隊列之後,執行第5行的fullyRelease方法,顧名思義:fullyRelease方法的作用是完全釋放Node的狀态。方法實作為:
|
這裡第4行擷取state,第5行release的時候将整個state傳過去,理由是某線程可能多次調用了lock()方法,比如調用了10次lock,那麼此線程就将state加到了10,是以這裡要将10傳過去,将狀态全部釋放,這樣後面的線程才能重新從state=0開始競争鎖,這也是方法被命名為fullyRelease的原因,因為要完全釋放鎖,釋放鎖之後,如果有競争鎖的線程,那麼就喚醒第一個,這都是release方法的邏輯了,前面的文章詳細講解過。
接着看await()方法的第7行判斷”while(!isOnSyncQueue(node))”:
|
注意這裡的判斷是Node是否在AbstractQueuedSynchronizer建構的隊列中而不是Node是否在Condition建構的隊列中,如果Node不在AbstractQueuedSynchronizer建構的隊列中,那麼調用LockSupport的park方法阻塞。
至此調用await()方法的線程建構Condition等待隊列–釋放鎖–等待的過程已經全部分析完畢。
Condition的signal()實作原理
上面的代碼分析了建構Condition等待隊列–釋放鎖–等待的過程,接着看一下signal()方法通知是如何實作的:
|
首先從第2行的代碼我們看到,要能signal(),目前線程必須持有獨占鎖,否則抛出異常IllegalMonitorStateException。
那麼真正操作的時候,擷取第一個waiter,如果有waiter,調用doSignal方法:
|
第3行~第5行的代碼很好了解:
- 重新設定firstWaiter,指向第一個waiter的nextWaiter
- 如果第一個waiter的nextWaiter為null,說明目前隊列中隻有一個waiter,lastWaiter置空
- 因為firstWaiter是要被signal的,是以它沒什麼用了,nextWaiter置空
接着執行第6行和第7行的代碼,這裡重點就是第6行的transferForSignal方法:
|
方法本意是将一個節點從Condition隊列轉換為AbstractQueuedSynchronizer隊列,總結一下方法的實作:
- 嘗試将Node的waitStatus從CONDITION置為0,這一步失敗直接傳回false
- 目前節點進入調用enq方法進入AbstractQueuedSynchronizer隊列
- 目前節點通過CAS機制将waitStatus置為SIGNAL
最後上面的步驟全部成功,傳回true,傳回true喚醒等待節點成功。從喚醒的代碼我們可以得出一個重要結論:某個await()的節點被喚醒之後并不意味着它後面的代碼會立即執行,它會被加入到AbstractQueuedSynchronizer隊列的尾部,隻有前面等待的節點擷取鎖全部完畢才能輪到它。
代碼分析到這裡,我想類似的signalAll方法也沒有必要再分析了,顯然signalAll方法的作用就是将所有Condition隊列中等待的節點逐一隊列中從移除,由CONDITION狀态變為SIGNAL狀态并加入AbstractQueuedSynchronizer隊列的尾部。
代碼示例
可能大家看了我分析半天代碼會有點迷糊,這裡最後我貼一段我用于驗證上面Condition結論的示例代碼,首先建立一個Thread,我将之命名為ConditionThread:
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 | |
這個類裡面的方法就不解釋了,反正就三個方法片段,根據線程名判斷,每個線層執行的是其中的一個代碼片段。寫一段測試代碼:
|
測試代碼的意思是:
- 線程1先啟動,擷取鎖,調用await()方法等待
- 線程0後啟動,擷取鎖,休眠5秒準備signal()
- 線程2最後啟動,擷取鎖,由于線程0未使用完畢鎖,是以線程2排隊,可以此時由于線程0還未signal(),是以線程1線上程0執行signal()後,在AbstractQueuedSynchronizer隊列中的順序是線上程2後面的
代碼執行結果為:
|
符合我們的結論:signal()并不意味着被喚醒的線程立即執行。由于線程2先于線程0排隊,是以看到第5行列印的内容,線程2先擷取鎖。
熬夜不易,點選請老王喝杯烈酒!!!!!!!