天天看點

Semaphore,信号量,限流操作

Semaphore

使用場景

  • 停車場

特點

  1. semaphore 也就是我們常說的信号燈,semaphore 可以控制同時通路的線程個數,通過 acquire 擷取一個許可,如果沒有就等待,通過 release 釋放一個許可。

code

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@Slf4j
public class SemaphoreDemo {

    // 20個使用者同時通路
    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        // 信号量:限流,同一時刻隻允許有permits個線程(使用者)同時通路
        int permits = 2;
        final Semaphore semaphore = new Semaphore(permits);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
//                    acquire(semaphore, threadNum);
//                    acquireMore(semaphore, threadNum);
                    tryAcquire(semaphore, threadNum);
//                    tryAcquireWithTime(semaphore, threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    /**
     * 擷取一個許可
     *
     * @param semaphore
     * @param threadNum
     * @throws Exception
     */
    private static void acquire(Semaphore semaphore, int threadNum) throws Exception {
        // 擷取一個許可
        semaphore.acquire();
        log.info("{}", threadNum);
        Thread.sleep(1000);
        // 釋放一個許可
        semaphore.release();
    }

    /**
     * 擷取多個個許可
     *
     * @param semaphore
     * @param threadNum
     * @throws Exception
     */
    private static void acquireMore(Semaphore semaphore, int threadNum) throws Exception {
        // 擷取多個個許可
        semaphore.acquire(2);
        log.info("{}", threadNum);
        Thread.sleep(1000);
        // 釋放多個許可
        semaphore.release(2);
    }

    /**
     * 嘗試擷取一個許可,擷取不到則跳過目前包裹的代碼
     *
     * @param semaphore
     * @param threadNum
     * @throws Exception
     */
    private static void tryAcquire(Semaphore semaphore, int threadNum) throws Exception {
        // 嘗試擷取一個許可
        if (semaphore.tryAcquire()) {
            log.info("{}", threadNum);
            Thread.sleep(1000);
            // 釋放一個許可
            semaphore.release();
        } else {
            // 擷取不到許可
            log.info("擷取不到許可:{}", threadNum);
        }
    }

    /**
     * 嘗試擷取一個許可(帶逾時時間),給定時間内擷取不到則跳過目前包裹的代碼
     *
     * @param semaphore
     * @param threadNum
     * @throws Exception
     */
    private static void tryAcquireWithTime(Semaphore semaphore, int threadNum) throws Exception {
        // 嘗試擷取一個許可(帶逾時時間)
        if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 嘗試擷取一個許可
            log.info("{}", threadNum);
            Thread.sleep(1000);
            // 釋放一個許可
            semaphore.release();
        } else {
            // 擷取不到許可
            log.info("擷取不到許可:{}", threadNum);
        }
    }
}