天天看點

AQS源碼探究_09 Semaphore源碼分析

1、簡介

Semaphore,信号量,它儲存了一系列的許可(permits),每次調用acquire()都将消耗一個許可,每次調用release()都将歸還一個許可。

Semaphore通常用于限制同一時間對共享資源的通路次數上,也就是常說的限流。

Semaphore信号量,擷取通行證流程圖:

AQS源碼探究_09 Semaphore源碼分析
AQS源碼探究_09 Semaphore源碼分析

2、入門案例

案例1:Pool.java

/**
 * date: 2021/5/10
 * @author csp
 */
public class Pool {
    /**
     * 可同時通路資源的最大線程數
     */
    private static final int MAX_AVAILABLE = 100;
    
    /**
     * 信号量 表示:可擷取的對象通行證
     */
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    
    /**
     * 共享資源,可以想象成 items 數組記憶體儲的都是Connection對象 模拟是連接配接池
     */
    protected Object[] items = new Object[MAX_AVAILABLE];
    
    /**
     * 共享資源占用情況,與items數組一一對應,比如:
     * items[0]對象被外部線程占用,那麼 used[0] == true,否則used[0] == false
     */
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    /**
     * 擷取一個空閑對象
     * 如果目前池中無空閑對象,則等待..直到有空閑對象為止
     */
    public Object getItem() throws InterruptedException {
        // 每次調用acquire()都将消耗一個許可(permits)
        available.acquire();
        return getNextAvailableItem();
    }

    /**
     * 歸還對象到池中
     */
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    /**
     * 擷取池内一個空閑對象,擷取成功則傳回Object,失敗傳回Null
     * 成功後将對應的 used[i] = true
     */
    private synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }

    /**
     * 歸還對象到池中,歸還成功傳回true
     * 歸還失敗:
     * 1.池中不存在該對象引用,傳回false
     * 2.池中存在該對象引用,但該對象目前狀态為空閑狀态,也傳回false
     */
    private synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }
}
      

案例2:SemaphoreTest02.java

/**
 * date: 2020/5/10
 * @author csp
 */
public class SemaphoreTest02 {
    public static void main(String[] args) throws InterruptedException {
        // 聲明信号量,初始的許可(permits)為2
        // 公平模式:fair為true
        final Semaphore semaphore = new Semaphore(2, true);

        Thread tA = new Thread(() ->{
            try {
                // 每次調用acquire()都将消耗一個許可(permits)
                semaphore.acquire();
                System.out.println("線程A擷取通行證成功");
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
            }finally {
                // 每次調用release()都将歸還一個許可(permits)
                semaphore.release();
            }
        });
        tA.start();
        // 確定線程A已經執行
        TimeUnit.MILLISECONDS.sleep(200);

        Thread tB = new Thread(() ->{
            try {
                // 調用acquire(2)都将消耗2個許可(permits)
                semaphore.acquire(2);
                System.out.println("線程B擷取通行證成功");
            } catch (InterruptedException e) {
            }finally {
                // 調用release(2)都将歸還2個許可(permits)
                semaphore.release(2);
            }
        });
        tB.start();
        // 確定線程B已經執行
        TimeUnit.MILLISECONDS.sleep(200);

        Thread tC = new Thread(() ->{
            try {
                // 每次調用acquire()都将消耗一個許可(permits)
                semaphore.acquire();
                System.out.println("線程C擷取通行證成功");
            } catch (InterruptedException e) {
            }finally {
                // 每次調用release()都将歸還一個許可(permits)
                semaphore.release();
            }
        });
        tC.start();
    }
}
      

執行結果:

線程A擷取通行證成功
線程B擷取通行證成功
線程C擷取通行證成功
      

3、源碼分析

内部類Sync

通過Sync的幾個實作方法,我們擷取到以下幾點資訊:

許可是在構造方法時傳入的;

許可存放在狀态變量state中;

