天天看點

JUC 提供的限流利器-Semaphore(信号量)

在 JUC 包下,有一個 Semaphore 類,翻譯成信号量,Semaphore(信号量)是用來控制同時通路特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。Semaphore 跟鎖(synchronized、Lock)有點相似,不同的地方是,鎖同一時刻隻允許一個線程通路某一資源,而 Semaphore 則可以控制同一時刻多個線程通路某一資源。

Semaphore(信号量)并不是 Java 語言特有的,幾乎所有的并發語言都有。是以也就存在一個「信号量模型」的概念,如下圖所示:

JUC 提供的限流利器-Semaphore(信号量)

信号量模型

信号量模型比較簡單,可以概括為:「一個計數器、一個隊列、三個方法」。

計數器:記錄目前還可以運作多少個資源通路資源。

隊列:待通路資源的線程

「三個方法」:

  • 「init()」:初始化計數器的值,可就是允許多少線程同時通路資源。
  • 「up()」:計數器加1,有線程歸還資源時,如果計數器的值大于或者等于 0 時,從等待隊列中喚醒一個線程
  • 「down()」:計數器減 1,有線程占用資源時,如果此時計數器的值小于 0 ,線程将被阻塞。

這三個方法都是原子性的,由實作方保證原子性。例如在 Java 語言中,JUC 包下的 Semaphore 實作了信号量模型,是以 Semaphore 保證了這三個方法的原子性。

Semaphore 是基于 AbstractQueuedSynchronizer 接口實作信号量模型的。AbstractQueuedSynchronizer 提供了一個基于 FIFO 隊列,可以用于建構鎖或者其他相關同步裝置的基礎架構,利用了一個 int 來表示狀态,通過類似 acquire 和 release 的方式來操縱狀态。關于 AbstractQueuedSynchronizer 更多的介紹,可以點選連結:

❝http://ifeve.com/introduce-abstractqueuedsynchronizer/

AbstractQueuedSynchronizer 在 Semaphore 類中的實作類如下:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
           

複制

在 Semaphore 類中,實作了兩種信号量:「公平的信号量和非公平的信号量」,公平的信号量就是大家排好隊,先到先進,非公平的信号量就是不一定先到先進,允許插隊。非公平的信号量效率會高一些,是以預設使用的是非公平信号量。具體的可以檢視 Semaphore 類實作源碼。

Semaphore 類中,主要有以下方法:

// 構造方法,參數表示許可證數量,用來建立信号量
public Semaphore(int permits);
// 從信号量中擷取許可,相當于擷取到執行權
public void acquire() throws InterruptedException;
// 嘗試擷取1個許可,不管是否能夠擷取成功,都立即傳回,true表示擷取成功,false表示擷取失敗
public boolean tryAcquire();
// 将許可還給信号量
public void release();
           

複制

Semaphore 類的實作就了解的差不多了。可能你會有疑問 Semaphore 的應用場景是什麼?Semaphore 可以用來限流(流量控制),在一些公共資源有限的場景下,Semaphore 可以派上用場。比如在做日志清洗時,可能有幾十個線程在并發清洗,但是将清洗的資料存入到資料庫時,可能隻給資料庫配置設定了 10 個連接配接池,這樣兩邊的線程數就不對等了,我們必須保證同時隻能有 10 個線程擷取資料庫連結,否則就會存在大量線程無法連結上資料庫。

用 Semaphore 信号量來模拟這操作,代碼如下:

public class SemaphoreDemo {
    /**
     * semaphore 信号量,可以限流
     *
     * 模拟并發資料庫操作,同時有三十個請求,但是系統每秒隻能處理 5 個
     */

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors
            .newFixedThreadPool(THREAD_COUNT);
	// 初始化信号量,個數為 5
    private static Semaphore s = new Semaphore(5);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 擷取許可
                        s.acquire();
                        System.out.println(Thread.currentThread().getName()+" 完成資料庫操作 ,"+ new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format( new Date()));
                        // 休眠兩秒鐘,效果更直覺
                        Thread.sleep(2000);
                        // 釋放許可
                        s.release();
                    } catch (InterruptedException e) {
                    }
                }
            });
        }
		// 關閉連接配接池
        threadPool.shutdown();
    }
}
           

複制

運作效果如下:

JUC 提供的限流利器-Semaphore(信号量)

圖檔描述

從結果中,可以看出,每秒隻有 5 個線程在執行,這符合我們的預期。

好了,關于 Semaphore 的内容就結束了,更加詳細的還請您查閱相關資料和閱讀 Semaphore 源碼。希望這篇文章對您的學習或者工作有所幫助。

感謝您的閱讀,祝好。