天天看點

JUC之AbstractQueuedSynchronizerAbstractQueuedSynchronizer

AbstractQueuedSynchronizer

從鎖說起

juc.locks包下提供了常見的鎖相關的工具,用來替代synchronized關鍵字(jdk1.5之前synchronized效率較低)和加強一些鎖功能,比如重入鎖,公平非公平鎖,讀寫鎖等等。

AbstractQueuedSynchronizer簡稱aqs是juc.locks包中鎖相關的一個最核心的類,ReentrantLock等鎖相關的實作都離不開它。字面意思"抽象的排隊的同步器"。

先抛開aqs,先嘗試自己實作一個鎖。

public class Lock {
    
   	private AtomicBoolean state = new AtomicBoolean();
    
    public void lock() {
        while (!state.compareAndSet(false, true)) {}
    }
    
    public void unLock() {
        state.set(false);
    }
}
           

如上,我們實作了一個最簡單的鎖,但是這個鎖有個很大的缺點,線程A加完鎖之後,線程B雖然加鎖的時候會被阻塞(while循環),但是線程B卻可以解鎖,即使鎖不是由B加上的。

我們預期的應該是鎖由A加上,隻能由A解鎖。為此,需要稍微改動一下代碼。

public class Lock2 {

    private AtomicBoolean state = new AtomicBoolean();
    
    private Thread ownThread;

    public void lock() {
        while (!state.compareAndSet(false, true)) {}
        ownThread = Thread.currentThread();
    }

    public void unLock() {
        if (ownThread != Thread.currentThread()) {
            state.set(false);
            ownThread = null;
        }
    }
}
           

我們添加了一個變量ownThread用來儲存加鎖的線程。

還有一個問題,鎖是不可重入的,即一個線程隻能加鎖一次,同一個線程再次加鎖同樣會阻塞,導緻程式無法運作。這顯然不是我們想要的。

再優化一下。

public class Lock3 {
    
    private volatile int state;
    
    private AtomicReference<Thread> ownThread = new AtomicReference<>();
    
    public void lock() {
        Thread currentThread = Thread.currentThread();
        if (ownThread.get() != currentThread) {
            while (!ownThread.compareAndSet(null, currentThread)) {}
        } 
        state++;
    }
    
    public void unLock() {
        if (ownThread.get() == Thread.currentThread()) {
            state--;
            if (state == 0) {
                ownThread.set(null);
            }
        } else {
            throw new IllegalMonitorStateException();
        }
    }
    
}
           

我們用state存儲了線程重入的次數,ownThread用來存儲加鎖的線程。這樣貌似比較完美了,但是鎖效率比較低,我們的鎖是通過自旋來實作阻塞的。有沒有更好的辦法?更為高效的做法是線程擷取不到鎖,進入wait狀态。線程解鎖的時候,通知其他線程退出wait狀态,并使被喚醒的線程重新嘗試擷取鎖。我們再優化一下代碼。

public class Lock4 {

    private volatile int state;

    private AtomicReference<Thread> ownThread = new AtomicReference<>();

    private Set<Thread> threadHolder = Collections.synchronizedSet(new HashSet<>());
    
    public void lock() {
        Thread currentThread = Thread.currentThread();
        if (ownThread.get() != currentThread) {
            while (!ownThread.compareAndSet(null, currentThread)) {
                threadHolder.add(currentThread);
                LockSupport.park();
            }
        }
        state++;
    }

    public void unLock() {
        Thread currentThread = Thread.currentThread();
        if (ownThread.get() == currentThread) {
            state--;
            if (state == 0) {
                ownThread.set(null);
                threadHolder.remove(currentThread);
                threadHolder.stream().findAny().ifPresent(LockSupport::unpark);
            }
        } else {
            throw new IllegalMonitorStateException();
        }
    }
    
}
           

這裡用一個Set來儲存所有嘗試加鎖但是失敗并且阻塞的線程,這裡的Set是線程安全的,我們不讨論線程安全的Set是怎麼實作的以及效率問題,這裡隻是為了說明問題。

加鎖失敗的時候我們将目前線程加入集合,并阻塞。釋放鎖的時候,随機擷取集合中的一個線程,喚醒。

還有沒有優化的空間或者缺失的功能呢?

  1. 我們的鎖不是公平鎖

    這個解決起來比較簡單,将Set改成隊列,維持等待線程的順序,喚醒的時候喚醒隊列頭的線程。

  2. 我們的鎖不能快失敗,即擷取不到鎖時立刻傳回false

    這個也比較簡單,擷取不到鎖的時候目前線程不加入隊列即可。

  3. 我們的鎖不支援共享鎖

    實作這個需要我們加入隊列的不隻是目前線程,還要維護一些額外的狀态,比如能否共享等等。

