天天看點

并發程式設計實踐二:AbstractQueuedSynchronizerUnsafeAQS的運用同步狀态AQS的主體流程等待隊列阻塞機制 結束語

abstractqueuedsynchronizer,簡稱aqs,是java.util.concurrent包的synchronizer的基礎架構,其它的synchronizer(包括lock、semaphore、countdownlatch、futuretask等)都是以它作為基礎建構的,這篇文章我将對aqs的架構結構作出介紹,包括它對同步狀态的管理,功能流程,等待隊列的管理等,并涉及到一些實作的細節,但這裡不涉及原理性的東西,原理将放到後面具體的實作類中去講述。

這片文章采用從整體到局部的方式來講述,從總體架構一步一步細化,但并不涉及所有代碼,了解了這些後,你就可以自己閱讀源碼學習了。doug lea在論文《the java.util.concurrent synchronizer framework》中講述了aqs的設計理念,有興趣的可以看看,在這裡()你可以看到。

由于文章過長,這篇文章沒有涉及到conditionobject,我将在下一篇文章中講述。

好了,接下來我們就開始了,從unsafe開始。

首先對用到的unsafe類的方法做一下說明:

aqs根據synchronizer的共同特征提供了一套基礎架構,這些共同特征包括:一個或者多個acquire操作用于阻塞線程直到存在空閑鎖允許線程通過,一個或者多個release操作釋放一個或者多個鎖使一個或者多個線程喚醒。我們可以使用僞碼來表示這些操作,acqure操作可以表示為:

release的操作可以表示為:

下面通過一個實際的例子來看aqs是怎麼被應用的,這個是java.util.concurrent.semaphore的一個簡化版(隻包含非公平設定的semaphore的代碼),semaphore的作用是為某些資源的通路提供最大線程數目限制,你可以為它設定一個最大許可值permits,則permits個線程可以同時通過acquire調用,超出的将被阻塞,直到有線程釋放鎖(即調用release操作)。

semaphore的所有操作都是通過一個内部類來完成,這個内部類是aqs的子類,:

 1)為aqs設定了同步狀态(即鎖的數量);

 2)繼承了aqs的tryacquireshared方法,該方法用于為acquire操作的線程擷取鎖,并傳回剩下的鎖的數量,如果剩下的鎖為負數,則表示擷取鎖失敗,線程将被阻塞;

 3)繼承了aqs的tryreleaseshared方法,該方法用于為release操作的線程釋放鎖,并傳回釋放是否成功,true則表示成功,成功後會執行喚醒阻塞線程的操作。

下面看具體的代碼:

mysemaphore展示了aqs的運用:同步狀态的設定和管理、tryacquireshared和tryreleaseshared的實作,而mysemaphore的acquire和release操作都是通過使用aqs來實作的。當多個線程同時操作acquire和release時,aqs是怎麼保證操作的正确性的呢?通過對aqs的内部機制的學習,你就會知道了,下面我們從同步狀态的管理開始。

aqs中同步狀态的申明和提供的操作方法如下:

同步狀态是一個32位的整數,表示鎖的數量,之是以使用一個32位整數,主要是考慮到一般情況下鎖的數量不會需要那麼多(見《the java.util.concurrent synchronizer framework》),子類可以通過aqs提供的方法擷取和改變同步狀态的值,而在tryacquireshared和tryreleaseshared中将使用同步狀态來判斷線程是否該阻塞和喚醒。

aqs提供了兩種模式,獨占模式和共享模式,對應的方法如下:

我們可以把獨占模式看作共享模式的一個特例,即獨占模式是将鎖數量設定為1的共享模式。另外,aqs還提供了一組提供時間限制的方法:

這組就是在比上面的方法增加了一個時間的限制,當時間較短的情況下(小于等于1微秒)使用輪詢的方式,否則采用阻塞方式。

我對流程的講解将以共享模式為主,其它的有興趣可以檢視源代碼。從宏觀上來看,線程通過acquiresharedinterruptibly擷取鎖,操作完成後,并通過releaseshared來釋放鎖,我們先來看擷取鎖的操作:

線程操作完成後,釋放鎖的操作:

線程擷取鎖失敗後(通過調用acquiresharedinterruptibly),将進入等待隊列,然後進入阻塞狀态(經過一些判斷之後);當另一個線程釋放鎖後(通過調用releaseshared),将執行喚醒阻塞線程的操作;被喚醒的線程将把自己移出等待隊列,并執行一些其它操作。可以說,操作都是圍繞等待隊列來做的,下面我們就來看看等待隊列。

