天天看點

【并發】AQS源碼分析

AQS全稱是AbstractQueuedSynchronizer,是JDK提供的一個同步器設計架構,很多并發資料結構如ReentrantLock、ReentrantReadWriteLock、Semaphore等都是基于AQS來實作的,下面來分析一下AQS的原理。

一、底層資料結構

AQS底層維護了一個state(代表共享資源)和一個CLH隊列(代表線程等待隊列)

state:state是一個volatile int型變量,用以表示共享資源的使用情況。

head、tail:分别表示CLH隊列的頭節點和尾節點。關于CLH隊列原理可以看這篇部落格:

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;
           

AQS定義了兩種資源:

一種是Exclusive(獨占方式,同一時間隻有一個線程能夠通路,比如ReentrantLock);

一種是Share(共享方式,同一時間允許多個線程通路,比如Semaphore)。

二、AQS原理

AQS被定義為一個同步器架構,那麼為什麼說AQS是一個“架構”呢,架構的定義應該是使用者根據架構的要求去實作一些業務邏輯,然後底層由架構自己去完成,比如使用Hadoop大資料架構的時候,我們隻需要定義好Map/Reduce函數就可以了,其他複雜的底層操作就有Hadoop幫我們完成。AQS也應該是這樣一個東西,事實上,AQS也确實是這樣做的,它隻需要開發者實作其中的幾個函數,其他的底層操作比如CLH隊列操作、CAS自旋等等都是由AQS架構來實作。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
           

上面的幾個方法就是AQS要我們實作的函數,這幾個方法都是AQS提供給開發者編寫代碼邏輯的,是以每個方法裡面都定義為直接抛出UnsupportedOperationException,下面解釋一下這幾個函數的作用。

tryAcquire:嘗試擷取獨占資源。成功傳回true,失敗傳回false。

tryRelease:嘗試釋放獨占資源。成功傳回true,失敗傳回false。

tryAcquireShared:嘗試擷取共享資源。成功傳回一個非負數,代表剩餘資源(0就表示沒有可用資源),負數表示失敗。

tryReleaseShared:嘗試釋放共享資源。成功傳回true,失敗傳回false。

isHeldExclusively:判斷線程是否正在獨占資源。

對于獨占方式的同步器,隻需要實作tryAcquire-tryRelease;而對于共享方式的同步器則隻需要實作tryAcquireShared-tryReleaseShared就行了,當然如果自定義同步器需要同時實作獨占和共享方式的話也可以,比如讀寫鎖ReentrantReadWriteLock,其中讀鎖是共享的,寫鎖是獨占的。

三、源碼解析

AQS中對于資源擷取的頂層接口是acquire-release和acquireShare-releaseShare,下面從這些頂層接口來分析:

1、acquire

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           

代碼邏輯:

(1)嘗試擷取資源(tryAcquire),成功則傳回,失敗則跳轉到(2)

(2)請求将該線程加入線程等待隊列的尾部,并标記為獨占模式(addWaiter(Node.EXCLUSIVE)),并不斷請求隊列智鬥擷取到資源,如果該過程被打斷,則跳轉到(3),失敗則傳回。

(3)自我中斷(selfInterrupt)

(4)完成

2、release

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

代碼邏輯:

(1)嘗試釋放資源,如果成功則跳轉到(2),失敗則傳回false

(2)擷取線程等待隊列的頭節點,如果頭節點線程不為空且處于等待狀态,則喚醒該線程(unparkSuccessor)

3、acquireShared

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
           

代碼邏輯:

(1)嘗試擷取共享資源,如果擷取失敗則調用doAcquireShared自旋來請求共享資源。

4、releaseShared

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
           

代碼邏輯:

(1)嘗試釋放共享資源,釋放成功則喚醒等待隊列中的後繼節點來擷取資源。

5、如果需要響應中斷程式的話,AQS還提供了acquireInterruptibly和acquireSharedInterruptibly函數來響應。

4、實際例子:ReentrantLock解析

一般用AQS來實作同步器主要使用内部類的方式來實作的,ReentrantLock裡面就是一個比較經典的實作,看一下這個内部類的實作源碼:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
           

這裡定義的Sync同步器類仍然是一個抽象類,是因為ReentrantLock同時支援公平鎖和非公平鎖,而這兩種同步方式是不一樣的,是以先定義一個Sync類作為兩種實作的基類,後面會看到具體的公平鎖和非公平鎖的實作。

由于ReentrantLock本身是一個獨享鎖,是以Sync類主要重寫了AQS裡面的tryRelease和isHeldExclusively方法,同時定義了非公平鎖的nonfairTryAcquire方法作為請求資源的方法。

非公平鎖實作:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
           

其實基類Sync完完全全就是一個非公平鎖的實作,是以NonfairSync并沒有新代碼,tryAcquire直接調用nonfairTryAcquire。

公平鎖實作:

static final class FairSync extends Sync {
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
           

公平鎖與非公平鎖的差別也就是資源請求的方式不一樣,是以在基類Sync的基礎上重寫tryAcquire方法,順序喚醒等待隊列中的線程。

基于AQS實作了公平鎖和非公平鎖之後,如何實作上鎖和解鎖的功能也就十分簡單了,源碼直接用一行代碼就實作了:

public void lock() { 
        sync.acquire(1); 
    }

    public void unlock() {
        sync.release(1);
    }