天天看點

Semaphore源碼詳解

Semaphore簡介

Semaphore就是我們常說的信号量,本質就是基于AQS的一個共享鎖。對AQS不太了解的可以看我之前寫的AQS源碼解析的文章AQS源碼詳細分析,讓你掌握AQS原理,獨占鎖、共享鎖、Condition

Semaphore常常被用作限流器,通過共享鎖對資源進行限制。

Semaphore結構

Semaphore源碼詳解

如上圖所示,Semaphore實作了非公平鎖和公平鎖兩個模式。

Semaphore示例

class Pool {
    private static final int MAX_AVAILABLE = 100;
    // 初始化一個信号量,設定為公平鎖模式,總資源數為100個
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    public Object getItem() throws InterruptedException {
        // 擷取一個信号量
        available.acquire();
        return getNextAvailableItem();
    }

    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

   
    protected Object[] items = ...whatever kinds of items being managed
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    protected synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null; 
    }

    protected synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }

}
           

以上代碼的場景是池子中有100個資源,線程可以單獨申請其中一個,當申請不到的時候會被挂起等待。

Semaphore源碼解析

1、初始化

public Semaphore(int permits) {
    //permits為同一時刻容納的最大線程數
    //預設調用了非公平鎖
    sync = new NonfairSync(permits);
}
//也可以在初始化方法中傳入fair
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

           

2、acquire()

//申請鎖,可響應中斷
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())//如果線程被中斷,抛出異常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//小于0代表申請失敗
        doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //将state-=acquires
        int available = getState();
        int remaining = available - acquires;
        //如果remaining<0直接傳回
        //如果CAS成功,傳回
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

//doAcquireSharedInterruptibly我們在AQS源碼詳解中,已經詳細說過了,不在展開說了
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //加入鎖queue中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //如果在queue中的第二個節點,嘗試申請鎖
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                   //申請鎖成功後,就将node移出queue
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //将線程挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

3、release()

public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//如果釋放鎖成功
        doReleaseShared();//喚醒其他等待線程來争奪鎖
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    //cas将state+=releases
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

//doReleaseShared()用來喚醒其他線程,我們在AQS源碼詳解中,已經詳細說過了,不在展開說了
private void doReleaseShared() {

    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

           

Semaphore模式解析

方法 詳解
tryAcquire() 嘗試擷取鎖,隻擷取一次,申請不到就拉倒,搶鎖期間不響應中斷
tryAcquire(long timeout,TimeUnit unit) 在timeout個unit時間内,申請鎖,申請不到就挂起,搶鎖期間可響應中端
acquire() 阻塞式申請鎖,申請不到就挂起,搶鎖期間可響應中斷
acquireUninterruptibly 阻塞式申請鎖,申請不到就挂起,搶鎖期間不響應中斷,在搶鎖成功後才響應中斷