摘要:
一、Semaphore介紹
Semaphore意思為信号量,是用來控制同時通路特定資源的線程數數量。它的本質上其實也是一個共享鎖。Semaphore可以用于做流量控制,特别是公用資源有限的應用場景。例如:某個停車場車位隻有10個,一開始停車場沒有車輛所有車位全部空着,然後先後到來八輛車,停車場車位夠,安排進去停車,然後又來三輛,這個時候由于隻有兩個停車位,所有隻能停兩輛,其餘一輛必須在外面候着,直到停車場有空車位,當然以後每來一輛都需要在外面候着。當停車場有車開出去,裡面有空位了,則安排一輛車進去。這個場景就是典型的信号量應用場景。
二、SemaphoreAPI使用介紹
2.1、構造函數:
- Semaphore(int permits) :建立具有給定的許可數和非公平的公平設定的 Semaphore。
- Semaphore(int permits, boolean fair) :建立具有給定的許可數和給定的公平設定的 Semaphore。
2.2、信号量擷取:
1. Semaphore提供了acquire()方法來擷取一個許可。
2.3、信号量釋放:
1. Semaphore提供release()來釋放許可。
2.4、其他方法:
1. intavailablePermits():傳回此信号量中目前可用的許可證數。
2. intgetQueueLength():傳回正在等待擷取許可證的線程數。
3. booleanhasQueuedThreads():是否有線程正在等待擷取許可證。
· 4. void reducePermits(int reduction):減少reduction個許可證,是個protected方法。
· 5. Collection getQueuedThreads():傳回所有等待擷取許可證的線程集合,是個protected方
三.源代碼分析
package com.cloudtravel.common.sourcecode;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* Semaphore是一個線程同步的輔助類,可以維護控制通路目前資源的線程并發個數,并提供了同步機制 ,
* 使用semaphore可以控制同時通路資源的個數.較多的如可以控制接口的并發個數
* 對比Mutex :
* Mutex一般用來串行化Critical section代碼的通路,即最多同時允許一個線程通路 . 定義比較明顯
* Semaphore: 控制資源允許的并發量 , 當其允許的并發量設定為1時,又稱之為Binary Semaphore
* Binary Semaphore同Mutex對比:
* 作用:兩者在所起到的作用上并沒有什麼差別 , 此時都是為了保證資源可以串行化通路,即都是保證同時隻有一個線程通路資源 .
* 原理 : 兩者在實作原理上有差別 .
* Semaphore: 為非對稱模型,可以了解為,鎖雖然隻有一把,同時也隻能有一把鑰匙開着,但是不同的線程擁有的鑰匙不同.
* semaphore可以了解為一個原子變量 , 信号量通過内部計數器,控制線程的并發量.且可以通過變更信号量[permit]進行靈活的調整
* Mutex: 為對稱模型,即一把鎖隻有一把鑰匙,線程之間通過這一把鑰匙控制資源的通路 .也就是通過synchronized安全鎖進行控制的.
* @author Administrator
*/
public class SemaphoreAnalyse implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
/**
* 通過AbstractQueuedSynchronizer子類實作的所有機制
*/
private final Sync sync;
/**
* Synchronization implementation for SemaphoreAnalyse. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
/**
* 為semaphore定義的鎖抽象類,繼承自AQS,采用CAS算法,
* 借用AQS中的state控制是否允許通路 .
* sync為抽象類 . 主要定義了擷取鎖和釋放鎖的非公平鎖方法 .
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
/** 調用AQS中的方法設定目前允許通路的并發數.permits : 建立semaphore執行個體的時候傳進來 */
Sync(int permits) {
setState(permits);
}
/** 擷取目前可使用的資源并發數 */
final int getPermits() {
return getState();
}
/** 擷取一定數量的資源數.-非公平鎖版本,傳回目前剩餘可使用的信号量 . */
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
/** 釋放一定量的資源 */
@Override
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
int current = getState();
int next = current + releases;
if (next < current)
{
throw new Error("Maximum permit count exceeded");
}
if (compareAndSetState(current, next))
{
return true;
}
}
}
/** 減小信号量,同nonfairTryAcquireShared操作類似.隻是隻能減小,不能傳入負值 */
final void reducePermits(int reductions) {
for (; ; ) {
int current = getState();
int next = current - reductions;
if (next > current) {
throw new Error("Permit count underflow");
}
if (compareAndSetState(current, next)) {
return;
}
}
}
/** 銷毀信号量-将目前可使用信号量改為0.即不允許通路 */
final int drainPermits() {
for (; ; ) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/** 非公平鎖版本 */
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
/** 構造器 */
NonfairSync(int permits) {
super(permits);
}
/** 擷取信号量資源 */
@Override
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* 公平鎖版本
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
/** 擷取資源信号量 目前state -1
* 對比NonFairSync多了hasQueuedPredecessors()的判斷
* hasQueuedPredecessors()判斷目前線程是否需要排隊.并将目前線程加入到隊列中
* 隊列遵循FIFO[先進先出]的原則,若目前隊列中無等待線程,則傳回false表示不需要排隊,
* 若存在等待線程,說明目前可用信号量已耗盡 , 需要排隊,傳回true.
* */
protected int tryAcquireShared(int acquires) {
for (; ; ) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
/**
* 構造器-預設非公平鎖版本
* @param permits
*/
public SemaphoreAnalyse(int permits) {
sync = new NonfairSync(permits);
}
/** 構造器,可自定義選擇公平鎖還是非公平鎖 */
public SemaphoreAnalyse(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* 擷取一個信号量, 目前state -1
* 其核心實作為AQS的doAcquireSharedInterruptibly()方法
* 若目前信号量資源耗盡,則會中斷響應
* */
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 擷取信号量,目前state -1
* 若目前信号量資源耗盡,則響應等待直到擷取到資源 . 采用自旋鎖
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
* 擷取一個信号量 . 目前state -1
* 若目前還有資源可用 . 傳回true . 若無,傳回false
* @return
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
* 擷取一個信号量 . 目前state -1
* 若目前還有資源可用 . 傳回true . 若無,傳回false . 可自定義逾時時間,逾時中斷響應
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/** 釋放一個資源 . 目前state + 1 */
public void release() {
sync.releaseShared(1);
}
/** 擷取一定量的信号量資源 . 若目前信号量耗盡或者無法滿足,則中斷響應 */
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
/**
* 從該信号量分析擷取給定數量的資源,直到所有許可證都可用為止。
* 擷取給定數量的許可證(如果可用),并立即傳回,将可用許可證數量減少給定數量。
* 如果沒有足夠的信号量可用,則目前線程出于線程排程目的将被禁用,并處于休眠狀态,
* 直到其他線程調用此信号量分析的某個釋放方法,目前線程下一個将被配置設定許可證,并且可用許可證的數量滿足此請求。
* 如果目前線程在等待許可證時被中斷,那麼它将繼續等待,并且它在隊列中的位置不受影響。當線程确實從此方法傳回時,将設定其中斷狀态。
* @param permits
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
/**
* 僅當調用時所有許可證都可用時,從該信号量分析擷取給定數量的許可證。
* 擷取給定數量的許可證(如果可用),并立即傳回,傳回值為true,将可用許可證數量減少給定數量。
* 如果沒有足夠的許可證可用,則此方法将立即傳回值false,并且可用許可證的數量不變。
* 即使此信号量分析已設定為使用公平排序政策,對tryAcquire的調用也将立即獲得許可證(如果有),
* 無論其他線程目前是否正在等待。這種“讨價還價”行為在某些情況下是有用的,即使它破壞了公平性。
* 如果您想遵守公平性設定,那麼使用幾乎相等的tryAcquire(允許,0,TimeUnit.SECONDS)(它還檢測中斷)。
* @param permits
* @return
*/
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
/**
* 以共享定時模式擷取信号量,逾時中斷響應
* @param permits
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
/**
* 釋放給定數量的許可證
* @param permits
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
/**
* 傳回目前可用的信号量
* @return
*/
public int availablePermits() {
return sync.getPermits();
}
/**
* 将目前可用信号量全部占用
* @return
*/
public int drainPermits() {
return sync.drainPermits();
}
/**
* 按指定的減少量縮小可用許可證的數量。此方法在使用信号量分析跟蹤不可用資源的子類中非常有用。
* 此方法與acquire的不同之處在于,它不會發生線程阻塞
*/
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
/** 目前是否為公平鎖 */
public boolean isFair() {
return sync instanceof FairSync;
}
/**
* 目前是否有線程在隊列中等待
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* 擷取等待線程的隊列的長度.即傳回有多少個等待線程
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
* 擷取隊列中的所有等待線程 AQS内自帶的方法
* @return the collection of threads
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
/**
* toString
* @return a string identifying this SemaphoreAnalyse, as well as its state
*/
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}