天天看點

Semaphore源碼閱讀1.作用說明2.代碼實作3.源碼閱讀

文章目錄

  • 1.作用說明
  • 2.代碼實作
    • 1.代碼書寫
    • 2.輸出結果
  • 3.源碼閱讀
    • 1.構造函數
    • 2.請求鎖:acquire
      • 1.嘗試擷取鎖:tryAcquire
      • 2.共享隊列等待:doAcquireSharedInterruptibly
    • 2.釋放鎖:release
      • 1.嘗試釋放:tryReleaseShared
      • 2.解鎖:doReleaseShared

1.作用說明

網上有一些比較好的解釋,直接截圖:

Semaphore源碼閱讀1.作用說明2.代碼實作3.源碼閱讀

https://www.jianshu.com/p/2f1d6942bcd5

關于Semaphore,他的中文解釋是信号量,可以舉例子來了解一下這個東西:

銀行有兩個辦事視窗,但是這個時候可能有十個人在等待,那麼等待的十個人需要等待視窗空閑了才能夠進行業務辦理.這個時候銀行視窗就是信号量,信号量為2,而等待的人員相當于等待執行的線程,每次隻有等待信号量釋放才能讓其他線程進來運作。

2.代碼實作

1.代碼書寫

public class SemaphoreDemo {

    public static void main(String[] args) {
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            new Thread() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getId() + "工作結束了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                }
            }.start();
        }
    }


}
           

2.輸出結果

Semaphore源碼閱讀1.作用說明2.代碼實作3.源碼閱讀
Semaphore源碼閱讀1.作用說明2.代碼實作3.源碼閱讀

3.源碼閱讀

1.構造函數

這邊有兩個構造函數,預設是非公平的方式實作的,也可以通過設定boolean值來決定使用公平方式還是非公平的方式.不多贅述.

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
           
public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
           

2.請求鎖:acquire

這邊擷取的是共享鎖,調用AQS的實作。具體來看一下tryAcquire這個方法

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
           
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        //線程本身就是阻塞狀态報錯
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (tryAcquireShared(arg) < 0) {
            //嘗試擷取共享鎖失敗,這邊的嘗試是直接傳回狀态時候為0的結果,如果不為0表示擷取失敗
            doAcquireSharedInterruptibly(arg);
        }
    }
           

1.嘗試擷取鎖:tryAcquire

通過擷取目前的state,減去需要的共享鎖數量,通過剩餘的共享鎖數量表示是否需要等待,這邊有幾種情況:

1.如果結果<0.表示目前沒有可用的信号量,必須要加入到共享隊列中進行等待

2.如果CAS設定非0的值成功,則表示目前已經擷取到鎖,不需要加入到共享隊列.

protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
           
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //state - acquire得到目前剩餘可用的信号量
                int available = getState();
                int remaining = available - acquires;

                //小于0表示沒有可用資源,需要加入到等待隊列中,否則通過CAS設定成功表示目前奪鎖成功,可以運作了。
                if (remaining < 0 || compareAndSetState(available, remaining)){
                    return remaining;
                }
            }
        }
           

2.共享隊列等待:doAcquireSharedInterruptibly

這邊同樣需要調用tryAcquire方法來判斷是否可以擷取到運作權限,如果信号量任然小于0,則目前的線程會一直等待,直到有可用的信号量才能執行線程方法。

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        //将目前線程添加到隊列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            //死循環判斷是否頭節點,頭結點則擷取鎖,
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        // help GC
                        p.next = null;
                        failed = false;
                        return;
                    }
                }

                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }
           

2.釋放鎖:release

release方法發這邊直接調用AQS的releaseShared方法。

public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

           
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
           

1.嘗試釋放:tryReleaseShared

通過死循環去釋放鎖,這邊一定會傳回true結果,使用循環CAS方式.

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                // overflow
                //下一個小于目前,說明不對,直接抛錯,如果傳進來的是負數,有可能出現這種情況
                if (next < current) {
                    throw new Error("Maximum permit count exceeded");
                }
                //通過CAS方式修改state成功傳回true。這是一個死循環,是以最後一定會成功傳回的.
                //也就是說目前持有的信号量一定會被釋放掉.
                if (compareAndSetState(current, next)){
                    return true;
                }
            }
        }
           

2.解鎖:doReleaseShared

這邊是 釋放頭結點(通過死循環保證一定被釋放),并設定為傳播态。

private void doReleaseShared() {
        for (; ; ) {
            //進入死循環
            Node h = head;
            //如果有隊列有節點等待
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //如果waitState是SIGNAL
                if (ws == Node.SIGNAL) {
                    //設定waitstate 為0失敗則continue重新開始循環
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                        // loop to recheck cases
                        continue;
                    }
                    //如果設定節點waitstate成功,則喚醒下一個節點
                    unparkSuccessor(h);
                }
                //設定為傳播狀态.
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    // loop on failed CAS
                    continue;
                }

            }
            // loop if head changed
            //保證節點被釋放才能結束循環
            if (h == head) {
                break;
            }
        }
    }
           

繼續閱讀