來總結一下要實作所有功能,我們的鎖需要有哪些必要條件。

  • 一個state變量,用來存儲鎖的狀态
  • 一個Thread變量,用來存儲持有鎖的線程資訊
  • 一個隊列,用來存放沒有獲得鎖的線程資訊

    而AbstractQueuedSynchronizer幫我們抽象出了這些條件,提供了一些基本的方法幫助我們操作這些變量,我們可以很容易的通過這些方法實作自己的鎖。

AbstractQueuedSynchronizer源碼解析

JUC之AbstractQueuedSynchronizerAbstractQueuedSynchronizer

可以看到AbstractQueuedSynchronizer繼承于AbstractOwnableSynchronizer,AbstractQueuedLongSynchronizer和AbstractQueuedSynchronizer基本一樣,隻是state變量一個是int型一個是long型,我們以AbstractQueuedLongSynchronizer為例來分析源碼。

AbstractOwnableSynchronizer

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
           

類很簡單,隻有exclusiveOwnerThread一個變量,以及相應的get,set方法,這個變量就對應上邊實作的鎖的ownThread變量,用來存儲得到鎖的線程。

AbstractQueuedSynchronizer

變量

先來看看AbstractQueuedSynchronizer中的變量。

JUC之AbstractQueuedSynchronizerAbstractQueuedSynchronizer

以Offsett結尾的變量是相應變量對應的偏移量,用來做cas操作,不用關注。Unsafe是jdk提供的操作底層的類,這裡主要用來做cas操作,也不用關注。

我們最需要關注的是 head, tail,state變量。

head和tail組成了一個隊列,對應我們自己實作的鎖的隊列。

state對應我們自己實作的鎖的state。

可見aqs和我們實作的鎖很類似,的确是将實作鎖的公共條件抽象出來,來簡化鎖的實作。

head和tail的類型Node是個内部類,先來看看這是個什麼。

static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

        volatile int waitStatus;
        
        volatile Node prev;
        
        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
           

prev和next變量不必多說,用來組成雙向連結清單進而實作隊列。thread用來存儲等待中的線程的資訊,waitStatus用來存儲目前等待線程的一些額外的一些狀态,這裡可能取值1,0,-1,-2,-3,分别對應CANCELLED,預設,SIGNAL,CONDITION ,PROPAGATE。先不用關注這裡的取值的具體的含義,之後我們再分析,隻需要了解這裡存儲了線程的額外的一些狀态。SHARED,EXCLUSIVE ,nextWaiter這些都是Node類型的變量,暫時也不做分析。

方法

接下來看看AQS提供的方法,AQS方法有很多,先來看幾個基本方法,這些方法是aqs實作的基礎。

