天天看點

AbstractQueuedSynchronizer 筆記AbstractQueuedSynchronizer

AbstractQueuedSynchronizer

AQS是一個内部維護了先進先出隊列+辨別(數字)的模型,辨別使用CAS模式修改,作為多線程工具的基礎元件

AbstractQueuedSynchronizer 筆記AbstractQueuedSynchronizer

屬性

屬性名 說明
volatile Node head 為頭節點會清理Node中關聯的pre,thread避免GC不回收
volatile Node tail 尾節點
volatile int state 0為空閑其它元件按需使用,使用cas來指派,
Thread exclusiveOwnerThread 持有線程

state為volatile的int,不同的業務場景按需實作,

獨占模式:

ReentrantLock.Sync中state為0表示未鎖定>0表示被幾個線程持有

ThreadPoolExecutor.Worker中state為0表示未執行

共享模式:

CountDownLatch.Sync中state為初始化時指定,表示有多少個線程可持有,

Semaphore.Sync中state與CountDownLatch相同

混合模式:

ReentrantReadWriteLock.Sync中state為共享鎖+獨占鎖組合 通過位運算16位來分割,最大的讀鎖寫鎖個數為65535

關鍵方法

獨占模式

持有資源:acquire

acquire: 獨占模式擷取,獨占模式即隻有一個線程可更新state.忽略中斷辨別,在擷取之後響應中斷。

acquireInterruptibly:獨占模式擷取,線程辨別為中斷則抛出異常

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}      
AbstractQueuedSynchronizer 筆記AbstractQueuedSynchronizer

tryAcquire:子類按需實作,使用獨占模式更新state,增加state,成功傳回true失敗傳回false

中斷後不會正确響應park,是以需要重置線程中斷辨別,并在unpark之後進行中斷補償

釋放資源:release

release:以獨占模式釋放資源

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}      
AbstractQueuedSynchronizer 筆記AbstractQueuedSynchronizer
tryRelease:子類按需實作,使用獨占模式更新state,減少state,并處理對應獨占線程

共享模式

持有資源:acquireShared

  • acquireShared 共享模式擷取,忽略中斷線程,在擷取之後相應中斷。
  • acquireSharedInterruptibly 共享模式擷取,響應中斷,線程中斷則抛出異常。
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}      
AbstractQueuedSynchronizer 筆記AbstractQueuedSynchronizer

tryAcquireShared:子類按需實作,對傳回值有如下要求:

負值:失敗。 零:共享模式下的擷取成功,但是沒有後續共享模式下的擷取可以成功。 正值: 如果共享模式下的擷取成功并且後續共享模式下的擷取也可能成功,則為正值,在這種情況下,後續的等待線程必須檢查可用性。 (對三個不同傳回值的支援使該方法可以在僅有時進行擷取的情況下使用。)成功後,就已經擷取了此對象。

釋放資源:releaseShared

  • releaseShared:以共享模式釋放資源
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}      
AbstractQueuedSynchronizer 筆記AbstractQueuedSynchronizer
tryReleaseShared:子類按需實作,使用共享模式更新state,減少state

其它關鍵方法

檢查并更新節點狀态:shouldParkAfterFailedAcquire

在park線程之前的判斷,目前置節點為取消時更新前置節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }      

喚醒後續節點:unparkSuccessor

喚醒後續節點,并清理取消的節點,
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}      

共享模式設定隊列頭:setHeadAndPropagate

共享模式下多個線程同時持有資源,頭節點會頻繁變化需要及時釋放資源
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}      

Node

int waitStatus 1:CANCELLED,表示目前的線程被取消;<br/>-1:SIGNAL,後續節點需要unpark;<br/>-2:CONDITION,表示目前節點在等待condition,也就是在condition隊列中;<br/>-3:PROPAGATE,A releaseShared應該傳播到其他節點。 在doReleaseShared中對此進行了設定(僅适用于頭節點),以確定傳播繼續進行,即使此後進行了其他操作也是如此。 <br/> 0:表示目前節點在sync隊列中,等待着擷取鎖。
Node prev 前驅節點,比如目前節點被取消,那就需要前驅節點和後繼節點來完成連接配接。
Node next 後繼節點。
Thread thread 入隊列時的目前線程。
Node nextWaiter 為NULL表示為獨占模式
PROPAGATE:共享模式中會通過狀态是否小于0來判斷是否需要喚醒後續節點,共享模式下多個線程可同時持有state變更,waitStatus會頻繁從0切換為SIGNAL,區分SIGNAL增加的中間狀态是以稱為傳播值