天天看点

AQS(AbstractQueuedSynchronizer)——源码分析

简介

AbstractQueuedSynchronizer(队列同步器),是用来构建锁或者其他同步组件的基础框架,它通过使用一个int类型的变量来表示同步状态的同时内置FIFO队列来完成资源获取线程的排队工作,AbstractQueuedSynchronizer是大部分同步需求实现的基础。

1、Lock接口

在学习AbstractQueuedSynchronizer之前,先了解一下Lock接口。了解Lock接口之前呢,先了解一下什么是锁?锁是用来控制多个线程访问共享资源的方式,一个互斥锁能够防止多个线程同时访问共享资源。在Lock接口出现之前,我们都知道Java程序员是通过synchronized关键字来实现锁功能的,而Java SE 5之后,并发包中新增Lock接口及其相关实现用来实现所得功能,Lock接口及其实现提供了与synchronized类似的同步功能,相比之下有如下不同点。

Lock需要显示的获取锁和释放锁,虽然便捷性低,但是具有更强的可操作性,synchronized可以隐式的获取锁,但是其获取锁的方式是固化的,也就是先获取再释放

Lock具有可中断性、超时获取锁能多种synchronized不具备的功能

1.1 Lock接口提供的synchronized关键字不具备的主要特性

AQS(AbstractQueuedSynchronizer)——源码分析

1.3 Lock接口主要API(获取锁/释放锁)

public interface Lock {

    /**
     * 获取锁;调用该方法的线程将会获取锁,当获取到锁后,从该方法返回
     */
    void lock();

    /**
     * 可以中断的获取锁,和Lock方法相比不同之处在于该方法可以响应中断,即在锁获取的过程中可以中断当前线程
     *
     * @throws InterruptedException
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * 尝试非阻塞的获取锁,调用该方法后立即返回,如果能够获取则返回true,否则返回false
     *
     * @return
     */
    boolean tryLock();

    /**
     * 超时获取锁,如下情况会返回:
     * 1、当前线程在指定超时时间内获取到锁
     * 2、当前线程在超时时间内被中断
     * 3、超时时间结束,返回false
     *
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 释放锁
     */
    void unlock();

    /**
     * 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的wait()方法,而调用后,当前线程会释放锁
     *
     * @return
     */
    Condition newCondition();
}
      
AQS(AbstractQueuedSynchronizer)——源码分析

2、AbstractQueuedSynchronizer

AbstractQueuedSynchronizer(队列同步器)是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义,java.util.concurrent中许多可阻塞类,例如

ReentrantLock、Semaohore、ReentrantReadWriteLock、CountDownLatch、SynchronuosQueue和FutureTask等,都是基于AQS构建的。这两者之间的关系如下:

锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节

同步器是面向锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒的底层操作

2.1 AbstractQueuedSynchronizer的接口与示例

同步器的设计基于模板方法,使用者需要继承同步器并重写指定的方法,然后将同步器组合在自定义的同步组件的实现中,并调用同步器的提供的模板方法,而模板方法将会调用使用者(子类)的重写方法。

重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态

getState():获取当前同步状态

setState(int newState):设置当前同步状态

compareAndSetState(int expect, int update) :使用CAS设置当前状态,该方法能够保证状态设置的原子性

同步器可重写方法如下:

/**
     * 独占式获取同步状态,实现该方法需要查询当前状态并且判断同步状态是否符合预期,然后再进行CAS设置同步状态
     *
     * @param arg
     * @return
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 独占式释放同步状态,等待获取同步状态的线程将会有机会获取同步状态
     *
     * @param arg
     * @return
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享式获取同步状态,返回大于等于0的值,表示获取成功,反正获取失败
     *
     * @param arg
     * @return
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }


    /**
     * 共享式释放同步状态
     *
     * @param arg
     * @return
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 当前同步器释放在独占模式下被线程占用,一般该方法表示释放被当前线程所占
     *
     * @return
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
      
AQS(AbstractQueuedSynchronizer)——源码分析

同步器提供的模板方法主要分为3类:

  1. 独占式获取与释放同步状态
  2. 共享式获取与释放同步状态
  3. 查询同步队列中的等待线程情况
/**
     * 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回;
     * 否则,将会进入同步等待队列等待,该方法将会调用重写的tryAcquire(int arg)方法
     *
     * @param arg
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * 与acquire方法相同,但是该方法可以响应中断,当前线程未获取到同步状态进入同步队列中
     * 如果当前线程被中断,则该方法抛出InterruptedException异常
     *
     * @param arg
     * @throws InterruptedException
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * 在acquireInterruptibly方法的基础上增加了超时限制
     * 如果当前线程在超时时间内未获取到同步状态,则返回false,获取到则返回true
     *
     * @param arg
     * @param nanosTimeout
     * @return
     * @throws InterruptedException
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * 共享式额获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待
     * 与独占式获取的主要区别是在同一时刻可以由多个线程同时获取到同步状态
     *
     * @param arg
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }


    /**
     * 与acquireShared方法相同,该方法响应中断
     *
     * @param arg
     * @throws InterruptedException
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * 在acquireSharedInterruptibly的基础上增加了超时限制
     *
     * @param arg
     * @param nanosTimeout
     * @return
     * @throws InterruptedException
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
     *
     * @param arg
     * @return
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            java.util.concurrent.locks.AbstractQueuedSynchronizer.Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * 共享式的释放同步状态
     *
     * @param arg
     * @return
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     *  获取等待在同步队列上的线程集合
     *
     * @return
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (java.util.concurrent.locks.AbstractQueuedSynchronizer.Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }
      
AQS(AbstractQueuedSynchronizer)——源码分析

2.2 自定义独占锁加强AbstractQueuedSynchronizer的工作原理的理解

独占锁指的是,同一时刻只能有一个线程获取到锁,其他获取锁的线程只能处于等待队列中等待,只有获取到锁的线程释放了锁,后继的线程才能获取到锁。(不太了解的可以写一遍,基本上就懂了)
package com.lizba.p5;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * <p>
 *      自定义独占锁
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/19 22:40
 */
