天天看點

Semaphore分析

摘要: 
一、Semaphore介紹      

        Semaphore意思為信号量,是用來控制同時通路特定資源的線程數數量。它的本質上其實也是一個共享鎖。Semaphore可以用于做流量控制,特别是公用資源有限的應用場景。例如:某個停車場車位隻有10個,一開始停車場沒有車輛所有車位全部空着,然後先後到來八輛車,停車場車位夠,安排進去停車,然後又來三輛,這個時候由于隻有兩個停車位,所有隻能停兩輛,其餘一輛必須在外面候着,直到停車場有空車位,當然以後每來一輛都需要在外面候着。當停車場有車開出去,裡面有空位了,則安排一輛車進去。這個場景就是典型的信号量應用場景。

二、SemaphoreAPI使用介紹

       2.1、構造函數:

  1. Semaphore(int permits) :建立具有給定的許可數和非公平的公平設定的 Semaphore。
  2. Semaphore(int permits, boolean fair) :建立具有給定的許可數和給定的公平設定的 Semaphore。

       2.2、信号量擷取:

       1.  Semaphore提供了acquire()方法來擷取一個許可。

      2.3、信号量釋放:

       1.  Semaphore提供release()來釋放許可。

      2.4、其他方法:

       1.   intavailablePermits():傳回此信号量中目前可用的許可證數。

       2.   intgetQueueLength():傳回正在等待擷取許可證的線程數。

       3.   booleanhasQueuedThreads():是否有線程正在等待擷取許可證。

·      4.   void reducePermits(int reduction):減少reduction個許可證,是個protected方法。

·      5.   Collection getQueuedThreads():傳回所有等待擷取許可證的線程集合,是個protected方

三.源代碼分析      
package com.cloudtravel.common.sourcecode;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * Semaphore是一個線程同步的輔助類,可以維護控制通路目前資源的線程并發個數,并提供了同步機制 ,
 * 使用semaphore可以控制同時通路資源的個數.較多的如可以控制接口的并發個數
 * 對比Mutex :
 *      Mutex一般用來串行化Critical section代碼的通路,即最多同時允許一個線程通路 . 定義比較明顯
 *      Semaphore: 控制資源允許的并發量 , 當其允許的并發量設定為1時,又稱之為Binary Semaphore
 *  Binary Semaphore同Mutex對比:
 *    作用:兩者在所起到的作用上并沒有什麼差別 , 此時都是為了保證資源可以串行化通路,即都是保證同時隻有一個線程通路資源 .
 *    原理 : 兩者在實作原理上有差別 .
 *      Semaphore: 為非對稱模型,可以了解為,鎖雖然隻有一把,同時也隻能有一把鑰匙開着,但是不同的線程擁有的鑰匙不同.
 *                 semaphore可以了解為一個原子變量 , 信号量通過内部計數器,控制線程的并發量.且可以通過變更信号量[permit]進行靈活的調整
 *      Mutex: 為對稱模型,即一把鎖隻有一把鑰匙,線程之間通過這一把鑰匙控制資源的通路 .也就是通過synchronized安全鎖進行控制的.
 * @author Administrator
 */