/**
     * 通過cas操作來設定頭結點
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    /**
     * 通過cas操來設定尾結點
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
	
	/**
     * 通過cas操作來設定狀态,這裡方法是protected,說明是為了提供給子類調用或者覆寫
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

	/**
     * 通過case操作某個結點的next變量的值
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
	
	/**
     * 通過case操作某個結點的waitStatus
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

	/**
     * 設定頭部結點
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

	/**
     * 設定狀态
     */
    protected final void setState(int newState) {
        state = newState;
    }

	/**
     * 利用上面提供的方法,實作線程安全的入隊操作
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
	
	/**
     * 入隊一個新結點到隊尾,并将新結點的nextWaiter指派為mode
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 這裡先嘗試進行cas操作,如果失敗再進入到複雜的enq(node)方法内,這種做法是為了效率考慮,在aqs的設計中,處處可以看到這些優化。
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
           

接下來回顧一下上面自己實作的鎖,并抽象一下加鎖的步驟。

  1. 嘗試加鎖并将state+1
  2. 加鎖成功完成并退出,加鎖失敗到3
  3. 将目前線程加入隊列,并阻塞目前嘗試加鎖的線程

解鎖的步驟。

  1. 比對目前線程是否是加鎖的線程
  2. 如果不是抛異常,如果是到3
  3. 将state-1,如果state為0到4,否則完成
  4. 從隊列中拿出頭結點對應的線程并喚醒

aqs将加鎖抽象成一個方法 acquire()

public final void acquire(int arg) {
        if (!tryAcquire(arg) && 
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           

其中tryAcquire()并沒有具體的實作,交由子類來實作。可以在tryAcquire進行更改state的操作,一般需要在tryAcquire判斷得到鎖的線程是不是目前線程以及更新state

acquireQueued() 不需要子類來實作,aqs已經實作,主要作用在隊列中添加結點,并阻塞。

這兩步這和我們上邊分析出的加鎖的步驟高度一緻。

當沒有擷取到鎖,并且添加等待隊列失敗,假如期間線程被中斷過,會執行selfInterrupt(),重新中斷線程。這裡很容易讓人誤解為acquire()是響應中斷的(假如加鎖的過程中,線程中斷了,立刻終止加鎖操作,并抛出異常),但其實并不是。

我們來看一下selfInterrupt()方法。

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
           

隻是将目前的線程的中斷标緻重新置為已中斷。

接下來帶着兩個疑問來分析acquireQueued()方法,是如何阻塞線程的以及為什麼不響應中斷。

需要明确的是,執行到此方法時,代表嘗試擷取鎖失敗(tryAcquire() == true)并且目前線程對應的Node已經加入到隊尾(addWaiter())。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//獲得目前線程對應結點的前驅結點
                final Node p = node.predecessor();
                //step1 
                //目前線程對應的結點是第二個結點,表明之前結點對應的線程的已經進行過解鎖操作,此時嘗試加鎖,假如成功後,将目前結點置為頭結點
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //step2
                //假如目前線程對應的結點不是第二個結點或者沒有搶到鎖(舉例線程A為頭結點獲得鎖,線程B為第二個結點,當線程A釋放鎖的時候,線程C插隊獲得鎖,此時線程B就沒有擷取到鎖),
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
            	//取消
                cancelAcquire(node);
        }
    }
           

接下來依次分析shouldParkAfterFailedAcquire(),parkAndCheckInterrupt()方法。

/**
	* 通過方法名稱很容易判斷出這個方法的作用是為了判斷目前結點對應的線程是不是應該進入等待狀态。
	* 思考一下,什麼情況下線程不能該進入wait狀态?
	* 兩個線程A和B, A獲得鎖,B線程請求鎖失敗,嘗試wait時,A線程進行解鎖操作,并喚醒B線程,B線程此時wait,将永遠無法被喚醒。
	* 
	* 但是此方法并沒有保證上述情況不發生,即(線程在沒有進入wait轉态的情況下被喚醒,之後又進入wait狀态),因為shouldParkAfterFailedAcquire和parkAndCheckInterrupt并不是原子操作
	* 但事實并沒有這種情況,這個之後我們再分析。
	* 這裡我們假定shouldParkAfterFailedAcquire和parkAndCheckInterrupt是原子操作。
	* 
	* 對于這個方法,目前能進入wait狀态條件是,目前結點對應的第一個不是取消狀态的結點[pre]的狀态是SIGNAL,即-1
	* 否則的話,不能進入wait狀态。
	* 
	*/
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
      
        
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             * 前一個結點的waitStatus是SIGNAL的話,即-1,代表可以阻塞目前線程。
             * 
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             * 目前結點的前一個結點是取消狀态 waitStatus > 0,此時把中間取消的結點“斷掉”。
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             * 到這裡的狀态隻能是0或者PROPAGATE,表明可以進入wait狀态。
             * 為什麼隻能是0或者PROPAGATE,但是卻沒有CONDITION狀态呢,原因是CONDITION狀态根本不會在這個隊列裡出現
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           
/**
	* 這個方法的作用就是使目前線程進入wait狀态,從這裡可以看到并沒有響應中斷,隻是在阻塞被喚醒後傳回了線程的中斷狀态。
	*/
	private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
           

接下來我們來看relese方法,即抽象出來的解鎖方法

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

同樣 tryRelease()是個抽象方法,一般在此方法中進行更改狀态操作,正常情況下都會傳回true,因為解鎖操作隻有目前活動的線程在調用,不存在競态。

unparkSuccessor()用來喚醒隊列中下一個結點。

/**
     * 喚醒線程
     */
    private void unparkSuccessor(Node node) {
        /*
         * 将頭結點狀态更改為0,結合shouldParkAfterFailedAcquire()來看
         * 線程要進入wait的條件,是前一個結點狀态為SIGNAL
         * 當頭結點的狀态為0時,頭結點之後的結點必定不能進入wait狀态
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 找到頭結點之後第一個不是取消狀态的結點
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒線程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
           

為了更好的講解,我們模拟兩個線程擷取鎖時,程式的執行流程,擷取鎖就是acquire(1),釋放鎖就是release(1)

JUC之AbstractQueuedSynchronizerAbstractQueuedSynchronizer

下面我們模拟之前提到的線程永遠無法擷取鎖的情況

JUC之AbstractQueuedSynchronizerAbstractQueuedSynchronizer

按照我們的分析,是有可能存線上程永遠阻塞情況,但事實卻沒有這個情況,什麼原因呢?奧妙就在LockSupport.unpark()這個方法上

/**
     * Makes available the permit for the given thread, if it
     * was not already available.  If the thread was blocked on
     * {@code park} then it will unblock.  Otherwise, its next call
     * to {@code park} is guaranteed not to block. This operation
     * is not guaranteed to have any effect at all if the given
     * thread has not been started.
     *
     * @param thread the thread to unpark, or {@code null}, in which case
     *        this operation has no effect
     */
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
           

注釋很明确的說明了原因,假如一個線程沒有被阻塞時調用unpark方法,再調用park方法,該線程是不會被阻塞的。這樣上面的問題也就解釋通了。

結語

AbstractQueuedSynchronizer 本次就分析到這裡,我們隻分析了最核心的兩個方法acquire()和release()兩個方法,涉及到的Node狀态隻有0,SIGNAL(-1), 其他方法的分析之後解決具體的實作類再細說。

繼續閱讀