嘗試擷取一個許可的時候,則state的值減1;

當state的值為0的時候,則無法再擷取許可;

釋放一個許可的時候,則state的值加1;

許可的個數可以動态改變;

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    // 構造方法,傳入許可次數,放入state中
    Sync(int permits) {
        setState(permits);
    }

    // 擷取許可次數
    final int getPermits() {
        return getState();
    }

    // 非公平模式嘗試擷取許可
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 先看看還有幾個許可
            int available = getState();
            // 減去這次需要擷取的許可還剩下幾個許可
            int remaining = available - acquires;
            // 如果剩餘許可小于0了則直接傳回
            // 如果剩餘許可不小于0,則嘗試原子更新state的值,成功了傳回剩餘許可
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // 釋放許可
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 先看看還有幾個許可
            int current = getState();
            // 加上這次釋放的許可
            int next = current + releases;
            // 檢測溢出
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 如果原子更新state的值成功,就說明釋放許可成功,則傳回true
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // 減少許可
    final void reducePermits(int reductions) {
        for (;;) {
            // 先看看還有幾個許可
            int current = getState();
            // 減去将要減少的許可
            int next = current - reductions;
            // 檢測溢出
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // 原子更新state的值,成功了傳回true
            if (compareAndSetState(current, next))
                return;
        }
    }
    
    // 銷毀許可
    final int drainPermits() {
        for (;;) {
            // 先看看還有幾個許可
            int current = getState();
            // 如果為0,直接傳回
            // 如果不為0,把state原子更新為0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
      

内部類NonfairSync

非公平模式下,直接調用父類的

nonfairTryAcquireShared()

嘗試擷取許可。

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    
    // 構造方法,調用父類的構造方法
    NonfairSync(int permits) {
        super(permits);
    }
    // 嘗試擷取許可,調用父類的nonfairTryAcquireShared()方法
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
      

内部類FairSync

公平模式下,先檢測前面是否有排隊的,如果有排隊的則擷取許可失敗,進入隊列排隊,否則嘗試原子更新state的值。

**注意:**為了閱讀友善,該内部類中将一些AQS中的方法粘貼過來了,在方法頭注釋加有标注!

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    /**
     * 該方法位于AQS中:
     * 嘗試擷取通行證,擷取成功傳回 >= 0的值;
     * 擷取失敗 傳回 < 0 值
     */
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 判斷目前 AQS 阻塞隊列内 是否有等待者線程,如果有直接傳回-1,表示目前aquire操作的線程需要進入到隊列等待..
            if (hasQueuedPredecessors())
                return -1;
            // 執行到這裡,有哪幾種情況?
            // 1.調用aquire時 AQS阻塞隊列内沒有其它等待者
            // 2.目前節點 在阻塞隊列内是headNext節點

            // 擷取state ,state這裡表示 通行證
            int available = getState();
            // remaining 表示目前線程 擷取通行證完成之後,semaphore還剩餘數量
            int remaining = available - acquires;

            // 條件一:remaining < 0 成立,說明線程擷取通行證失敗..
            // 條件二:前置條件,remaning >= 0, CAS更新state 成功,說明線程擷取通行證成功,CAS失敗,則自旋。
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    /**
     * 該方法位于AQS中:
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 條件成立:說明目前調用acquire方法的線程 已經是 中斷狀态了,直接抛出異常..
        if (Thread.interrupted())
            throw new InterruptedException();

        // 對應業務層面 執行任務的線程已經将latch打破了。然後其他再調用latch.await的線程,就不會在這裡阻塞了
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * 該方法位于AQS中:
     */
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 将調用Semaphore.aquire方法的線程 包裝成node加入到 AQS的阻塞隊列當中。
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // 擷取目前線程節點的前驅節點
                final Node p = node.predecessor();
                // 條件成立,說明目前線程對應的節點 為 head.next節點
                if (p == head) {
                    // head.next節點就有權利擷取 共享鎖了..
                    int r = tryAcquireShared(arg);


                    // 站在Semaphore角度:r 表示還剩餘的通行證數量
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // shouldParkAfterFailedAcquire  會給目前線程找一個好爸爸,最終給爸爸節點設定狀态為 signal(-1),傳回true
                // parkAndCheckInterrupt 挂起目前節點對應的線程...
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


    /**
     * 該方法位于AQS中:
     * 設定目前節點為 head節點,并且向後傳播!(依次喚醒!)
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 将目前節點設定為 新的 head節點。
        setHead(node);
        // 調用setHeadAndPropagete 時  propagate  == 1 一定成立
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            // 擷取目前節點的後繼節點..
            Node s = node.next;
            // 條件一:s == null  什麼時候成立呢?  目前node節點已經是 tail了,條件一會成立。 doReleaseShared() 裡面會處理這種情況..
            // 條件二:前置條件,s != null , 要求s節點的模式必須是 共享模式。 latch.await() -> addWaiter(Node.SHARED)
            if (s == null || s.isShared())
                // 基本上所有情況都會執行到 doReleasseShared() 方法。
                doReleaseShared();
        }
    }

    //AQS.releaseShared 該方法位于AQS中:
    public final boolean releaseShared(int arg) {
        // 條件成立:表示目前線程釋放資源成功,釋放資源成功後,去喚醒擷取資源失敗的線程..
        if (tryReleaseShared(arg)) {
            // 喚醒擷取資源失敗的線程...
            doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     * 喚醒擷取資源失敗的線程
     *
     * CountDownLatch版本
     * 都有哪幾種路徑會調用到doReleaseShared方法呢?
     * 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() 喚醒目前阻塞隊列内的 head.next 對應的線程。
     * 2.被喚醒的線程 -> doAcquireSharedInterruptibly parkAndCheckInterrupt() 喚醒 -> setHeadAndPropagate() -> doReleaseShared()
     *
     * Semaphore版本
     * 都有哪幾種路徑會調用到doReleaseShared方法呢?
     *
     */
    //AQS.doReleaseShared 該方法位于AQS中:
    private void doReleaseShared() {
        for (;;) {
            // 擷取目前AQS 内的 頭結點
            Node h = head;
            // 條件一:h != null 成立,說明阻塞隊列不為空..
            // 不成立:h == null 什麼時候會是這樣呢?
            // latch建立出來後,沒有任何線程調用過 await() 方法之前,有線程調用latch.countDown()操作 且觸發了 喚醒阻塞節點的邏輯..

            // 條件二:h != tail 成立,說明目前阻塞隊列内,除了head節點以外  還有其他節點。
            // h == tail  -> head 和 tail 指向的是同一個node對象。 什麼時候會有這種情況呢?
            // 1. 正常喚醒情況下,依次擷取到 共享鎖,目前線程執行到這裡時 (這個線程就是 tail 節點。)
            // 2. 第一個調用await()方法的線程 與 調用countDown()且觸發喚醒阻塞節點的線程 出現并發了..
            //   因為await()線程是第一個調用 latch.await()的線程,此時隊列内什麼也沒有,它需要補充建立一個Head節點,然後再次自旋時入隊
            //   在await()線程入隊完成之前,假設目前隊列内 隻有 剛剛補充建立的空元素 head 。
            //   同期,外部有一個調用countDown()的線程,将state 值從1,修改為0了,那麼這個線程需要做 喚醒 阻塞隊列内元素的邏輯..
            //   注意:調用await()的線程 因為完全入隊完成之後,再次回到上層方法 doAcquireSharedInterruptibly 會進入到自旋中,
            //   擷取目前元素的前驅,判斷自己是head.next, 是以接下來該線程又會将自己設定為 head,然後該線程就從await()方法傳回了...
            if (h != null && h != tail) {
                // 執行到if裡面,說明目前head 一定有 後繼節點!

                int ws = h.waitStatus;
                // 目前head狀态 為 signal 說明 後繼節點并沒有被喚醒過呢...
                if (ws == Node.SIGNAL) {
                    // 喚醒後繼節點前 将head節點的狀态改為 0
                    // 這裡為什麼,使用CAS呢? 回頭說...
                    // 當doReleaseShared方法 存在多個線程 喚醒 head.next 邏輯時,
                    // CAS 可能會失敗...
                    // 案例:
                    // t3 線程在if(h == head) 傳回false時,t3 會繼續自旋. 參與到 喚醒下一個head.next的邏輯..
                    // t3 此時執行到 CAS WaitStatus(h,Node.SIGNAL, 0) 成功.. t4 在t3修改成功之前,也進入到 if (ws == Node.SIGNAL) 裡面了,
                    // 但是t4 修改 CAS WaitStatus(h,Node.SIGNAL, 0) 會失敗,因為 t3 改過了...
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 喚醒後繼節點
                    unparkSuccessor(h);
                }

                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }

            // 條件成立:
            // 1.說明剛剛喚醒的 後繼節點,還沒執行到 setHeadAndPropagate方法裡面的 設定目前喚醒節點為head的邏輯。
            // 這個時候,目前線程 直接跳出去...結束了..
            // 此時用不用擔心,喚醒邏輯 在這裡斷掉呢?、
            // 不需要擔心,因為被喚醒的線程 早晚會執行到doReleaseShared方法。

            // 2.h == null  latch建立出來後,沒有任何線程調用過 await() 方法之前,
            // 有線程調用latch.countDown()操作 且觸發了 喚醒阻塞節點的邏輯..
            // 3.h == tail  -> head 和 tail 指向的是同一個node對象

            // 條件不成立:
            // 被喚醒的節點 非常積極,直接将自己設定為了新的head,此時 喚醒它的節點(前驅),執行h == head 條件會不成立..
            // 此時 head節點的前驅,不會跳出 doReleaseShared 方法,會繼續喚醒 新head 節點的後繼...
            if (h == head)                   // loop if head changed
                break;
        }
    }
}
      

構造方法

建立Semaphore時需要傳入許可次數。Semaphore預設也是非公平模式,但是你可以調用第二個構造方法聲明其為公平模式。

// 構造方法,建立時要傳入許可次數,預設使用非公平模式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// 構造方法,需要傳入許可次數,及是否公平模式
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
      

acquire()方法

擷取一個許可,預設使用的是可中斷方式,如果嘗試擷取許可失敗,會進入AQS的隊列中排隊。

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 擷取一個許可,非中斷方式,如果嘗試擷取許可失敗,會進入AQS的隊列中排隊。
public void acquireUninterruptibly() {
     sync.acquireShared(1);
}
      

cquire(int permits)方法

一次擷取多個許可,可中斷方式。

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

// 一次擷取多個許可,非中斷方式。
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
      

tryAcquire()方法

嘗試擷取一個許可,使用Sync的非公平模式嘗試擷取許可方法,不論是否擷取到許可都傳回,隻嘗試一次,不會進入隊列排隊。

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

// 嘗試擷取一個許可,先嘗試一次擷取許可,如果失敗則會等待timeout時間,這段時間内都沒有擷取到許可,則傳回false,否則傳回true;
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
      

elease()方法

釋放一個許可,釋放一個許可時state的值會加1,并且會喚醒下一個等待擷取許可的線程。

public void release() {
    sync.releaseShared(1);
}
      

release(int permits)方法

一次釋放多個許可,state的值會相應增加permits的數量。

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
      

4、小結

Semaphore,也叫信号量,通常用于控制同一時刻對共享資源的通路上,也就是限流場景;

Semaphore的内部實作是基于AQS的共享鎖來實作的;

Semaphore初始化的時候需要指定許可的次數,許可的次數是存儲在state中;

擷取一個許可時,則state值減1;

釋放一個許可時,則state值加1;