public class Mutex implements Lock {

    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 尝试获取锁
         *
         * @param arg
         * @return
         */
        @Override
        protected boolean tryAcquire(int arg) {
            // 当前状态如果为0则获取到锁
            if (compareAndSetState(0, 1)) {
                // 设置锁的占用线程为当线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * 尝试释放锁
         *
         * @param arg
         * @return
         */
        @Override
        protected boolean tryRelease(int arg) {
            // 如果当前同步状态为0,调用该方法则抛出异常
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            // 清空占用线程
            setExclusiveOwnerThread(null);
            // 设置共享状态为0
            setState(0);
            return true;
        }

        /**
         * 判断当前线程是否处于占用状态
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        /**
         * 返回一个Condition,每个Condition包含一个condition队列
         *
         * @return
         */
        Condition condition() {
            return new ConditionObject();
        }
    }

    // 将需要的操作代理至Sync上
    private Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.condition();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
}
      

总结自定义同步组件Mutex互斥锁:

通过自定义同步组件Mutex我们可以看出。Mutex定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。用户在使用Mutex的时候并不会直接和内部同步器打交道,而是调用Mutex提供的方法,在Mutex的实现中以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args)即可。这种实现方法大大降低了实现一个可靠自定义同步组件的门槛。(不多说_Doug Lea牛逼_)

3、AbstractQueuedSynchronizer实现分析

分析的主要内容包括如下几个方面

同步队列

独占式同步状态获取与释放

共享式同步状态获取与释放

超时获取同步状态

3.1 同步队列

同步队列实现依赖的是内部的一个(FIFO)的同步队列来完成同步状态管理的,而这个队列的重中之重就是AbstractQueuedSynchronizer的内部类Node,这个Node节点是用来保存同步失败的线程引用、等待状态以及前驱prev和后继next节点。

节点源码重点部分解释:

static final class Node {

    /**
     * 等待状态
     * 0                    初始状态
     * -3 = PROPAGATE       表示下一次共享式同步状态获取将会无条件地传播下去
     * -2 = CONDITION       节点线程等待在Condition上,当其他线程对Condition调用了signal()后,节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
     * -1 = SIGNAL          当前节点的线程释放同步状态或者被取消,则通知后继节点,使其节点线程得以运行
     * 1 = CANCELLED        同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消,进入该状态后将不会在改变状态
     */
    volatile int waitStatus;
    
    /**
     *  前驱节点
     */
    volatile Node prev;
    
    /**
     *  后继节点
     */
    volatile Node next;
    
    /**
     *  获取同步状态的线程
     */
    volatile Thread thread;
    
    /**
     *  等待在Condition上的后继节点。当前节点是共享模式时,含义特殊,它也代表节点类型(独占/共享)
     */
     Node nextWaiter;

}
      
AQS(AbstractQueuedSynchronizer)——源码分析
  1. 同步器持有首节点和尾节点,初始都为null
  2. 获取同步状态失败的线程节点加入队列尾部
AQS(AbstractQueuedSynchronizer)——源码分析
AQS(AbstractQueuedSynchronizer)——源码分析
AQS(AbstractQueuedSynchronizer)——源码分析

3.2 独占式同步状态获取与释放

独占式同步状态获取通过调用同步器的acquire(int arg)获取同步状态,注意该方法不响应中断。
获取解析

