天天看點

Semaphore信号量源碼解析

Semaphore信号量源碼解析

對JUC包中的并發工具——Semaphore,也稱信号量,進行源碼剖析

<code>Semaphore</code>可以稱為信号量,這個原本是作業系統中的概念,是一種線程同步方法,配合PV操作實作線程之間的同步功能。信号量可以表示作業系統中某種資源的個數,是以可以用來控制同時通路該資源的最大線程數,以保證資源的合理使用

Java的JUC對<code>Semaphore</code>做了具體的實作,其功能和作業系統中的信号量基本一緻,也是一種線程同步并發工具

<code>Semaphore</code>常用于某些有限資源的并發使用場景,即限流

場景一

資料庫連接配接池對于同時連接配接的線程數有限制,當連接配接數達到限制後,接下來的線程必須等待前面的線程釋放連接配接才可以獲得資料庫連接配接

場景二

醫院叫号,放出的号數是固定的,不然醫院視窗來不及處理。是以隻有取到号才能去門診,沒取到号的隻能在外面等待放号

<code>Semaphore(int permits)</code>:建立一個許可數為<code>permits</code>的<code>Semaphore</code>

<code>Semaphore(int permits, boolean fair)</code>:建立一個許可數為<code>permits</code>的<code>Semaphore</code>,可以選擇公平模式或非公平模式

<code>void acquire() throws InterruptedException</code>:擷取一個許可,響應中斷,擷取不到則阻塞等待

<code>void acquire(int permits) throws InterruptedException</code>:擷取指定數量的許可,響應中斷,擷取不到則阻塞等待

<code>void acquireUninterruptibly()</code>:擷取一個許可,忽略中斷,擷取不到則阻塞等待

<code>void acquireUninterruptibly(int permits)</code>:擷取指定數量的許可,忽略中斷,擷取不到則阻塞等待

<code>int drainPermits()</code>:擷取目前所有可用的許可,并傳回獲得的許可數

<code>void release()</code>:釋放一個許可

<code>void release(int permits)</code>:釋放指定數量的許可

<code>boolean tryAcquire()</code>:嘗試擷取一個許可,如果擷取失敗不會被阻塞,而是傳回false。成功則傳回true

<code>boolean tryAcquire(int permits)</code>:嘗試擷取指定數量的許可,如果擷取失敗不會被阻塞,而是傳回false。成功則傳回true

<code>boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException</code>:嘗試擷取一個許可,如果擷取失敗則會等待指定時間,如果逾時還未獲得,則傳回false。擷取成功則傳回true。在等待過程中如果被中斷,則會立即抛出中斷異常

<code>boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException</code>:嘗試擷取指定數量的許可,如果擷取失敗則會等待指定時間,如果逾時還未獲得,則傳回false。擷取成功則傳回true。在等待過程中如果被中斷,則會立即抛出中斷異常

<code>boolean isFair()</code>:判斷信号量是否是公平模式

<code>int availablePermits()</code>:傳回目前可用的許可數

<code>boolean hasQueuedThreads()</code>:判斷是否有線程正在等待擷取許可

<code>int getQueueLength()</code>:傳回目前等待擷取許可的線程的估計值。該值并不是一個确定值,因為在執行該函數時,線程數可能已經發生了變化

針對“使用場景”的場景二,假設醫院最多接待2個人,如果接待滿了,那麼所有人都必須在大廳等待(不能“忙等”)

代碼如下:

可能的指令行輸出如下:

點選檢視代碼

無論如何,醫院内最多有5個病人同時接診。這也說明了<code>Semaphore</code>可以控制某種資源最多同時被指定數量的線程使用

作者:酒冽        出處:https://www.cnblogs.com/frankiedyz/p/15674098.html

版權:本文版權歸作者和部落格園共有

轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接配接;否則必究法律責任

<code>Semaphore</code>有兩個構造函數,都是需要設定許可總數,額外的另一個參數是用來控制公平模式or非公平模式的,如果不設定(預設)或設為false,則是非公平模式。如果設定true,則是公平模式

兩種構造函數的源碼如下:

公平模式和非公平模式的差別,就是在于<code>sync</code>域是<code>FairSync</code>類還是<code>NonfairSync</code>類,這兩種子類分别對應這兩種模式。而<code>permits</code>參數代表許可的個數,作為這兩個類的構造函數參數傳入,源碼如下:

這兩個類的構造函數都是進一步調用父類<code>Sync</code>的構造函數:

從這裡就可以明白,許可個數就是<code>state</code>的值。這裡為AQS的<code>state</code>域賦了初值,為<code>permits</code>

重要結論:在<code>Semaphore</code>的語境中,AQS的<code>state</code>就表示許可的個數!對于<code>Semaphore</code>的任何擷取、釋放許可操作,本質上都是<code>state</code>的增減操作

<code>acquire</code>是響應中斷的擷取許可方法,有兩個重載版本。如果擷取成功,則傳回;如果許可不夠而導緻擷取失敗,則會進入AQS的同步隊列阻塞等待。在整個過程中如果被中斷,則會抛出中斷異常

