天天看點

AQS之排斥鎖分析

文章目錄

  • ​​1.AQS 内部體系架構​​
  • ​​2.擷取資源​​
  • ​​2.1 擷取資源 tryAcquire()e​​
  • ​​2.2 加入隊列 addWaiter()​​
  • ​​2.3 排隊擷取資源 acquireQueued()​​
  • ​​2.4 阻塞檢查 shouldParkAfterFailedAcquire()​​
  • ​​3.釋放資源​​
  • ​​3.1 喚醒後繼節點 unparkSuccessor()​​
  • ​​4.擷取&釋放資源總流程​​
  • ​​5.其他擷取資源的方法​​
  • ​​5.1 響應中斷 acquireInterruptibly()​​
  • ​​5.2 逾時參數 tryAcquireNanos()​​

1.AQS 内部體系架構

AQS之排斥鎖分析
  • FairSync: 公平鎖
  • NoFairSync: 非公平鎖
AQS之排斥鎖分析
  • Shared: 共享模式
  • Exclusive: 排他模式

2.擷取資源

AbstractQueueSynchronized.acquire()

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }      
  1. 嘗試擷取資源,成功則結束
  2. 擷取資源失敗,建立節點插入隊列
  3. 在隊列裡排隊并擷取資源
  4. 如果等待過程中出現線程中斷,則中斷自身

    (1) 首先就是一次資源擷取的嘗試,具體嘗試邏輯由各實作類實作,失敗再加入隊列

    (2) 該方法不響應中斷,如果在等待過程線程被中斷,線程不會響應,隻會在等待結束後中斷重新自身

2.1 擷取資源 tryAcquire()e

FairSync:公平

AQS之排斥鎖分析

NoFairSync: 非公平

AQS之排斥鎖分析

差別為 hasQueuedPredecessors()方法

public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }      

這個方法可以使得公平和不公平, 去判斷有沒有别的線程排在了目前線程的前面。

先是tail, 後是head, 如果這二者隻有一個為null(另一個不為null), 隻可能為tail = null, head 不為null

AQS之排斥鎖分析

2.2 加入隊列 addWaiter()

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }      

enq()

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }      

初始化一個排斥模式的節點加入到隊列尾部,tail指向該節點。本方法先進行一次快速嘗試,失敗後在一個循環中自旋嘗試。

2.3 排隊擷取資源 acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }      

本方法是核心邏輯,實作了一個節點在隊列中排隊、阻塞到擷取資源成功的一個邏輯,具有傳回值,傳回排隊過程是否線程被中斷過。

最後考慮到排隊過程中抛出異常的場景,加了一個finally的處理,對這類廠家取消排隊擷取資源。

2.4 阻塞檢查 shouldParkAfterFailedAcquire()

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     int ws = pred.waitStatus;
     //唯一符合的情況
     if (ws == Node.SIGNAL)
         return true;
     //不符合怎麼辦
     if (ws > 0) {
      //ws>0說明節點無效,調整排隊位置,向前找到有效的前驅節點
         do {
             node.prev = pred = pred.prev;
         } while (pred.waitStatus > 0);
         pred.next = node;
     } else {
         //修改前驅節點狀态
         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
     }
     return false;
 }      
  1. 符合阻塞的條件:前驅節點狀态為singal
  2. 不符合怎麼辦:調整隊列,找到有效的前驅節點後,修改前驅節點狀态

parkAndCheckInterrupt(): 阻塞方法

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }      

隻有被喚醒或者線程中斷才能退出阻塞, 最後一句檢查并清理中斷标志,是以acquire方法中如果被中斷過,需要進行一次自我中斷。

3.釋放資源

AQS之排斥鎖分析

嘗試釋放資源tryRelease也是交給實作類去實作。

Sync.tryRelease()

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }      

3.1 喚醒後繼節點 unparkSuccessor()

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }      
  1. 修改目前節點的狀态标志
  2. 找到下一個節點,如果存在則喚醒線程

4.擷取&釋放資源總流程

5.其他擷取資源的方法

5.1 響應中斷 acquireInterruptibly()

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }      
  1. 嘗試擷取資源時進行一次中斷檢查
  2. 阻塞時被中斷抛出異常

5.2 逾時參數 tryAcquireNanos()

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }      
  1. 排隊擷取資源時先判斷是否已經逾時
  2. 每次判斷是否需要阻塞時都先判斷是否逾時
  3. 阻塞時增加檢查,時間不足隻進行自旋,減少阻塞的性能消耗
  4. 阻塞時阻塞指定時間