天天看點

Java Semaphore 信号量的源碼深度解析與應用1 Semaphore的概述2 Semaphore的原理3 Semaphore的使用4 Semaphore的總結

詳細介紹了Semaphore信号量的原理和應用,以及與CountDownLatch的對比!

文章目錄

  • 1 Semaphore的概述
  • 2 Semaphore的原理
    • 2.1 基本結構
    • 2.2 可中斷擷取信号量
      • 2.1.1 公平模式
      • 2.1.2 非公平模式
    • 2.3 不可中斷擷取信号量
    • 2.4 逾時可中斷擷取信号量
    • 2.5 嘗試擷取信号量
    • 2.6 釋放信号量
  • 3 Semaphore的使用
  • 4 Semaphore的總結

1 Semaphore的概述

public class Semaphore

extends Object

implements Serializable

Semaphore來自于JDK1.5的JUC包,直譯過來就是信号量,被作為一種多線程并發控制工具來使用。

Semaphore可以控制同時通路共享資源的線程個數,線程通過 acquire方法擷取一個信号量,信号量減一,如果沒有就等待;通過release方法釋放一個信号量,信号量加一。它通過控制信号量的總數量,以及每個線程所需擷取的信号量數量,進而控制多個線程對共享資源通路的并發度,以保證合理的使用共享資源。相比synchronized和獨占鎖一次隻能允許一個線程通路共享資源,功能更加強大,有點類似于共享鎖!

2 Semaphore的原理

2.1 基本結構

Java Semaphore 信号量的源碼深度解析與應用1 Semaphore的概述2 Semaphore的原理3 Semaphore的使用4 Semaphore的總結

根據uml類圖,可以很明顯的看出來Semaphore和CountDownLatch一樣都是直接使用AQS實作的。差別就是Semaphore還分别實作了公平模式FairSync和非公平模式NonfairSync兩個内部類。

實際上公平與非公平隻是在擷取信号量的時候得到展現,它們的釋放信号量的方法都是一樣的,這就類似于ReentrantLock:公平與非公平隻是在擷取鎖的時候得到展現,它們的釋放鎖的方法都是一樣的!或許這裡有人在想,信号量是不是可以看作鎖資源呢?某些時候這麼看是沒問題的,比如都是擷取了隻有擷取了“信号量”或者“鎖”才能通路共享資源,但是它們又有差別,鎖資源會和線程綁定,而信号量則不會和線程綁定。

在構造器部分,如同CountDownLatch 構造函數傳遞的初始化計數個數count被賦給了AQS 的state 狀态變量一樣,Semaphore的信号量個數permits同樣賦給了AQS 的state 值。

在建立Semaphore時可以使用一個fair變量指定是否使用公平政策,預設是非公平的模式。公平模式會確定所有等待的擷取信号量的線程按照先進先出的順序擷取信号量,而非公平模式則沒有這個保證。非公平模式的吞吐量比公平模式的吞吐量要高,而公平模式則可以避免線程饑餓。

/**
 * 儲存某個AQS子類執行個體
 */
private final Sync sync;

/**
 * 建立具有給定的信号量數和非公平的公平模式的 Semaphore。
 *
 * @param permits 初始的可用信号量數目。此值可能為負數,在這種情況下,必須在授予任何擷取信号量前進行釋放信号量。
 */
public Semaphore(int permits) {
    //預設初始化NonfairSync執行個體
    sync = new NonfairSync(permits);
}

/**
 * 建立具有給定的信号量數和給定的公平設定的 Semaphore。
 *
 * @param permits 初始的可用信号量數目。此值可能為負數,在這種情況下,必須在授予任何擷取信号量前進行釋放信号量。
 * @param fair    如果此信号量保證在争用時按先進先出的順序授予信号量,則為 true;否則為 false。
 */
public Semaphore(int permits, boolean fair) {
    //根據fair參數選擇初始化一個公平FairSync類或者非公平NonfairSync類的執行個體
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}


/**
 * 非公平模式的實作
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    //…………其他方法後面再講

}

/**
 * 公平模式的實作
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    //…………其他方法後面再講

}

/**
 * 信号量的同步實作。 使用 AQS 的state狀态表示信号量。子分類為公平和非公平模式。
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    /**
     * 構造器
     *
     * @param permits 初始的可用信号量數目。
     */
    Sync(int permits) {
        //被設定為state值
        setState(permits);
    }

    //…………其他方法後面再講
}
           