首先來看看可以擷取指定數量許可的<code>acquire</code>方法,其源碼如下:

該方法會先檢查<code>permits</code>是否合法(非負數),再将後續的執行過程委托給<code>Sync</code>類的父類AQS的<code>acquireSharedInterruptibly</code>方法來執行,其源碼如下:

該方法響應中斷,首先檢查中斷狀态,如果已經被中斷則會抛出中斷異常。接下來調用鈎子方法<code>tryAcquireShared</code>。如果傳回值小于0,說明擷取許可失敗,會調用<code>doAcquireSharedInterruptibly</code>進入同步隊列阻塞等待,等待過程中響應中斷

<code>tryAcquireShared</code>由<code>Sync</code>的兩個子類實作,分别對應公平模式、非公平模式下的擷取。這裡由于許可是共享資源,是以使用到的AQS的鈎子方法都是針對共享資源的擷取、釋放的。這也很合理,因為許可是可以被多個線程同時持有的,是以<code>Semaphore</code>中的許可是一種共享資源!

接下來分别看一看公平模式和非公平模式下,<code>tryAcquireShared</code>的實作方式是怎樣的

公平模式

<code>FairSync</code>類實作的<code>tryAcquireShared</code>方法如下:

總體上是一個<code>for</code>循環,這是為了應對CAS失敗的情況。首先判斷是否有線程在等待擷取許可,如果有就選擇謙讓,這裡展現了公平性

接下來判斷擷取之後剩餘的許可數是否合法,如果小于0,說明沒有足夠的許可,擷取失敗,傳回值小于0;如果大于0且CAS修改<code>state</code>成功,說明擷取許可成功,傳回剩餘的許可數

這裡需要說明一下<code>tryAcquireShared</code>的傳回值含義,這個其實在《全網最詳細的ReentrantReadWriteLock源碼剖析(萬字長文)》也有講解過: 負數:擷取失敗,線程會進入同步隊列阻塞等待 0:擷取成功,但是後續以共享模式擷取的線程都不可能擷取成功 正數:擷取成功,且後續以共享模式擷取的線程也可能擷取成功

非公平模式

<code>NonfairSync</code>類實作的<code>tryAcquireShared</code>方法如下:

實際上委托給了父類<code>Sync</code>的<code>nonfairTryAcquireShared</code>方法來執行:

該方法是<code>Sync</code>類中的一個<code>final</code>方法,禁止子類重寫。邏輯上和公平模式的<code>tryAcquireShared</code>基本一緻,隻是沒有調用<code>hasQueuedPredecessors</code>,即使有其他線程在等待擷取許可,也不會謙讓,而是直接CAS競争。這裡展現了非公平性

這個不帶參數的<code>acquire</code>方法,預設擷取一個許可:

和<code>acquire(int)</code>基本上是一樣的,隻是這裡擷取數固定為1。在公平模式和非公平模式下的擷取方式也不相同。這裡不再解釋

<code>acquireUninterruptibly</code>是忽略中斷的擷取許可方法,也有兩個重載版本。如果擷取成功,則傳回;如果許可不夠而導緻擷取失敗,則會進入AQS的同步隊列阻塞等待。在整個過程中如果被中斷,不會抛出中斷異常,隻會記錄中斷狀态

首先來看看可以擷取指定數量許可的<code>acquireUninterruptibly</code>方法:

該方法會先檢查<code>permits</code>是否合法(非負數),再将後續的執行過程委托給<code>Sync</code>類的父類AQS的<code>acquireShared</code>方法來執行,其源碼如下:

這裡同樣會調用子類實作的<code>tryAcquireShared</code>方法,對于公平模式和非公平模式下的擷取方式略有不同,在上面都已經分析過,這裡不重複解釋了

如果<code>tryAcquireShared</code>擷取成功,則直接傳回;如果擷取失敗,則調用<code>doAcquireShared</code>方法,進入同步隊列阻塞等待,在等待過程中忽略中斷,隻記錄中斷狀态&lt;

和響應中斷的<code>acquire</code>方法相比,唯一的差別就在于如果擷取失敗,<code>acquire</code>調用的是<code>doAcquireSharedInterruptibly</code>,響應中斷;而則這裡的<code>acquireUninterruptibly</code>調用的是<code>doAcquireShared</code>,忽略中斷

這個不帶參數的<code>acquireUninterruptibly</code>方法,預設擷取一個許可:

和<code>acquireUninterruptibly(int)</code>基本上是一樣的,隻是這裡擷取數固定為1。在公平模式和非公平模式下的擷取方式也不相同。這裡也不再解釋

這個是<code>Semaphore</code>中比較特殊的一個擷取資源的方式,它提供了<code>drainPermits</code>方法,可以直接擷取目前剩餘的所有許可(資源),并傳回獲得的個數。其源碼如下:

該方法實際上委托給了<code>Sync</code>類的<code>drainPermits</code>方法來執行:

該方法是一個<code>final</code>方法,禁止子類重寫。整體上是一個<code>for</code>循環,為了應對CAS失敗的情況

首先通過AQS的<code>getState</code>方法擷取目前可用的許可數,再一次性全部擷取,并傳回獲得的許可數,很簡單~

<code>Semaphore</code>提供了<code>release</code>作為釋放許可的方法,和擷取許可一樣,<code>release</code>也有兩個重載版本。但是,釋放許可和擷取有兩點不同:

釋放許可的方法都是忽略異常的,沒有響應異常的版本

對于公平模式和非公平模式來說,釋放許可的方式都是一樣的,是以在Sync類這一層就提供了統一的實作

首先來看看釋放指定數量許可的<code>release</code>方法:

該方法會先檢查<code>permits</code>是否合法(非負數),再将後續的執行過程委托給<code>Sync</code>類的父類AQS的<code>releaseShared</code>方法來執行,其源碼如下:

<code>releaseShared</code>會先調用子類重寫的<code>tryReleaseShared</code>方法,在公平模式和非公平模式下的釋放許可邏輯是一緻的,是以在<code>Sync</code>類就對其進行了統一的實作,而沒有下放到子類中實作。<code>Sync</code>類的<code>tryReleaseShared</code>方法如下:

這也是一個<code>final</code>方法,禁止子類重寫。整體上是一個<code>for</code>循環,為了應對CAS失敗的情況

循環體内對AQS的<code>state</code>進行修改,不過需要避免釋放許可後導緻<code>state</code>溢出,否則會抛出錯誤

使用<code>Semaphore</code>的一點注意事項

從<code>relsease(int)</code>的邏輯中,并沒有發現對釋放的數量有所檢查,即一個線程可以釋放任意數量的許可,而不管它真正擁有多少許可

比如:一個線程可以釋放100個許可,即使它沒有獲得任何許可,這樣必然會導緻程式錯誤

是以,使用<code>Semaphore</code>必須遵守“釋放許可的數量一定不能超過目前持有許可的數量”這一規定,即使<code>Semaphore</code>不會對其進行檢查!

源碼中的注釋也指出了這一點:

There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire. Correct usage of a semaphore is established by programming convention in the application.

這個版本的<code>release</code>預設釋放一個許可:

其他和<code>release(int)</code>一緻,這裡不再解釋,不過使用<code>release()</code>也必須注意遵守上面的規定,<code>Semaphore</code>也不會主動進行檢查

<code>Semaphore</code>雖然提供了阻塞版本的擷取方式,也提供了四個版本的嘗試擷取方式,包括兩種:一種是非計時等待版本,一種是計時等待版本

該方法會先檢查<code>permits</code>是否合法(非負數),再将後續的執行過程委托給<code>Sync</code>類的<code>nonfairTryAcquireShared</code>方法來執行。此方法就是非公平版本的嘗試擷取許可方式,即使目前<code>Semaphore</code>使用的是公平政策。如果傳回值不小于0,說明擷取成功,傳回true;否則擷取失敗,傳回false。不管成功與否,都會立即傳回,不會阻塞等待

這個就是預設擷取一個許可的<code>tryAcquire</code>版本,跟上面基本一樣,不解釋

該方法會先檢查<code>permits</code>是否合法(非負數),再将後續的執行過程委托給<code>Sync</code>類的父類AQS的<code>tryAcquireSharedNanos</code>方法來執行。該方法會嘗試擷取許可,如果擷取成功,則立即傳回;如果擷取不到,則阻塞一段時間。如果等待過程中被中斷,則會抛出中斷異常

這個就是預設擷取一個許可的計時等待<code>tryAcquire</code>版本,跟上面基本一樣,不解釋

<code>Semaphore</code>還提供了一些方法以擷取信号量的狀态,比如:

目前信号量使用的公平政策

目前可擷取的許可數量

是否有線程正在等待擷取許可

因為擷取許可而被阻塞的線程數

下面一一來看

如果<code>Semaphore</code>的<code>sync</code>域是<code>FariSync</code>類對象,則說明使用的是公平政策,傳回true,否則傳回false

本質上調用的就是AQS的<code>getState</code>方法,傳回<code>state</code>的值,而<code>state</code>就代表了目前可擷取的許可數量

本質上調用的是AQS的<code>hasQueuedThreads</code>方法,判斷同步隊列的<code>head</code>和<code>tail</code>是否相同,如果相同,則說明隊列為空,沒有線程正在等待;否則說明有線程正在等待

這個方法實際調用的是<code>AQS.getQueueLength</code>方法。此方法會對同步隊列進行一個反向全周遊,傳回目前隊列長度的估計值。該值并不是一個确定值,因為在執行該函數時,其中的線程數可能已經發生了變化

願歸來仍是少年!

    作者:酒冽

    出處:https://www.cnblogs.com/frankiedyz/p/15726593.html

    版權:本文版權歸作者和部落格園共有

    轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接配接;否則必究法律責任