public class SemaphoreAnalyse implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /**
     * 通過AbstractQueuedSynchronizer子類實作的所有機制
     */
    private final Sync sync;

    /**
     * Synchronization implementation for SemaphoreAnalyse.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */

    /**
     * 為semaphore定義的鎖抽象類,繼承自AQS,采用CAS算法,
     * 借用AQS中的state控制是否允許通路 .
     * sync為抽象類 . 主要定義了擷取鎖和釋放鎖的非公平鎖方法 .
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        /** 調用AQS中的方法設定目前允許通路的并發數.permits : 建立semaphore執行個體的時候傳進來 */
        Sync(int permits) {
            setState(permits);
        }

        /** 擷取目前可使用的資源并發數 */
        final int getPermits() {
            return getState();
        }

        /** 擷取一定數量的資源數.-非公平鎖版本,傳回目前剩餘可使用的信号量 .  */
        final int nonfairTryAcquireShared(int acquires) {
            for (; ; ) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        /** 釋放一定量的資源 */
        @Override
        protected final boolean tryReleaseShared(int releases) {
            for (; ; ) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                {
                    throw new Error("Maximum permit count exceeded");
                }
                if (compareAndSetState(current, next))
                {
                    return true;
                }
            }
        }

        /** 減小信号量,同nonfairTryAcquireShared操作類似.隻是隻能減小,不能傳入負值 */
        final void reducePermits(int reductions) {
            for (; ; ) {
                int current = getState();
                int next = current - reductions;
                if (next > current) {
                    throw new Error("Permit count underflow");
                }
                if (compareAndSetState(current, next)) {
                    return;
                }
            }
        }

        /** 銷毀信号量-将目前可使用信号量改為0.即不允許通路 */
        final int drainPermits() {
            for (; ; ) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    /** 非公平鎖版本 */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        /** 構造器 */
        NonfairSync(int permits) {
            super(permits);
        }

        /** 擷取信号量資源 */
        @Override
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * 公平鎖版本
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        /** 擷取資源信号量 目前state -1
         * 對比NonFairSync多了hasQueuedPredecessors()的判斷
         * hasQueuedPredecessors()判斷目前線程是否需要排隊.并将目前線程加入到隊列中
         * 隊列遵循FIFO[先進先出]的原則,若目前隊列中無等待線程,則傳回false表示不需要排隊,
         * 若存在等待線程,說明目前可用信号量已耗盡 , 需要排隊,傳回true.
         * */
        protected int tryAcquireShared(int acquires) {
            for (; ; ) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    /**
     * 構造器-預設非公平鎖版本
     * @param permits
     */
    public SemaphoreAnalyse(int permits) {
        sync = new NonfairSync(permits);
    }

    /** 構造器,可自定義選擇公平鎖還是非公平鎖 */
    public SemaphoreAnalyse(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * 擷取一個信号量, 目前state -1
     * 其核心實作為AQS的doAcquireSharedInterruptibly()方法
     * 若目前信号量資源耗盡,則會中斷響應
     * */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 擷取信号量,目前state -1
     * 若目前信号量資源耗盡,則響應等待直到擷取到資源 . 采用自旋鎖
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * 擷取一個信号量 . 目前state -1
     * 若目前還有資源可用 . 傳回true . 若無,傳回false
     * @return
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * 擷取一個信号量 . 目前state -1
     * 若目前還有資源可用 . 傳回true . 若無,傳回false . 可自定義逾時時間,逾時中斷響應
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** 釋放一個資源 . 目前state + 1 */
    public void release() {
        sync.releaseShared(1);
    }

    /** 擷取一定量的信号量資源 . 若目前信号量耗盡或者無法滿足,則中斷響應 */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * 從該信号量分析擷取給定數量的資源,直到所有許可證都可用為止。
     * 擷取給定數量的許可證(如果可用),并立即傳回,将可用許可證數量減少給定數量。
     * 如果沒有足夠的信号量可用,則目前線程出于線程排程目的将被禁用,并處于休眠狀态,
     * 直到其他線程調用此信号量分析的某個釋放方法,目前線程下一個将被配置設定許可證,并且可用許可證的數量滿足此請求。
     * 如果目前線程在等待許可證時被中斷,那麼它将繼續等待,并且它在隊列中的位置不受影響。當線程确實從此方法傳回時,将設定其中斷狀态。
     * @param permits
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * 僅當調用時所有許可證都可用時,從該信号量分析擷取給定數量的許可證。
     * 擷取給定數量的許可證(如果可用),并立即傳回,傳回值為true,将可用許可證數量減少給定數量。
     * 如果沒有足夠的許可證可用,則此方法将立即傳回值false,并且可用許可證的數量不變。
     * 即使此信号量分析已設定為使用公平排序政策,對tryAcquire的調用也将立即獲得許可證(如果有),
     * 無論其他線程目前是否正在等待。這種“讨價還價”行為在某些情況下是有用的,即使它破壞了公平性。
     * 如果您想遵守公平性設定,那麼使用幾乎相等的tryAcquire(允許,0,TimeUnit.SECONDS)(它還檢測中斷)。
     * @param permits
     * @return
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * 以共享定時模式擷取信号量,逾時中斷響應
     * @param permits
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * 釋放給定數量的許可證
     * @param permits
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    /**
     * 傳回目前可用的信号量
     * @return
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * 将目前可用信号量全部占用
     * @return
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * 按指定的減少量縮小可用許可證的數量。此方法在使用信号量分析跟蹤不可用資源的子類中非常有用。
     * 此方法與acquire的不同之處在于,它不會發生線程阻塞
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /** 目前是否為公平鎖 */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * 目前是否有線程在隊列中等待
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * 擷取等待線程的隊列的長度.即傳回有多少個等待線程
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    /**
     * 擷取隊列中的所有等待線程 AQS内自帶的方法
     * @return the collection of threads
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    /**
     * toString
     * @return a string identifying this SemaphoreAnalyse, as well as its state
     */
    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}