2.2 可中斷擷取信号量

public void acquire()

可中斷的擷取一個信号量,沒有則一直阻塞,直到在其他線程提供信号量并喚醒該線程或者線程被中斷。擷取一個信号量就立即傳回,将可用的信号量數減 1。

如果調用此方法時已被中斷或者等待時被中斷,則抛出 InterruptedException,并且清除目前線程的已中斷狀态。

public void acquire(int permits)

可中斷的擷取permits 個信号量。

内部調用AQS的acquireSharedInterruptibly方法,這實際上就是共享式可中斷擷取資源的模版方法,是以Semaphore和CountDownLatch一樣都是基于共享資源模式。

/**
 * Semaphore的acquire方法
 * 從信号量擷取一個信号量,沒有則一直阻塞,直到在其他線程提供信号量并喚醒或者線程被中斷。
 *
 * @throws InterruptedException 如果調用此方法時已被中斷或者等待時被中斷
 */
public void acquire() throws InterruptedException {
    //内部調用AQS的acquireSharedInterruptibly方法
    //這實際上就是共享式可中斷擷取資源模版方法
    sync.acquireSharedInterruptibly(1);
}

/**
 * 從信号量擷取permits個信号量,沒有則一直阻塞,直到在其他線程提供信号量并喚醒或者線程被中斷。
 *
 * @param permits 需要擷取的信号量數量
 * @throws InterruptedException 如果調用此方法時已被中斷或者等待時被中斷
 */
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    //參數就是permits
    sync.acquireSharedInterruptibly(permits);
}

/**
 1. AQS的acquireSharedInterruptibly方法
 2. 共享式可中斷擷取信号量資源的模版方法
 3.  4. @param arg 需要擷取的信号量資源數量
 5. @throws InterruptedException 如果調用此方法時已被中斷或者等待時被中斷
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //最開始就檢查一次,如果目前線程是被中斷狀态,直接抛出異常
    if (Thread.interrupted())
        throw new InterruptedException();
    //調用tryAcquireShared嘗試擷取共享信号量資源,這個方法是子類自己重寫的
    //如果傳回值小于0,表示目前線程共享信号量資源失敗,否則表示成功
    //Semaphore的FairSync和NonfairSync對tryAcquireShared分别做出了公平和不公平的實作
    if (tryAcquireShared(arg) < 0)
        //擷取不到就執行doAcquireSharedInterruptibly方法
        doAcquireSharedInterruptibly(arg);
}
           

在擷取共享信号量資源的時候,Semaphore還實作了公平模式和非公平模式!它們的實作實際上和lock鎖的實作中鎖資源的公平、非公平擷取非常類似!

2.1.1 公平模式

公平模式調用FairSync的tryAcquireShared方法!

如果我們學習了AQS、ReentrantLock、ReadWriteLock的源碼,我們第一個就會發現hasQueuedPredecessors方法,這個方法是AQS為實作公平模式的預定義的方法,AQS幫我們實作好了,該方法用于查詢是否有任何線程等待擷取信号量資源的時間超過目前線程。

大概步驟為:

  1. 開啟一個死循環:
  2. 調用hasQueuedPredecessors方法,判斷是否有線程比目前線程更早地請求擷取信号量資源。如果該方法傳回true,則表示有線程比目前線程更早地請求擷取信号量資源,由于是公平的的,是以目前線程不應該擷取信号量資源,直接傳回-1,表示擷取信号量資源失敗。
  3. 到這裡還沒有傳回,表示目前線程就是最早請求擷取信号量資源,可以嘗試擷取。
  4. 擷取state的值available,我們知道state代表信号量資源數量。remaining為available減去需要擷取的信号量資源數量之後的內插補點。
  5. 如果remaining小于0,那麼傳回remaining值,由于是負數,是以擷取失敗,如果大于等于0,那麼表示可以擷取成功,嘗試CAS的更新state,更新成功之後同樣傳回remaining,由于是大于等于0的數,是以擷取成功。
  6. 如果remaining大于等于0,但是CAS更新state失敗,那麼循環重試。

原理還是很簡單的,就是判斷目前的信号量資源數量—state的值,是否滿足要擷取的信号量資源數量,acquire()方法預設擷取1個資源。擷取到了就是CAS的原子性的将state遞減,否則表示擷取資源失敗,那麼可能會阻塞。但是我們也會發現:如果remaining大于等于0,但是CAS更新state失敗,那麼會循環重試,這裡為什麼要重試呢?

實際上我們的在AQS文章的“可重入共享鎖的實作” 部分已經講過:因為可能會有多個線程同時擷取信号量資源,但是由于CAS隻能保證一次隻有一個線程成功,是以其他線程必定失敗,但此時,實際上還是存在剩餘的信号量資源沒有被擷取完畢的,是以讓其他線程重試,相比于直接加入到同步隊列中,對于信号量資源的使用率更高!

/**
 * 公平模式
 */
