1、簡介
Semaphore,信号量,它儲存了一系列的許可(permits),每次調用acquire()都将消耗一個許可,每次調用release()都将歸還一個許可。
Semaphore通常用于限制同一時間對共享資源的通路次數上,也就是常說的限流。
Semaphore信号量,擷取通行證流程圖:

2、入門案例
案例1:Pool.java
/**
* date: 2021/5/10
* @author csp
*/
public class Pool {
/**
* 可同時通路資源的最大線程數
*/
private static final int MAX_AVAILABLE = 100;
/**
* 信号量 表示:可擷取的對象通行證
*/
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
/**
* 共享資源,可以想象成 items 數組記憶體儲的都是Connection對象 模拟是連接配接池
*/
protected Object[] items = new Object[MAX_AVAILABLE];
/**
* 共享資源占用情況,與items數組一一對應,比如:
* items[0]對象被外部線程占用,那麼 used[0] == true,否則used[0] == false
*/
protected boolean[] used = new boolean[MAX_AVAILABLE];
/**
* 擷取一個空閑對象
* 如果目前池中無空閑對象,則等待..直到有空閑對象為止
*/
public Object getItem() throws InterruptedException {
// 每次調用acquire()都将消耗一個許可(permits)
available.acquire();
return getNextAvailableItem();
}
/**
* 歸還對象到池中
*/
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
/**
* 擷取池内一個空閑對象,擷取成功則傳回Object,失敗傳回Null
* 成功後将對應的 used[i] = true
*/
private synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
/**
* 歸還對象到池中,歸還成功傳回true
* 歸還失敗:
* 1.池中不存在該對象引用,傳回false
* 2.池中存在該對象引用,但該對象目前狀态為空閑狀态,也傳回false
*/
private synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
案例2:SemaphoreTest02.java
/**
* date: 2020/5/10
* @author csp
*/
public class SemaphoreTest02 {
public static void main(String[] args) throws InterruptedException {
// 聲明信号量,初始的許可(permits)為2
// 公平模式:fair為true
final Semaphore semaphore = new Semaphore(2, true);
Thread tA = new Thread(() ->{
try {
// 每次調用acquire()都将消耗一個許可(permits)
semaphore.acquire();
System.out.println("線程A擷取通行證成功");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}finally {
// 每次調用release()都将歸還一個許可(permits)
semaphore.release();
}
});
tA.start();
// 確定線程A已經執行
TimeUnit.MILLISECONDS.sleep(200);
Thread tB = new Thread(() ->{
try {
// 調用acquire(2)都将消耗2個許可(permits)
semaphore.acquire(2);
System.out.println("線程B擷取通行證成功");
} catch (InterruptedException e) {
}finally {
// 調用release(2)都将歸還2個許可(permits)
semaphore.release(2);
}
});
tB.start();
// 確定線程B已經執行
TimeUnit.MILLISECONDS.sleep(200);
Thread tC = new Thread(() ->{
try {
// 每次調用acquire()都将消耗一個許可(permits)
semaphore.acquire();
System.out.println("線程C擷取通行證成功");
} catch (InterruptedException e) {
}finally {
// 每次調用release()都将歸還一個許可(permits)
semaphore.release();
}
});
tC.start();
}
}
執行結果:
線程A擷取通行證成功
線程B擷取通行證成功
線程C擷取通行證成功
3、源碼分析
内部類Sync
通過Sync的幾個實作方法,我們擷取到以下幾點資訊:
許可是在構造方法時傳入的;
許可存放在狀态變量state中;
嘗試擷取一個許可的時候,則state的值減1;
當state的值為0的時候,則無法再擷取許可;
釋放一個許可的時候,則state的值加1;
許可的個數可以動态改變;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 構造方法,傳入許可次數,放入state中
Sync(int permits) {
setState(permits);
}
// 擷取許可次數
final int getPermits() {
return getState();
}
// 非公平模式嘗試擷取許可
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 先看看還有幾個許可
int available = getState();
// 減去這次需要擷取的許可還剩下幾個許可
int remaining = available - acquires;
// 如果剩餘許可小于0了則直接傳回
// 如果剩餘許可不小于0,則嘗試原子更新state的值,成功了傳回剩餘許可
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 釋放許可
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 先看看還有幾個許可
int current = getState();
// 加上這次釋放的許可
int next = current + releases;
// 檢測溢出
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 如果原子更新state的值成功,就說明釋放許可成功,則傳回true
if (compareAndSetState(current, next))
return true;
}
}
// 減少許可
final void reducePermits(int reductions) {
for (;;) {
// 先看看還有幾個許可
int current = getState();
// 減去将要減少的許可
int next = current - reductions;
// 檢測溢出
if (next > current) // underflow
throw new Error("Permit count underflow");
// 原子更新state的值,成功了傳回true
if (compareAndSetState(current, next))
return;
}
}
// 銷毀許可
final int drainPermits() {
for (;;) {
// 先看看還有幾個許可
int current = getState();
// 如果為0,直接傳回
// 如果不為0,把state原子更新為0
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
内部類NonfairSync
非公平模式下,直接調用父類的
nonfairTryAcquireShared()
嘗試擷取許可。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
// 構造方法,調用父類的構造方法
NonfairSync(int permits) {
super(permits);
}
// 嘗試擷取許可,調用父類的nonfairTryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
内部類FairSync
公平模式下,先檢測前面是否有排隊的,如果有排隊的則擷取許可失敗,進入隊列排隊,否則嘗試原子更新state的值。
**注意:**為了閱讀友善,該内部類中将一些AQS中的方法粘貼過來了,在方法頭注釋加有标注!
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
/**
* 該方法位于AQS中:
* 嘗試擷取通行證,擷取成功傳回 >= 0的值;
* 擷取失敗 傳回 < 0 值
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
// 判斷目前 AQS 阻塞隊列内 是否有等待者線程,如果有直接傳回-1,表示目前aquire操作的線程需要進入到隊列等待..
if (hasQueuedPredecessors())
return -1;
// 執行到這裡,有哪幾種情況?
// 1.調用aquire時 AQS阻塞隊列内沒有其它等待者
// 2.目前節點 在阻塞隊列内是headNext節點
// 擷取state ,state這裡表示 通行證
int available = getState();
// remaining 表示目前線程 擷取通行證完成之後,semaphore還剩餘數量
int remaining = available - acquires;
// 條件一:remaining < 0 成立,說明線程擷取通行證失敗..
// 條件二:前置條件,remaning >= 0, CAS更新state 成功,說明線程擷取通行證成功,CAS失敗,則自旋。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* 該方法位于AQS中:
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 條件成立:說明目前調用acquire方法的線程 已經是 中斷狀态了,直接抛出異常..
if (Thread.interrupted())
throw new InterruptedException();
// 對應業務層面 執行任務的線程已經将latch打破了。然後其他再調用latch.await的線程,就不會在這裡阻塞了
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* 該方法位于AQS中:
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将調用Semaphore.aquire方法的線程 包裝成node加入到 AQS的阻塞隊列當中。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 擷取目前線程節點的前驅節點
final Node p = node.predecessor();
// 條件成立,說明目前線程對應的節點 為 head.next節點
if (p == head) {
// head.next節點就有權利擷取 共享鎖了..
int r = tryAcquireShared(arg);
// 站在Semaphore角度:r 表示還剩餘的通行證數量
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire 會給目前線程找一個好爸爸,最終給爸爸節點設定狀态為 signal(-1),傳回true
// parkAndCheckInterrupt 挂起目前節點對應的線程...
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 該方法位于AQS中:
* 設定目前節點為 head節點,并且向後傳播!(依次喚醒!)
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 将目前節點設定為 新的 head節點。
setHead(node);
// 調用setHeadAndPropagete 時 propagate == 1 一定成立
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 擷取目前節點的後繼節點..
Node s = node.next;
// 條件一:s == null 什麼時候成立呢? 目前node節點已經是 tail了,條件一會成立。 doReleaseShared() 裡面會處理這種情況..
// 條件二:前置條件,s != null , 要求s節點的模式必須是 共享模式。 latch.await() -> addWaiter(Node.SHARED)
if (s == null || s.isShared())
// 基本上所有情況都會執行到 doReleasseShared() 方法。
doReleaseShared();
}
}
//AQS.releaseShared 該方法位于AQS中:
public final boolean releaseShared(int arg) {
// 條件成立:表示目前線程釋放資源成功,釋放資源成功後,去喚醒擷取資源失敗的線程..
if (tryReleaseShared(arg)) {
// 喚醒擷取資源失敗的線程...
doReleaseShared();
return true;
}
return false;
}
/**
* 喚醒擷取資源失敗的線程
*
* CountDownLatch版本
* 都有哪幾種路徑會調用到doReleaseShared方法呢?
* 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() 喚醒目前阻塞隊列内的 head.next 對應的線程。
* 2.被喚醒的線程 -> doAcquireSharedInterruptibly parkAndCheckInterrupt() 喚醒 -> setHeadAndPropagate() -> doReleaseShared()
*
* Semaphore版本
* 都有哪幾種路徑會調用到doReleaseShared方法呢?
*
*/
//AQS.doReleaseShared 該方法位于AQS中:
private void doReleaseShared() {
for (;;) {
// 擷取目前AQS 内的 頭結點
Node h = head;
// 條件一:h != null 成立,說明阻塞隊列不為空..
// 不成立:h == null 什麼時候會是這樣呢?
// latch建立出來後,沒有任何線程調用過 await() 方法之前,有線程調用latch.countDown()操作 且觸發了 喚醒阻塞節點的邏輯..
// 條件二:h != tail 成立,說明目前阻塞隊列内,除了head節點以外 還有其他節點。
// h == tail -> head 和 tail 指向的是同一個node對象。 什麼時候會有這種情況呢?
// 1. 正常喚醒情況下,依次擷取到 共享鎖,目前線程執行到這裡時 (這個線程就是 tail 節點。)
// 2. 第一個調用await()方法的線程 與 調用countDown()且觸發喚醒阻塞節點的線程 出現并發了..
// 因為await()線程是第一個調用 latch.await()的線程,此時隊列内什麼也沒有,它需要補充建立一個Head節點,然後再次自旋時入隊
// 在await()線程入隊完成之前,假設目前隊列内 隻有 剛剛補充建立的空元素 head 。
// 同期,外部有一個調用countDown()的線程,将state 值從1,修改為0了,那麼這個線程需要做 喚醒 阻塞隊列内元素的邏輯..
// 注意:調用await()的線程 因為完全入隊完成之後,再次回到上層方法 doAcquireSharedInterruptibly 會進入到自旋中,
// 擷取目前元素的前驅,判斷自己是head.next, 是以接下來該線程又會将自己設定為 head,然後該線程就從await()方法傳回了...
if (h != null && h != tail) {
// 執行到if裡面,說明目前head 一定有 後繼節點!
int ws = h.waitStatus;
// 目前head狀态 為 signal 說明 後繼節點并沒有被喚醒過呢...
if (ws == Node.SIGNAL) {
// 喚醒後繼節點前 将head節點的狀态改為 0
// 這裡為什麼,使用CAS呢? 回頭說...
// 當doReleaseShared方法 存在多個線程 喚醒 head.next 邏輯時,
// CAS 可能會失敗...
// 案例:
// t3 線程在if(h == head) 傳回false時,t3 會繼續自旋. 參與到 喚醒下一個head.next的邏輯..
// t3 此時執行到 CAS WaitStatus(h,Node.SIGNAL, 0) 成功.. t4 在t3修改成功之前,也進入到 if (ws == Node.SIGNAL) 裡面了,
// 但是t4 修改 CAS WaitStatus(h,Node.SIGNAL, 0) 會失敗,因為 t3 改過了...
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒後繼節點
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 條件成立:
// 1.說明剛剛喚醒的 後繼節點,還沒執行到 setHeadAndPropagate方法裡面的 設定目前喚醒節點為head的邏輯。
// 這個時候,目前線程 直接跳出去...結束了..
// 此時用不用擔心,喚醒邏輯 在這裡斷掉呢?、
// 不需要擔心,因為被喚醒的線程 早晚會執行到doReleaseShared方法。
// 2.h == null latch建立出來後,沒有任何線程調用過 await() 方法之前,
// 有線程調用latch.countDown()操作 且觸發了 喚醒阻塞節點的邏輯..
// 3.h == tail -> head 和 tail 指向的是同一個node對象
// 條件不成立:
// 被喚醒的節點 非常積極,直接将自己設定為了新的head,此時 喚醒它的節點(前驅),執行h == head 條件會不成立..
// 此時 head節點的前驅,不會跳出 doReleaseShared 方法,會繼續喚醒 新head 節點的後繼...
if (h == head) // loop if head changed
break;
}
}
}
構造方法
建立Semaphore時需要傳入許可次數。Semaphore預設也是非公平模式,但是你可以調用第二個構造方法聲明其為公平模式。
// 構造方法,建立時要傳入許可次數,預設使用非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 構造方法,需要傳入許可次數,及是否公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire()方法
擷取一個許可,預設使用的是可中斷方式,如果嘗試擷取許可失敗,會進入AQS的隊列中排隊。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 擷取一個許可,非中斷方式,如果嘗試擷取許可失敗,會進入AQS的隊列中排隊。
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
cquire(int permits)方法
一次擷取多個許可,可中斷方式。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// 一次擷取多個許可,非中斷方式。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
tryAcquire()方法
嘗試擷取一個許可,使用Sync的非公平模式嘗試擷取許可方法,不論是否擷取到許可都傳回,隻嘗試一次,不會進入隊列排隊。
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 嘗試擷取一個許可,先嘗試一次擷取許可,如果失敗則會等待timeout時間,這段時間内都沒有擷取到許可,則傳回false,否則傳回true;
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
elease()方法
釋放一個許可,釋放一個許可時state的值會加1,并且會喚醒下一個等待擷取許可的線程。
public void release() {
sync.releaseShared(1);
}
release(int permits)方法
一次釋放多個許可,state的值會相應增加permits的數量。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
4、小結
Semaphore,也叫信号量,通常用于控制同一時刻對共享資源的通路上,也就是限流場景;
Semaphore的内部實作是基于AQS的共享鎖來實作的;
Semaphore初始化的時候需要指定許可的次數,許可的次數是存儲在state中;
擷取一個許可時,則state值減1;
釋放一個許可時,則state值加1;