JUC 中 Semaphore 的使用與原理分析,Semaphore 也是 Java 中的一個同步器,與 CountDownLatch 和 CycleBarrier 不同在于它内部的計數器是遞增的,那麼,Semaphore 的内部實作是怎樣的呢?
Semaphore 信号量也是Java 中一個同步容器,與CountDownLatch 和 CyclicBarrier 不同之處在于它内部的計數器是遞增的。為了能夠一覽Semaphore的内部結構,我們首先要看一下Semaphore的類圖,類圖,如下所示:

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new
NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
如上面代碼所示,Semaphore預設使用的是非公平政策,如果你需要公平政策,則可以使用帶兩個參數的構造函數來構造Semaphore對象,另外和CountDownLatch一樣,構造函數裡面傳遞的初始化信号量個數 permits 被指派給了AQS 的state狀态變量,也就是說這裡AQS的state值表示目前持有的信号量個數。
接下來我們主要看看Semaphore實作的主要方法的源碼,如下:
1.void acquire() 目前線程調用該方法的時候,目的是希望擷取一個信号量資源,如果目前信号量計數個數大于 0 ,并且目前線程擷取到了一個信号量則該方法直接傳回,目前信号量的計數會減少 1 。否則會被放入AQS的阻塞隊列,目前線程被挂起,直到其他線程調用了release方法釋放了信号量,并且目前線程通過競争擷取到了改信号量。目前線程被其他線程調用了 interrupte()方法中斷後,目前線程會抛出 InterruptedException異常傳回。源碼如下:
public void acquire() throws InterruptedException {
//傳遞參數為1,說明要擷取1個信号量資源
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//(1)如果線程被中斷,則抛出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
//(2)否者調用sync子類方法嘗試擷取,這裡根據構造函數确定使用公平政策
if (tryAcquireShared(arg) < 0)
//如果擷取失敗則放入阻塞隊列,然後再次嘗試如果失敗則調用park方法挂起目前線程
doAcquireSharedInterruptibly(arg);
}
如上代碼可知,acquire()内部調用了sync的acquireSharedInterruptibly 方法,後者是對中斷響應的(如果目前線程被中斷,則抛出中斷異常),嘗試擷取信号量資源的AQS的方法tryAcquireShared 是由 sync 的子類實作,是以這裡就要分公平性了,這裡先讨論非公平政策 NonfairSync 類的 tryAcquireShared 方法,源碼如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//擷取目前信号量值
int available = getState();
//計算目前剩餘值
int remaining = available - acquires;
//如果目前剩餘小于0或者CAS設定成功則傳回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
如上代碼,先計算目前信号量值(available)減去需要擷取的值(acquires) 得到剩餘的信号量個數(remaining),如果剩餘值小于 0 說明目前信号量個數滿足不了需求,則直接傳回負數,然後目前線程會被放入AQS的阻塞隊列,目前線程被挂起。如果剩餘值大于 0 則使用CAS操作設定目前信号量值為剩餘值,然後傳回剩餘值。另外可以知道NonFairSync是非公平性擷取的,是說先調用aquire方法擷取信号量的線程不一定比後來者先擷取鎖。
接下來我們要看看公平性的FairSync 類是如何保證公平性的,源碼如下:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
可以知道公平性還是靠 hasQueuedPredecessors 這個方法來做的,以前的随筆已經講過公平性是看目前線程節點是否有前驅節點也在等待擷取該資源,如果是則自己放棄擷取的權力,然後目前線程會被放入AQS阻塞隊列,否則就去擷取。hasQueuedPredecessors源碼如下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
如上面代碼所示,如果目前線程節點有前驅節點則傳回true,否則如果目前AQS隊列為空 或者 目前線程節點是AQS的第一個節點則傳回 false ,其中,如果 h == t 則說明目前隊列為空則直接傳回 false,如果 h !=t 并且 s == null 說明有一個元素将要作為AQS的第一個節點入隊列(回顧下 enq 函數第一個元素入隊列是兩步操作,首先建立一個哨兵頭節點,然後第一個元素插入到哨兵節點後面),那麼傳回 true,如果 h !=t 并且 s != null 并且 s.thread != Thread.currentThread() 則說明隊列裡面的第一個元素不是目前線程則傳回 true。
2.void acquire(int permits) 該方法與 acquire() 不同在與後者隻需要擷取一個信号量值,而前者則擷取指定 permits 個,源碼如下:
public void acquire(int permits) throws InterruptedException {
if (permits < 0)
throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
3.void acquireUninterruptibly() 該方法與 acquire() 類似,不同之處在于該方法對中斷不響應,也就是當目前線程調用了 acquireUninterruptibly 擷取資源過程中(包含被阻塞後)其它線程調用了目前線程的 interrupt()方法設定了目前線程的中斷标志目前線程并不會抛出 InterruptedException 異常而傳回。源碼如下:
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
4.void acquireUninterruptibly(int permits) 該方法與 acquire(int permits) 不同在于該方法對中斷不響應。源碼如如下:
public void acquireUninterruptibly(int permits) {
if (permits < 0)
throw new IllegalArgumentException();
sync.acquireShared(permits);
}
5.void release() 該方法作用是把目前 semaphore對象的信号量值增加 1 ,如果目前有線程因為調用 acquire 方法被阻塞放入了 AQS的阻塞隊列,則會根據公平政策選擇一個線程進行激活,激活的線程會嘗試擷取剛增加的信号量,源碼如下:
public void release() {
//(1)arg=1
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//(2)嘗試釋放資源
if (tryReleaseShared(arg)) {
//(3)資源釋放成功則調用park喚醒AQS隊列裡面最先挂起的線程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//(4)擷取目前信号量值
int current = getState();
//(5)目前信号量值增加releases,這裡為增加1
int next = current + releases;
if (next < current) // 移除處理
throw new Error("Maximum permit count exceeded");
//(6)使用cas保證更新信号量值的原子性
if (compareAndSetState(current, next))
return true;
}
}
如上面代碼可以看到 release()方法中對 sync.releaseShared(1),可以知道release方法每次隻會對信号量值增加 1 ,tryReleaseShared方法是無限循環,使用CAS保證了 release 方法對信号量遞增 1 的原子性操作。當tryReleaseShared 方法增加信号量成功後會執行代碼(3),調用AQS的方法來激活因為調用acquire方法而被阻塞的線程。
6.void release(int permits) 該方法與不帶參數的不同之處在于前者每次調用會在信号量值原來基礎上增加 permits,而後者每次增加 1。源碼如下:
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
另外注意到這裡調用的是 sync.releaseShared 是共享方法,這說明該信号量是線程共享的,信号量沒有和固定線程綁定,多個線程可以同時使用CAS去更新信号量的值而不會阻塞。
到目前已經知道了其原理,接下來用一個例子來加深對Semaphore的了解,例子如下:
package com.hjc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* Created by cong on 2018/7/8.
*/
public class SemaphoreTest {
// 建立一個Semaphore執行個體
private static volatile Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 加入線程A到線程池
executorService.submit(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread() + " over");
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 加入線程B到線程池
executorService.submit(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread() + " over");
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 等待子線程執行完畢,傳回
semaphore.acquire(2);
System.out.println("all child thread over!");
//關閉線程池
executorService.shutdown();
}
}
運作結果如下:
類似于 CountDownLatch,上面我們的例子也是在主線程中開啟兩個子線程進行執行,等所有子線程執行完畢後主線程在繼續向下運作。
如上代碼首先首先建立了一個信号量執行個體,構造函數的入參為 0,說明目前信号量計數器為 0,然後 main 函數添加兩個線程任務到線程池,每個線程内部調用了信号量的 release 方法,相當于計數值遞增一,最後在 main 線程裡面調用信号量的 acquire 方法,參數傳遞為 2 說明調用 acquire 方法的線程會一直阻塞,直到信号量的計數變為 2 時才會傳回。
看到這裡也就明白了,如果構造 Semaphore 時候傳遞的參數為 N,在 M 個線程中調用了該信号量的 release 方法,那麼在調用 acquire 對 M 個線程進行同步時候傳遞的參數應該是 M+N;
對CountDownLatch,CyclicBarrier,Semaphored這三者之間的比較總結:
1.CountDownLatch 通過計數器提供了更靈活的控制,隻要檢測到計數器為 0,而不管目前線程是否結束調用 await 的線程就可以往下執行,相比使用 jion 必須等待線程執行完畢後主線程才會繼續向下運作更靈活。
2.CyclicBarrier 也可以達到 CountDownLatch 的效果,但是後者當計數器變為 0 後,就不能在被複用,而前者則使用 reset 方法可以重置後複用,前者對同一個算法但是輸入參數不同的類似場景下比較适用。
3.而 semaphore 采用了信号量遞增的政策,一開始并不需要關心需要同步的線程個數,等調用 aquire 時候在指定需要同步個數,并且提供了擷取信号量的公平性政策。