static final class FairSync extends Sync {
    /**
     * 嘗試公平的擷取共享信号量資源
     *
     * @param acquires 擷取信号量資源數量
     * @return 如果傳回值小于0,表示目前線程共享信号量資源失敗,否則表示成功
     */
    protected int tryAcquireShared(int acquires) {
        /*開啟一個循環嘗試擷取共享信号量資源*/
        for (; ; ) {
            //這是AQS實作公平模式的預定義的方法,AQS幫我們實作好了。該方法用于查詢是否有任何線程等待擷取信号量資源的時間超過目前線程
            //如果該方法傳回true,則表示有線程比目前線程更早地請求擷取信号量資源。由于是公平的的,是以目前線程不應該擷取信号量資源,直接傳回-1,表示擷取信号量資源失敗
            if (hasQueuedPredecessors())
                return -1;
            //到這裡,表示目前線程就是最早請求擷取信号量資源,可以嘗試擷取

            //擷取state的值available,我們知道state代表信号量資源數量
            int available = getState();
            //remaining為available減去需要擷取的信号量資源數量之後的內插補點
            int remaining = available - acquires;
            //如果remaining小于0,那麼傳回remaining值,由于是負數,是以擷取失敗
            //如果大于等于0,那麼表示可以擷取成功,嘗試CAS的更新state,更新成功之後同樣傳回remaining,由于是大于等于0的數,是以擷取成功
            if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                return remaining;
            //如果remaining大于等于0,但是CAS更新state失敗,那麼循環重試
        }
    }
}
           

2.1.2 非公平模式

非公平模式調用NonfairSync的tryAcquireShared方法!

相比于公平模式的實作,少了hasQueuedPredecessors的判斷。可以想象:如果某線程A 先調用了aquire()方法擷取信号量,但是如果目前信号量個數為0,那麼線程A 會被放入AQS 的同步隊列阻塞。

過一段時間後線程B調用了release()方法釋放了一個信号量,他它會喚醒隊列中等待的線程A,但是這時線程C又調用了aquire()方法。如果采用非公平政策,那麼線程C就會和線程A 去競争這個信号量資源。由nonfairTryAcquireShared的代碼可知,線程C完全可以線上程A 被激活前,或者激活後先于線程A 擷取到該信号量,也就是在這種模式下阻塞線程和目前請求的線程是競争關系,而不遵循先來先得的政策。

另外,非公平模式的具體實作是在父類Sync中的nonfairTryAcquireShared方方法,為什麼該方法要實作在父類中的,因為無論是指定的公平模式還是非公平模式,它們的tryAcquire方法都是調用的nonfairTryAcquireShared方法,即非公平的,是以實作在父類中!

/**
 * 非公平模式
 */
static final class NonfairSync extends Sync {

    /**
     * 嘗試非公平的擷取共享信号量資源
     *
     * @param acquires 擷取信号量資源數量
     * @return 如果傳回值小于0,表示目前線程共享信号量資源失敗,否則表示成功
     */
    protected int tryAcquireShared(int acquires) {
        //調用父類Sync的nonfairTryAcquireShared方法
        //為什麼該方法要實作在父類中的,因為無論是指定的公平模式還是非公平模式,
        //它們的tryAcquire方法都是調用的nonfairTryAcquireShared方法,即非公平的,是以實作在父類中
        return nonfairTryAcquireShared(acquires);
    }
}