aqs中使用了一個fifo隊列來管理等待線程,而底層鎖的實作使用clh鎖的一個變種,clh鎖通常被用于自旋鎖,我們可以通過一個例子看了解一下clh鎖,可以幫助我們了解aqs的隊列算法:

clhlock的思想就是使用一個鎖隊列,後面的線程反複檢視前面線程的狀态,如果狀态為鎖定,則自旋等待,否則通過。

采用隊列的方式可以帶來性能上的好處,在現代的處理器架構中,每個處理器自身都有一個cache,用于存儲處理器感興趣的資料,處理器從cache中擷取資料的效率要遠遠高于從記憶體中擷取資料,處理器之間通過總線進行通信,而總線是獨占裝置,也就是說,每次隻能有一個處理器使用總線,基于這樣的架構,為了提高性能,我們應該盡量使用本地cache,盡量避免通過總線存取資料,或者盡量少的通過總線存取資料。采用隊列就可以帶來這樣的好處,每個線程修改自己的locked隻會影響後續的線程,而其它線程不會受到影響,這樣就隻會有一個線程會因為資料的改變而去記憶體中取資料,減少了資料競争,進而提高性能。

aqs的隊列算法也是基于這樣的理念,但實作要比clhlock複雜的多,為了盡量減少資料競争,每個節點的狀态隻和它後續的節點相關,等待隊列中的線程依次喚醒,減少擷取鎖的競争。下面我們就開始詳細講述。

aqs使用了一個非阻塞隊列來儲存資料(如果想更多的了解無阻塞隊列,可以參考我的“并發程式設計實踐一”),我們先來看隊列節點的定義:

nextwaiter表示了兩種不同的模式:

 1)共享模式(shared,允許多個線程通過);

 2)排它模式(exclusive,隻允許一個線程通過)。

我們的講解隻涉及到共享模式。

waitstatus表示了節點的狀态:

 1)cancelled:節點對應的線程已經被取消;

 2)signal:節點的下一個節點的線程等待被喚醒;

 3)condition:節點對應的線程等待在condition隊列中,在後面講condition的時候會涉及到;

 4)propagate:該節點不需要處理,直接越過。

下面我們看看算法的主要流程:

 1)入隊列:線程擷取鎖失敗後,建立一個節點,并将節點添加到等待隊列尾,然後将線程阻塞,等待喚醒;

 2)喚醒:另一個線程釋放鎖,取隊列的第一個節點,将節點對應線程喚醒;

 3)出隊列:喚醒後的線程将嘗試擷取鎖,成功後将自己移出隊列,同時判斷是否任然存在空閑的鎖,如果存在則繼續喚醒下一個節點。

每次隻會喚醒第一個節點,如果同時釋放多個鎖,後續的節點将由前面被喚醒的節點來喚醒,盡量減少資料競争。

下面我們來看具體的代碼。從總體流程中,線程通過acquiresharedinterruptibly請求鎖,當嘗試擷取鎖(tryacquireshared)失敗後,将進入doacquiresharedinterruptibly處理:

入隊列的操作是由addwaiter來完成的,它首先嘗試入隊列一次,失敗後再在enq中循環嘗試,直到成功:

節點進入等待隊列後,如果節點的前續節點不是head,線程将在parkandcheckinterrupt中進入阻塞狀态。

線程阻塞直到另一個線程調用releaseshared釋放鎖,然後在doreleaseshared中将把等待隊列中的第一個節點喚醒:

線程被喚醒後,将重新嘗試擷取鎖,成功後,将開始将自己移出隊列,移出隊列的代碼在setheadandpropagate中:

這裡執行喚醒操作和release時執行的操作一緻,由于多個線程同時調用release操作的情況下,雖然釋放了多個鎖,但可能隻會執行一次doreleaseshared的操作,這裡就做了彌補,在隊列的第一個線程被喚醒,擷取鎖後,将再次調用doreleaseshared喚醒下一個線程,一直往下,直到鎖全部用完或者隊列為空。這樣就能做到線程一個一個喚醒,依次擷取鎖,盡量減少了資料競争(如果有新增線程請求鎖,任然會存在資料競争,但這裡減少了已阻塞線程的資料競争)。

aqs的阻塞操作使用locksupport類,最終使用unsafe的park和unpark操作實作,如下:

aqs向我們充分展現了并發程式設計的複雜性,在多個線程的互動下,情況将變得非常複雜,你往往需要将整個流程作為一個整體來分析,是以建議和源代碼結合來看這篇文章,充分考慮每一個互動的環節可能出現的問題。

這是我自己對代碼的分析結果,給你作為學習參考,可能存在錯誤,發現問題麻煩給我留言,我将非常感謝。

謝謝。