天天看點

1.3.1 AQS抽象隊列同步器詳解

什麼是AQS

AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于實作基于FIFO等待隊列的阻塞鎖和相關的同步器的一個同步架構。這個抽象類被設計為作為一些可用原子int值來表示狀态的同步器的基類。如果你有看過類似 CountDownLatch 類的源碼實作,會發現其内部有一個繼承了 AbstractQueuedSynchronizer 的内部類 Sync 。可見 CountDownLatch 是基于AQS架構來實作的一個同步器.類似的同步器在JUC下還有不少。(eg. Semaphore )

AQS用法

如上所述,AQS管理一個關于狀态資訊的單一整數,該整數可以表現任何狀态。比如, Semaphore 用它來表現剩餘的許可數,ReentrantLock 用它來表現擁有它的線程已經請求了多少次鎖;FutureTask 用它來表現任務的狀态(尚未開始、運作、完成和取消)

如JDK的文檔中所說,使用AQS來實作一個同步器需要覆寫實作如下幾個方法,并且使用

getState

,

setState

,

compareAndSetState

這幾個方法來設定擷取狀态

  1. boolean tryAcquire(int arg)

  2. boolean tryRelease(int arg)

  3. int tryAcquireShared(int arg)

  4. boolean tryReleaseShared(int arg)

  5. boolean isHeldExclusively()

    以上方法不需要全部實作,根據擷取的鎖的種類可以選擇實作不同的方法

J.U.C是基于AQS(

AbstractQueuedSynchronizer

)實作的,AQS是一個同步器,設計模式是模闆模式。

核心資料結構:雙向連結清單 + state(鎖狀态)

底層操作:CAS

1.3.1 AQS抽象隊列同步器詳解

首先,我們根據API的方法功能,由我們前面階段學習的知識進行一個自己定義的AQS,來加深印象。

// 抽象隊列同步器
// state, owner, waiters
public class kfAqs {
    // acquire、 acquireShared : 定義了資源争用的邏輯,如果沒拿到,則等待。
    // tryAcquire、 tryAcquireShared : 實際執行占用資源的操作,如何判定一個由使用者具體去實作。
    // release、 releaseShared : 定義釋放資源的邏輯,釋放之後,通知後續節點進行争搶。
    // tryRelease、 tryReleaseShared: 實際執行資源釋放的操作,具體的AQS使用者去實作。

    // 1、 如何判斷一個資源的擁有者
    public volatile AtomicReference<Thread> owner = new AtomicReference<>();
    // 2、 儲存正在等待的線程
    public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    // 3、 記錄資源的狀态
    public volatile AtomicInteger state = new AtomicInteger(0);

    // 共享資源占用的邏輯,傳回資源的占用情況
    public int tryAcquireShared(){
        throw new UnsupportedOperationException();
    }
    
    public void acquireShared(){
        boolean addQ = true;
        while(tryAcquireShared() < 0) {
            if (addQ) {
                // 沒拿到鎖,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起目前的線程,不要繼續往下跑了
                LockSupport.park(); // 僞喚醒,就是非unpark喚醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把線程移除
    }

    // 共享資源釋放的邏輯,傳回資源是否已釋放
    public boolean tryReleaseShared(){
        throw new UnsupportedOperationException();
    }

    public void releaseShared(){
        if (tryReleaseShared()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 喚醒
            }
        }
    }

    // 獨占資源相關的代碼

    public boolean tryAcquire() { // 交給使用者去實作。 模闆方法設計模式
        throw new UnsupportedOperationException();
    }

    public void acquire() {
        boolean addQ = true;
        while (!tryAcquire()) {
            if (addQ) {
                // 沒拿到鎖,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起目前的線程,不要繼續往下跑了
                LockSupport.park(); // 僞喚醒,就是非unpark喚醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把線程移除
    }

    public boolean tryRelease() {
        throw new UnsupportedOperationException();
    }

    public void release() { // 定義了 釋放資源之後要做的操作
        if (tryRelease()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 喚醒
            }
        }
    }

    public AtomicInteger getState() {
        return state;
    }

    public void setState(AtomicInteger state) {
        this.state = state;
    }
}
           

資源占用流程圖

1.3.1 AQS抽象隊列同步器詳解

源碼解析

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;// 等待逾時或被中斷
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;// 釋放鎖之後,是否通知後一個節點
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;// 處于等待隊列中,結點的線程等待在Condition上
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;// 共享模式中使用,線程處于可運作狀态
  //核心資料結構:雙向連結清單 + state(鎖狀态)
           
//資源争用的邏輯
public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 判斷是否拿到鎖
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}
static void selfInterrupt() {
        Thread.currentThread().interrupt();
}
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false; // 目前線程釋放中斷的标志位
            for (;;) {// 不斷嘗試
                final Node p = node.predecessor(); // 擷取前一個節點
                if (p == head && tryAcquire(arg)) { // 如果前一個節點是head,嘗試搶一次鎖
                    setHead(node); // 更換head
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&// 檢查狀态,是否需要挂起線程
                    parkAndCheckInterrupt())// 如果需要挂起,則通過Park進入停車場挂起
                    interrupted = true; // 如果出現中斷,則修改标記
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
           
//資源釋放的邏輯
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head; // 從頭開始找
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 喚醒下一個線程
            return true;
        }
        return false;
}
protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
}
/** 喚醒等待者
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus; // 正在釋放鎖的線程節點狀态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); // 修改目前節點狀态

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next; // 找下一個節點
        if (s == null || s.waitStatus > 0) { // 如果不存在或者被取消了,繼續尋找合适的下一個節點
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) // 如果找到了合适的節點,就喚醒它
            LockSupport.unpark(s.thread);
    }
           

繼續閱讀