/**
 * AQS的實作,作為公平和非公平模式的父類,有一些共享方法
 */
abstract static class Sync extends AbstractQueuedSynchronizer {

    /**
     * 嘗試非公平的擷取共享信号量資源
     *
     * @param acquires 擷取信号量資源數量
     * @return 如果傳回值小于0,表示目前線程共享信号量資源失敗,否則表示成功
     */
    final int nonfairTryAcquireShared(int acquires) {
        /*開啟一個循環嘗試擷取共享信号量資源*/
        for (; ; ) {
            //相比于公平模式,少了hasQueuedPredecessors的實作
            //擷取state的值available,我們知道state代表信号量資源數量
            int available = getState();
            //remaining為available減去需要擷取的信号量資源數量之後的內插補點
            int remaining = available - acquires;
            //如果remaining小于0,那麼傳回remaining值,由于是負數,是以擷取失敗
            //如果大于等于0,那麼表示可以擷取成功,嘗試CAS的更新state,更新成功之後同樣傳回remaining,由于是大于等于0的數,是以擷取成功
            if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                return remaining;
            //如果remaining大于等于0,但是CAS更新state失敗,那麼循環重試
        }
    }
}
           

2.3 不可中斷擷取信号量

public void acquireUninterruptibly()

不可中斷的擷取一個信号量,沒有則一直阻塞,直到在其他線程提供信号量并喚醒該線程。擷取一個信号量就立即傳回,将可用的信号量數減 1。

相比于acquire()方法,該方法不響應中斷,不會抛出InterruptedException

public void acquireUninterruptibly(int permits)

不可中斷的擷取permits個信号量。

相比于acquire方法,acquireUninterruptibly方法不響應中斷,不會抛出InterruptedException。實際上内部調用AQS的acquireShared方法,這實際上就是共享式擷取資源的模版方法式。

/**
 * 擷取一個信号量,沒有則一直阻塞,直到在其他線程提供信号量并喚醒該線程。
 * 擷取一個信号量就立即傳回,将可用的信号量數減 1。
 */
public void acquireUninterruptibly() {
    //内部調用AQS的acquireShared方法
    //這實際上就是共享式不可中斷擷取資源模版方法
    sync.acquireShared(1);
}

/**
 * AQS的acquireShared方法
 * 共享式不可中斷擷取資源模版方法
 *
 * @param arg 擷取的資源數量
 */
public final void acquireShared(int arg) {
    //并沒有檢查中斷
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}


/**
 * 擷取permits個信号量,沒有則一直阻塞,直到在其他線程提供信号量并喚醒該線程。
 *
 * @param permits 擷取的信号量數量
 */
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //參數就是permits
    sync.acquireShared(permits);
}
           

2.4 逾時可中斷擷取信号量

public boolean tryAcquire(long timeout, TimeUnit unit)

逾時可中斷的擷取一個信号量,沒有則一直阻塞,直到在其他線程提供信号量并喚醒該線程或者線程被中斷或者阻塞逾時。擷取一個信号量就立即傳回,将可用的信号量數減 1。

如果調用此方法時已被中斷或者等待時被中斷,則抛出 InterruptedException,并且清除目前線程的已中斷狀态。

public boolean tryAcquire(int permits,long timeout,TimeUnit unit)

逾時可中斷的擷取permits 個信号量。

實際上内部調用AQS的tryAcquireSharedNanos方法,這實際上就是共享式逾時可中斷擷取資源的模版方法。

/**
 * @param timeout 逾時時間
 * @param unit    時間機關
 * @return 是否擷取資源成功
 * @throws InterruptedException 如果調用此方法時已被中斷或者等待時被中斷
 */
public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    //實際上就是調用的AQS的共享式逾時擷取資源的方法,擷取1個資源
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

/**
 * @param permits 擷取的資源數量
 * @param timeout 逾時時間
 * @param unit    時間機關
 * @return 是否擷取資源成功
 * @throws InterruptedException 如果調用此方法時已被中斷或者等待時被中斷
 */
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    //實際上就是調用的AQS的共享式逾時擷取資源的方法,擷取permits個資源
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

