文章目錄
- 1.作用說明
- 2.代碼實作
-
- 1.代碼書寫
- 2.輸出結果
- 3.源碼閱讀
-
- 1.構造函數
- 2.請求鎖:acquire
-
- 1.嘗試擷取鎖:tryAcquire
- 2.共享隊列等待:doAcquireSharedInterruptibly
- 2.釋放鎖:release
-
- 1.嘗試釋放:tryReleaseShared
- 2.解鎖:doReleaseShared
1.作用說明
網上有一些比較好的解釋,直接截圖:
https://www.jianshu.com/p/2f1d6942bcd5
關于Semaphore,他的中文解釋是信号量,可以舉例子來了解一下這個東西:
銀行有兩個辦事視窗,但是這個時候可能有十個人在等待,那麼等待的十個人需要等待視窗空閑了才能夠進行業務辦理.這個時候銀行視窗就是信号量,信号量為2,而等待的人員相當于等待執行的線程,每次隻有等待信号量釋放才能讓其他線程進來運作。
2.代碼實作
1.代碼書寫
public class SemaphoreDemo {
public static void main(String[] args) {
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread() {
@Override
public void run() {
try {
semaphore.acquire();
Thread.sleep(1000);
System.out.println(Thread.currentThread().getId() + "工作結束了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}.start();
}
}
}
2.輸出結果
3.源碼閱讀
1.構造函數
這邊有兩個構造函數,預設是非公平的方式實作的,也可以通過設定boolean值來決定使用公平方式還是非公平的方式.不多贅述.
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
2.請求鎖:acquire
這邊擷取的是共享鎖,調用AQS的實作。具體來看一下tryAcquire這個方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//線程本身就是阻塞狀态報錯
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (tryAcquireShared(arg) < 0) {
//嘗試擷取共享鎖失敗,這邊的嘗試是直接傳回狀态時候為0的結果,如果不為0表示擷取失敗
doAcquireSharedInterruptibly(arg);
}
}
1.嘗試擷取鎖:tryAcquire
通過擷取目前的state,減去需要的共享鎖數量,通過剩餘的共享鎖數量表示是否需要等待,這邊有幾種情況:
1.如果結果<0.表示目前沒有可用的信号量,必須要加入到共享隊列中進行等待
2.如果CAS設定非0的值成功,則表示目前已經擷取到鎖,不需要加入到共享隊列.
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//state - acquire得到目前剩餘可用的信号量
int available = getState();
int remaining = available - acquires;
//小于0表示沒有可用資源,需要加入到等待隊列中,否則通過CAS設定成功表示目前奪鎖成功,可以運作了。
if (remaining < 0 || compareAndSetState(available, remaining)){
return remaining;
}
}
}
2.共享隊列等待:doAcquireSharedInterruptibly
這邊同樣需要調用tryAcquire方法來判斷是否可以擷取到運作權限,如果信号量任然小于0,則目前的線程會一直等待,直到有可用的信号量才能執行線程方法。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//将目前線程添加到隊列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//死循環判斷是否頭節點,頭結點則擷取鎖,
for (; ; ) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
// help GC
p.next = null;
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
throw new InterruptedException();
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
2.釋放鎖:release
release方法發這邊直接調用AQS的releaseShared方法。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
1.嘗試釋放:tryReleaseShared
通過死循環去釋放鎖,這邊一定會傳回true結果,使用循環CAS方式.
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
// overflow
//下一個小于目前,說明不對,直接抛錯,如果傳進來的是負數,有可能出現這種情況
if (next < current) {
throw new Error("Maximum permit count exceeded");
}
//通過CAS方式修改state成功傳回true。這是一個死循環,是以最後一定會成功傳回的.
//也就是說目前持有的信号量一定會被釋放掉.
if (compareAndSetState(current, next)){
return true;
}
}
}
2.解鎖:doReleaseShared
這邊是 釋放頭結點(通過死循環保證一定被釋放),并設定為傳播态。
private void doReleaseShared() {
for (; ; ) {
//進入死循環
Node h = head;
//如果有隊列有節點等待
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果waitState是SIGNAL
if (ws == Node.SIGNAL) {
//設定waitstate 為0失敗則continue重新開始循環
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
// loop to recheck cases
continue;
}
//如果設定節點waitstate成功,則喚醒下一個節點
unparkSuccessor(h);
}
//設定為傳播狀态.
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
// loop on failed CAS
continue;
}
}
// loop if head changed
//保證節點被釋放才能結束循環
if (h == head) {
break;
}
}
}