acquire(int arg)源码解析

AQS(AbstractQueuedSynchronizer)——源码分析
/**
 * 尝试将获取同步状态失败的节点通过CAS的方式加入同步队列尾部
 */
private Node addWaiter(Node mode) {
    // 创建一个新的节点,设置节点信息,和节点线程为当前线程
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾节点,当尾节点不为空的时候,尝试设置尾节点
    Node pred = tail;
    if (pred != null) {
        // 设置当前插入节点的前驱节点为当前同步队列中的尾节点
        node.prev = pred;
        // 通过CAS快速设置尾节点为当前插入的节点
        if (compareAndSetTail(pred, node)) {
            // 如设置尾节点成功,将pred节点(同步队列上一个的尾节点,此时新的尾节点为插入节点)的后继节点指向新插入的节点(新的尾节点)
            pred.next = node;
            // 返回节点对象
            return node;
        }
    }
    enq(node);
    return node;
}
      
AQS(AbstractQueuedSynchronizer)——源码分析
AQS(AbstractQueuedSynchronizer)——源码分析
AQS(AbstractQueuedSynchronizer)——源码分析
  1. 头节点是成功获取同步状态的节点,头节点线程释放同步状态之后会唤醒后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点
  2. 保持并维护同步队列FIFO的出队入队原则

acquire(int arg) 执行时序图

AQS(AbstractQueuedSynchronizer)——源码分析

3.2 release

当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,执行响应逻辑执行,需要释放同步状态,是的后续节点能够继续获取同步状态。释放同步状态调用的是release(int arg) 方法,该方法释放同步状态之后,会唤醒其他后继节点。

源码解析:

/**
 * 释放同步状态,并且唤醒后继节点
 */
public final boolean release(int arg) {
    // 尝试释放同步状态
    if (tryRelease(arg)) {
        Node h = head;
        // 如果头节点不为空,并且头节点的等待状态waitStatus不为初始状态0,此时判断为仍存在后继节点
        if (h != null && h.waitStatus != 0)
            // 使用LockSupport来唤醒处于等待状态的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
      

3.3 独占式同步状态获取与释放总结

在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并且在队列中进行自旋

移除队列或停止自旋的条件是当前节点的前驱节点是头节点并且成功的获取到了同步状态

释放同步状态时调用release(int arg) 来释放同步状态,如果存在后继节点则唤醒后继节点

3.3 共享式同步状态获取与释放

区别

共享式同步状态获取与独占式同步状态获取的主要区别在于同一时刻释放能被多个线程同时获取到同步状态。例如,文件的读写,一个程序对文件进行读操作,那么此时运行多个读操作同步进行,但是写操作将会被阻塞。而独占式访问,例如写操作,此时会阻塞其他所有的读写操作。

共享式和独占式访问资源对比

AQS(AbstractQueuedSynchronizer)——源码分析
AQS(AbstractQueuedSynchronizer)——源码分析

acquireShared(int arg)之doAcquireShared(int arg)

/**
 * 
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 判断当前节点的前驱节点是否为头节点
            final Node p = node.predecessor();
            if (p == head) {
                // tryAcquireShared(arg) >= 0表示获取同步状态成功
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
      
AQS(AbstractQueuedSynchronizer)——源码分析

共享式释放同步状态与独占式有区别,共享式因为同步状态可以被多个线程获取,自然释放的时候也可以来自多个线程,因此要tryReleaseShared(int arg)要保证同步状态的正确释放,具体实现可以在支持多线程同时访问的并发组件Semaphore中阅读。

3.4 独占式超时同步状态获取

超时同步状态获取,指的是如果可以在指定的时间段内获取同步状态则返回true,否则,返回false;这个功能是synchronized所不具备的,synchronized在线程被发起中断后,仍然会阻塞在synchronized上;而超时同步状态获取doAcquireNanos(int arg, long nanosTimeout)方法在指定时间内未获取到同步状态,会抛出InterruptedException,立即返回。

acquire、acquireInterruptibly和tryAcquireNanos的区别

AQS(AbstractQueuedSynchronizer)——源码分析

doAcquireNanos(arg, nanosTimeout)

/**
 *  在指定时间内获取同步状态,过程中会判断时间和响应中断
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 超时结束时间等于当前时间加上需要中断的时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 死循环/自旋
        for (;;) {
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            // 判断当前节点的前驱节点是否为头节点,是头节点则尝试获取共享状态
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 计算时间,如果时间到了则结束自旋返回false,获取失败
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // 时间大于0,则判断是否需要park当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 判断当前线程是否被中断,被中断则抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
      

独占式超时获取同步状态的流程图

AQS(AbstractQueuedSynchronizer)——源码分析