/**
 * AQS的共享式逾時擷取資源的模版方法,支援中斷
 *
 * @param arg          參數
 * @param nanosTimeout 逾時時間,納秒
 * @return 是否擷取資源成功
 * @throws InterruptedException 如果調用此方法時已被中斷或者等待時被中斷
 */
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    //最開始就檢查一次,如果目前線程是被中斷狀态,直接抛出異常
    if (Thread.interrupted())
        throw new InterruptedException();
    //下面是一個||運算進行短路連接配接的代碼,同樣左邊是調用子類實作的tryAcquireShared嘗試擷取資源,擷取到了直接傳回true
    //擷取不到資源就執行doAcquireSharedNanos方法,這個方法是AQS的方法,是以逾時機制是AQS幫我們實作的!
    return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
}
           

2.5 嘗試擷取信号量

public boolean tryAcquire()

僅在調用時至少存在至少一個可用信号量,才嘗試擷取一個信号量。

public boolean tryAcquire(int permits)

僅在調用時至少存在permits個的信号量,才嘗試擷取permits個信号量。

實際上内部就是直接調用的nonfairTryAcquireShared方法,即公平模式和非公平模式的tryAcquire實作是一樣的!并且該方法不會阻塞線程,擷取成功傳回true,擷取失敗傳回false!

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //調用nonfairTryAcquireShared方法
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //調用nonfairTryAcquireShared方法
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
           

2.6 釋放信号量

public void release()

釋放一個信号量,信号量總數加1。釋放成功後,将喚醒在同步隊列中等待擷取信号量的結點(線程)!

public void release(int permits)

釋放permits個信号量,信号量總數加permits。釋放成功後,将喚醒在同步隊列中等待擷取信号量的結點(線程)!

公平模式和非公平模式的信号量的釋放都是一樣的。實際上内部調用AQS的releaseShared方法,這實際上就是共享式釋放資源的模版方法。

/**
 * 釋放一個信号量,信号量總數加1。
 */
public void release() {
    //内部調用AQS的releaseShared方法
    //這實際上就是共享式釋放資源的模版方法
    sync.releaseShared(1);
}

/**
 * 釋放permits個信号量,信号量總數加permits。
 *
 * @param permits 釋放的信号量個數
 */
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //參數就是permits
    sync.releaseShared(permits);
}


/**
 * AQS的共享模式下釋放資源的模版方法。
 * 如果成功釋放則會調用doReleaseShared
 */
public final boolean releaseShared(int arg) {
    //tryReleaseShared釋放信号量資源,該方法由子類自己實作
    if (tryReleaseShared(arg)) {
        //釋放成功,必定調用doReleaseShared嘗試喚醒後繼結點,即阻塞的線程
        doReleaseShared();
        return true;
    }
    return false;
}

/**
 * Sync的tryReleaseShared實作
 *
 * @param releases 要釋放的資源數量
 * @return true 成功 false 失敗
 */
protected final boolean tryReleaseShared(int releases) {
    for (; ; ) {
        //很簡單,就是嘗試CAS的增加state值,增加releases
        int current = getState();
        int next = current + releases;
        //這裡可以知道,信号量資源數量不可超過int的最大值
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS的增加state值,CAS成功之後傳回true,否則循環重試
        if (compareAndSetState(current, next))
            return true;
    }
}
           

3 Semaphore的使用

Semaphore可以用來控制多線程對于共享資源通路的并發量!

案例:若一個工廠有5台機器,但是有8個勞工,一台機器同時隻能被一個勞工使用,隻有使用完了,其他勞工才能繼續使用,每個勞工之多工作10秒,最後統計工作量。

我們可以通過Semaphore與之前的CountDownLatch搭配線程池來輕松實作。我們能發現,采用非公平模式的Semaphore時勞工的總工作量大部分情況下要高于采用公平模式的勞工總工作量,即非公平模式的執行效率更高(這是不一定的)。我們還能發現,在非公平模式勞工的總工作量高于公平模式的勞工總工作量時,非公平模式下總會有某些勞工工(特别是勞工0、1、2)作量更多,而另一些勞工工作量更少,這就是線程饑餓!

/**
 * @author lx
 */
public class SemaphoreTest {

