天天看点

AQS之排斥锁分析

文章目录

  • ​​1.AQS 内部体系架构​​
  • ​​2.获取资源​​
  • ​​2.1 获取资源 tryAcquire()e​​
  • ​​2.2 加入队列 addWaiter()​​
  • ​​2.3 排队获取资源 acquireQueued()​​
  • ​​2.4 阻塞检查 shouldParkAfterFailedAcquire()​​
  • ​​3.释放资源​​
  • ​​3.1 唤醒后继节点 unparkSuccessor()​​
  • ​​4.获取&释放资源总流程​​
  • ​​5.其他获取资源的方法​​
  • ​​5.1 响应中断 acquireInterruptibly()​​
  • ​​5.2 超时参数 tryAcquireNanos()​​

1.AQS 内部体系架构

AQS之排斥锁分析
  • FairSync: 公平锁
  • NoFairSync: 非公平锁
AQS之排斥锁分析
  • Shared: 共享模式
  • Exclusive: 排他模式

2.获取资源

AbstractQueueSynchronized.acquire()

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }      
  1. 尝试获取资源,成功则结束
  2. 获取资源失败,新建节点插入队列
  3. 在队列里排队并获取资源
  4. 如果等待过程中出现线程中断,则中断自身

    (1) 首先就是一次资源获取的尝试,具体尝试逻辑由各实现类实现,失败再加入队列

    (2) 该方法不响应中断,如果在等待过程线程被中断,线程不会响应,只会在等待结束后中断重新自身

2.1 获取资源 tryAcquire()e

FairSync:公平

AQS之排斥锁分析

NoFairSync: 非公平

AQS之排斥锁分析

区别为 hasQueuedPredecessors()方法

public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }      

这个方法可以使得公平和不公平, 去判断有没有别的线程排在了当前线程的前面。

先是tail, 后是head, 如果这二者只有一个为null(另一个不为null), 只可能为tail = null, head 不为null

AQS之排斥锁分析

2.2 加入队列 addWaiter()

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }      

enq()

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }      

初始化一个排斥模式的节点加入到队列尾部,tail指向该节点。本方法先进行一次快速尝试,失败后在一个循环中自旋尝试。

2.3 排队获取资源 acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }      

本方法是核心逻辑,实现了一个节点在队列中排队、阻塞到获取资源成功的一个逻辑,具有返回值,返回排队过程是否线程被中断过。

最后考虑到排队过程中抛出异常的场景,加了一个finally的处理,对这类厂家取消排队获取资源。

2.4 阻塞检查 shouldParkAfterFailedAcquire()

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     int ws = pred.waitStatus;
     //唯一符合的情况
     if (ws == Node.SIGNAL)
         return true;
     //不符合怎么办
     if (ws > 0) {
      //ws>0说明节点无效,调整排队位置,向前找到有效的前驱节点
         do {
             node.prev = pred = pred.prev;
         } while (pred.waitStatus > 0);
         pred.next = node;
     } else {
         //修改前驱节点状态
         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
     }
     return false;
 }      
  1. 符合阻塞的条件:前驱节点状态为singal
  2. 不符合怎么办:调整队列,找到有效的前驱节点后,修改前驱节点状态

parkAndCheckInterrupt(): 阻塞方法

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }      

只有被唤醒或者线程中断才能退出阻塞, 最后一句检查并清理中断标志,所以acquire方法中如果被中断过,需要进行一次自我中断。

3.释放资源

AQS之排斥锁分析

尝试释放资源tryRelease也是交给实现类去实现。

Sync.tryRelease()

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }      

3.1 唤醒后继节点 unparkSuccessor()

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }      
  1. 修改当前节点的状态标志
  2. 找到下一个节点,如果存在则唤醒线程

4.获取&释放资源总流程

5.其他获取资源的方法

5.1 响应中断 acquireInterruptibly()

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }      
  1. 尝试获取资源时进行一次中断检查
  2. 阻塞时被中断抛出异常

5.2 超时参数 tryAcquireNanos()

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }      
  1. 排队获取资源时先判断是否已经超时
  2. 每次判断是否需要阻塞时都先判断是否超时
  3. 阻塞时增加检查,时间不足只进行自旋,减少阻塞的性能消耗
  4. 阻塞时阻塞指定时间