Semaphore簡介
Semaphore就是我們常說的信号量,本質就是基于AQS的一個共享鎖。對AQS不太了解的可以看我之前寫的AQS源碼解析的文章AQS源碼詳細分析,讓你掌握AQS原理,獨占鎖、共享鎖、Condition
Semaphore常常被用作限流器,通過共享鎖對資源進行限制。
Semaphore結構
如上圖所示,Semaphore實作了非公平鎖和公平鎖兩個模式。
Semaphore示例
class Pool {
private static final int MAX_AVAILABLE = 100;
// 初始化一個信号量,設定為公平鎖模式,總資源數為100個
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
// 擷取一個信号量
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
protected Object[] items = ...whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
以上代碼的場景是池子中有100個資源,線程可以單獨申請其中一個,當申請不到的時候會被挂起等待。
Semaphore源碼解析
1、初始化
public Semaphore(int permits) {
//permits為同一時刻容納的最大線程數
//預設調用了非公平鎖
sync = new NonfairSync(permits);
}
//也可以在初始化方法中傳入fair
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
2、acquire()
//申請鎖,可響應中斷
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//如果線程被中斷,抛出異常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//小于0代表申請失敗
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//将state-=acquires
int available = getState();
int remaining = available - acquires;
//如果remaining<0直接傳回
//如果CAS成功,傳回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//doAcquireSharedInterruptibly我們在AQS源碼詳解中,已經詳細說過了,不在展開說了
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//加入鎖queue中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//如果在queue中的第二個節點,嘗試申請鎖
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//申請鎖成功後,就将node移出queue
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//将線程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3、release()
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//如果釋放鎖成功
doReleaseShared();//喚醒其他等待線程來争奪鎖
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
//cas将state+=releases
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
//doReleaseShared()用來喚醒其他線程,我們在AQS源碼詳解中,已經詳細說過了,不在展開說了
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
Semaphore模式解析
方法 | 詳解 |
---|---|
tryAcquire() | 嘗試擷取鎖,隻擷取一次,申請不到就拉倒,搶鎖期間不響應中斷 |
tryAcquire(long timeout,TimeUnit unit) | 在timeout個unit時間内,申請鎖,申請不到就挂起,搶鎖期間可響應中端 |
acquire() | 阻塞式申請鎖,申請不到就挂起,搶鎖期間可響應中斷 |
acquireUninterruptibly | 阻塞式申請鎖,申請不到就挂起,搶鎖期間不響應中斷,在搶鎖成功後才響應中斷 |