    /**
     * 機器數目,實際上就是信号量為5,非公平模式
     */
    private static Semaphore semaphore = new Semaphore(5, false);
    /**
     * 機器數目,實際上就是信号量為5,公平模式
     */
    //private static Semaphore semaphore = new Semaphore(5, true);

    /**
     * 當所有勞工都完成任務,那麼統計工作量
     */
    private static CountDownLatch countDownLatch = new CountDownLatch(10);

    /**
     * 勞工數目,8
     */
    private static final int NUM = 10;

    /**
     * 目前時間
     */
    private static final long NOW = System.nanoTime();

    /**
     * 納秒機關
     */
    private static final long NANOUNIT = 1000000000;

    /**
     * 工作量
     */
    private static final LongAdder WORKLOAD = new LongAdder();


    static class Worker implements Runnable {
        public Worker(int num) {
            this.num = num;
        }

        private int num;
        private long timed = 20 * NANOUNIT;

        @Override
        public void run() {
            while (true) {
                //擷取信号量
                try {
                    if (semaphore.tryAcquire(timed, TimeUnit.NANOSECONDS)) {
                        System.out.println("勞工" + this.num + "占用一個機器在生産...");
                        //占用一定時間
                        LockSupport.parkNanos((long) (NANOUNIT * num * 0.5));
                        //統一調整為2秒,将會看到更明顯的Semaphore效果
                        //LockSupport.parkNanos((long) (NANOUNIT * 2));

                        System.out.println("勞工" + this.num + "生産完畢,釋放出機器");
                        //釋放信号量
                        //每個勞工最多執行20秒
                        WORKLOAD.increment();
                        if ((timed = timed - (System.nanoTime() - NOW)) <= 0) {
                            semaphore.release();
                            countDownLatch.countDown();
                            break;
                        }
                        semaphore.release();
                    } else {
                        countDownLatch.countDown();
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < NUM; i++) {
            executorService.execute(new Worker(i));
        }
        executorService.shutdown();
        countDownLatch.await();
        System.out.println("工作完畢,空閑機器為:" + semaphore.availablePermits());
        System.out.println("總工作量為:" + WORKLOAD.sum());
    }
}
           

4 Semaphore的總結

Semaphore和CountDownLatch的原理都差不多,都是直接使用AQS的共享模式實作自己的邏輯,都是對于AQS的state資源的利用,但是它們卻實作了不同的功能,CountDownLatch中state被看作一個倒計數器,當state變為0時,表示線程可以放開執行。而Semaphore中的state被看作信号量資源,擷取不到資源則可能會阻塞,擷取到資源則可以通路共享區域,共享區域使用完畢要記得還回信号量。

很明顯Semaphore的信号量資源很像鎖資源,但是我們前面就說過他們的不同,那就是鎖資源是和獲得鎖的線程綁定的,而這裡的信号量資源并沒有和線程綁定,也就是說你可以讓一些線程不停的“釋放信号量”,而另一些線程隻是不停的“擷取信号量”,這在AQS内部實際上就是對state狀态的值的改變而已,與線程無關!

通常Semaphore可以用來控制多線程對于共享資源通路的并發量,在上面的案例中我們就見過!另外還需要注意的是,如果在AQS的同步隊列中隊頭結點線程需要擷取n個資源,目前有m個資源,如果m小于n,那麼這個隊列中的頭結點線程以及後面的所有結點線程都會因為不能擷取到資源而繼續阻塞,即使頭結點後面的結點中的線程所需的資源數量小于m也不行。即已經在AQS同步隊列中阻塞的線程,隻能按照先進先出的順序去擷取資源,如果頭部線程因為所需資源數量不夠而一直阻塞,那麼隊列後面的線程必定不能擷取資源!

和CountDownLatch一樣,Semaphore的源碼看起來非常簡單,那是因為複雜的線程等待、喚醒機制都被AQS實作了,如果想要真正了解Semaphore的原理,那麼AQS是必須要了解的。實際上如果學會了AQS,那麼JUC中的鎖或者其他同步元件就很簡單了!

相關文章:

  1. JUC文章
  2. Java CountDownLatch 閉鎖的源碼深度解析與應用
如有需要交流,或者文章有誤,請直接留言。另外希望點贊、收藏、關注,我将不間斷更新各種Java學習部落格!