天天看点

java并发锁机制----ReentrantLock 原理解析

什么是AQS

AQS即是AbstractQueuedSynchronizer,一个用来构建锁和同步工具的框架,包括常用的ReentrantLock、CountDownLatch、Semaphore等。

AQS没有锁之类的概念,它有个state变量,是个int类型,在不同场合有着不同含义。本文研究的是锁,为了好理解,姑且先把state当成锁。

AQS围绕state提供两种基本操作“获取”和“释放”,有条双向队列存放阻塞的等待线程,并提供一系列判断和处理方法,简单说几点:

  • state是独占的,还是共享的;
  • state被获取后,其他线程需要等待;
  • state被释放后,唤醒等待线程;
  • 线程等不及时,如何退出等待。

至于线程是否可以获得state,如何释放state,就不是AQS关心的了,要由子类具体实现。

直接分析AQS的代码会比较难明白,所以结合子类ReentrantLock来分析。AQS的功能可以分为独占和共享,ReentrantLock实现了独占功能,是本文分析的目标。

ReentrantLock对比synchronized

ReentrantLock实现了Lock接口,加锁和解锁都需要显式写出,注意一定要在适当时候unlock。

和synchronized相比,ReentrantLock用起来会复杂一些。在基本的加锁和解锁上,两者是一样的,所以无特殊情况下,推荐使用synchronized。ReentrantLock的优势在于它更灵活、更强大,除了常规的lock()、unlock()之外,还有lockInterruptibly()、tryLock()方法,支持中断、超时。

公平锁和非公平锁

首先看ReentrantLock构造方法:

/**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync(); // 默认选择非公平锁
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ?
        new FairSync()  // 公平锁
        :
        new NonfairSync(); // 非公平锁
    }
           

学习AQS的时候,了解到AQS依赖于内部的FIFO同步队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个Node对象并将其加入到同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

这时,我有了一个疑问,AQS的同步队列是FIFO的,就是先来排队的先走。那怎么实现非公平锁呢?查阅了一些资料,总算知道了。

首先从公平锁开始看起。

ReentrantLock 默认采用非公平锁,除非在构造方法中传入参数 true 。

公平锁的 lock 方法:

static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
           

我们可以看到,在注释1的位置,有个

!hasQueuedPredecessors()

条件,意思是说当前同步队列没有前驱节点(也就是没有线程在等待)时才会去

compareAndSetState(0, acquires)

使用CAS修改同步状态变量。所以就实现了公平锁,根据线程发出请求的顺序获取锁。

非公平锁的lock方法

static final class NonfairSync extends Sync {
    final void lock() {
        // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //3.这里也是直接CAS,没有判断前面是否还有节点。
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

非公平锁的实现在刚进入lock方法时会直接使用一次CAS去尝试获取锁,不成功才会到acquire方法中,如注释2。而在nonfairTryAcquire方法中并没有判断是否有前驱节点在等待,直接CAS尝试获取锁,如注释3。由此实现了非公平锁。

非公平锁和公平锁的两处不同:

  1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
  2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。

相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

它们均继承了 ReentrantLock.Sync 类, ReentrantLock.Sync 类 又继承了 AbstractQueuedSynchronizer类

1. ReentrantLock.Sync 类是对一些公共方法的封装。

2. AbstractQueuedSynchronizer 内部维护着一个由双向链表实现的队列 用来记录等待锁释放的线程,

实际操作中 NonfairSync 的使用比较多 这里针对NonfairSync 进行解析。

加锁操作

ReentrantLock.NonfairSync

/**
   * 非公平锁的同步对象
   * Sync object for non-fair locks
   */
  static final class NonfairSync extends Sync {
      private static final long serialVersionUID = 7316153563782823691L;

      /**
       * Performs lock.  Try immediate barge, backing up to normal
       * acquire on failure.
       */
      final void lock() {
          if (compareAndSetState(0, 1))
              setExclusiveOwnerThread(Thread.currentThread());
          else
              acquire(1);
      }

      protected final boolean tryAcquire(int acquires) {
          return nonfairTryAcquire(acquires);
      }
  }
           

   1. 先看lock方法

final void lock() {
            //  通过原子操作 改变上锁状态
            if (compareAndSetState(0, 1)) // 变更成功
                setExclusiveOwnerThread(Thread.currentThread()); // 设置持有者为当前线程
            else // 变更不成功
                acquire(1);
        }
           

按代码顺序往下看

1.1 AbstractQueuedSynchronizer#compareAndSetState方法

private static final Unsafe unsafe = Unsafe.getUnsafe();

    static {
        try {
            // 获取偏移量
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     *  通过原子操作 改变上锁状态
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this  调用本地方法 实现硬件级别的原子操作 cas
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
           

Unsafe :

  •  CAS的核心类 (CAS 比较并交换)
  • 通过本地方法 操作内存 实现 cas
  •  一知半解的情况下 不要操作此类

1.2 AbstractOwnableSynchronizer#setExclusiveOwnerThread(Thread.currentThread());将当前线程记录为独占锁的线程

1.3 AbstractQueuedSynchronizer.acquire();

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        // 再次尝试上锁 回到了  NonfairSync.tryAcquire 方法, tryAcquire 调用了 Sync.nonfairTryAcquire方法
        if (!tryAcquire(arg) && 
            acquireQueued(
                    addWaiter(Node.EXCLUSIVE), // 链表尾部添加节点
                    arg
                )
            )
            selfInterrupt();
    }
           

1.3.1 ReentrantLock.Sync#nonfairTryAcquire:

/**
         * 判断 reentranLock 状态 是否被锁住(state ?= 0)
         * <p>如果没被锁住尝试 原子性上锁 失败返回false</>
         * <p>如果被锁住 判断是否是当前线程持有锁(重入锁的实现) 如果是 state + 1
         * (信号量  记录该线程持有锁的次数。 该线程每次释放所 信号量 -1。 信号量为零 代表 锁被真正释放)</>
         * <p>else 返回false</p>
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread(); // 获取当前线程
            int c = getState(); // 获取当锁的状态
            if (c == 0) { // 如果锁已被经释放 再次尝试获取锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { // 如果当前线程为锁的拥有者
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc); // 累加 state 的值  此段代码 实现了重入锁
                return true;
            }
            return false;
        }
           

获取锁成功分为两种情况,第一个if判断AQS的state是否等于0,表示锁没有人占有。没有的话调用compareAndSetState使用cas的方式修改state,传入的acquires写死是1。最后线程获取锁成功,setExclusiveOwnerThread将线程记录为独占锁的线程。

第二个if判断当前线程是否为独占锁的线程,因为ReentrantLock是可重入的,线程可以不停地lock来增加state的值,对应地需要unlock来解锁,直到state为零。

如果 tryAcquire(arg) 返回true,则不会执行acquireQueued,表示成功获取锁,如果tryAcquire(arg) 返回 false,说明没有成功获取锁,则加入请求队列中。接着请看 addWaiter(Node.EXCLUSIVE) 方法。

1.3.2 AbstractQueuedSynchronizer#addWaiter代码:

AQS内部有一条双向的队列存放等待线程,节点是Node对象。每个Node维护了线程、前后Node的指针和等待状态等参数。

线程在加入队列之前,需要包装进Node,调用方法是addWaiter,addWaiter 中涉及的逻辑,就是 CLH思想的实现,故在 AbstractQueuedSynchronizer中,源码如下:

/**
     *
     * 把当前线程加入队列 尾部
     *
     * 负责队列初始化
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) { //@1--start 队列尾部不为空
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }//@1--end
        // 列队尾部为空 或者  CAS 操作失败
        enq(node);
        return node;
    }
           

每个Node需要标记是独占的还是共享的,由传入的mode决定,ReentrantLock自然是使用独占模式Node.EXCLUSIVE。 

对于上面的代码@1,处说,如果当前该锁的尾部节点不为空时,只需要原子性的将新增节点放入原队列的尾部,然后更新锁的tail 属性即可。如果尾部节点不为空,说明有线程已经在该锁上等待,那如果尾部为空,是什么情况呢?尾部为空,表示没有线程持有锁,那为什么该线程获取锁没有成功呢?我们不妨设想一下,该线程在尚未执行到addWaiter时,尾部不为空,无法获取锁,当执行到addWaiter时,别的线程释放了锁,导致尾部为空,可以重新获取锁了;(其实这个就是并发编程的魅力,与synchronized关键字不同的机制);为了解答上述疑问,我们进入到 enq(node)方法中一探究竟。

/**
     * java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // @1 尾部为空 尝试构建表结构 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { // 尾部不为空 不断尝试CAS 操作
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

使用自旋来加入,众所周知,CLH算法,需要初始化一个假的head节点,也就是head节点并不代表一个等待获取锁的对象,AbstractQueuedSynchronzier选择初始化head,tail的时机为第一次产生锁争用的时候。@1处为初始化head,tail,初始化后,再将新添加的节点放入到队列的尾部,然后该方法会返回原队列的尾节点。addWaiter方法执行后,继续回到acquire(args)方法处:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           

接下来,查看acquireQueued方法,addWaiter方法返回的是代表当前线程的Node节点。

1.3.3 线程加入队列后,下一步是调用acquireQueued阻塞线程。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued代码:

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
*/

  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                
                final Node p = node.predecessor();//  @1
                if (p == head && tryAcquire(arg)) { // @2 判断当前节点的 前驱节点 是否为队列头部  如果是 再次尝试上锁(如果头部节点 已经释放所, 则使当前线程成为持有者 并且设置自己为 头部。 同时释放前驱节点)
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt() // 进入等待状态 等待唤醒
                        )//@3
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);  // 抛出异常 才会走的到这里。  源码在下面
        }
    }
           

首先@1,获取该节点的 node 的上一个节点。

@2如果node的前节点是head,,因为head初始化时,都是假节点,不代表有线程拥有锁,所以再次尝试获取锁,如果获取锁,则将锁的 head设置为当前获取锁的线程的Node,然后返回false《这步是实现公平锁的核心,保证释放锁时,由下个排队线程获取锁》。这里返回false代表 if (!tryAcquire(arg) &&  acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 的结果为false,那么直接返回就好,不需要selfInterrupt()设置中断标记。

如果node的前节点不是head的话,则说明该锁被别的线程占用了,那就需要等待其他线程释放该锁,具体,我们看一下 shouldParkAfterFailedAcquire,为了更好的理解 shouldParkAfterFailedAcquire,我们先了解一下waitState变量,waitState是在AQS内部类Node中定义的一个volatile变量。

/** waitStatus value to indicate thread has cancelled */
        //取消状态
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        //代表下个线程节点需要被唤醒
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        //当前线程节点在等待条件触发
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        //(共享锁)状态需要向后传播
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;
           

shouldParkAfterFailedAcquire传入当前节点和前节点,根据前节点的状态,判断线程是否需要阻塞。

java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire

/**
     * 检查 是否需要阻塞当前线程
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; //@1
        if (ws == Node.SIGNAL)    //@2
            /* 
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
             //前驱节点在等待唤醒  也就是说当前节点需要进入等待状态
            return true;
        if (ws > 0) {  //@3 前驱节点被取消
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.     
             */
            // 前驱节点被取消则跳过前驱节点,循环此操作直到找到一个不为 取消状态 的前驱节点
            do {    //@4-start
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);    //@4-end
            pred.next = node;    //@5
        } else {    //@6
            /* 
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //只有前置节点的状态为 0 或 PROPAGATE,才能进入到该代码块,表明我们需要一个信号,但暂不挂起线程,调用者需要重试一次,确保它不能获取到锁,从而阻塞该线程。

             // 设置前驱节点为 SIGNAL 标记自己为等待唤醒,下次循环到这里之前,如果没有成功拥有锁, 则会进入 if (ws == Node.SIGNAL) 代码段
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

//前驱节点状态是SIGNAL时,当前线程需要阻塞;
//前驱节点状态是CANCELLED时,通过循环将当前节点之前所有取消状态的节点移出队列;
//前驱节点状态是其他状态时,需要设置前驱节点为SIGNAL。
           

@1 首先获取前置节点的 waitStatus。

@2 如果前置节点的waitStatus = Node.SIGNAL,那么当前节点,直接阻塞,说明状态是一个信号,如果前置节点状态为              Node.SIGNAL,那么后续节点应该阻塞(一个节点的waitStatus初始值为 0。)

@3,ws > 0 ,则代表前置节点已取消。

@4 处的代码,就是找到当前Node的第一个不为取消状态的前置节点,重构CLH队列后,返回false,再次进入到acquireQueued  的无限循环中,又继续acquireQueued的流程,继续尝试获取锁,最终获取到锁,或者阻塞。

@6如果前置节点为0或PROPAGATE(可传播),如果前置节点为0,说明之前还没有其他节点通过(prev)来判断该prev的后继节点是否需要阻塞,所以,通过CAS设置前置节点为 Node.SIGNAL,重试获取锁过程,避免不必要的线程阻塞。

如果线程需要阻塞,由parkAndCheckInterrupt方法进行操作。

java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
           

parkAndCheckInterrupt使用了LockSupport,和cas一样,最终使用UNSAFE调用Native方法实现线程阻塞(以后有机会就分析下LockSupport的原理,park和unpark方法作用类似于wait和notify)。最后返回线程唤醒后的中断状态,关于中断,后文会分析。

至此,获取锁的过程就结束了,为了直观体现上述获取锁的过程,现给出如下流程图:

java并发锁机制----ReentrantLock 原理解析

java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire代码我们放到后面分析。

/**
     * Cancels an ongoing attempt to acquire.
     * 列队等待中 抛出异常会调用此方法
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null; // 释放线程

        // 前驱节点已被取消  重新定义前驱节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED; // 取消当前线程 所属的节点(标记为取消),  没有使用 cas  因为 其他线程 不会干扰这里

        // If we are the tail, remove ourselves. 如果我们是尾巴,就把自己弄走
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            // 如果node既不是tail,又不是head的后继节点
            // 则将node的前继节点的waitStatus置为SIGNAL
            // 并使node的前继节点指向node的后继节点(相当于将node从队列中删掉了)
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                //  如果node是head的后继节点,则直接唤醒node的后继节点
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
           

 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor代码:

/** 唤醒后继节点
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0) //置零当前线程所在的结点状态,允许失败。
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);  // 唤醒下级节点
    }
           

非公平锁lock 方法的实现 大概就以上的内容。到这里总结一下获取锁的过程:线程去竞争一个锁,可能成功也可能失败。成功就直接持有资源,不需要进入队列;失败的话进入队列阻塞,等待唤醒后再尝试竞争锁。

释放锁

通过上面详细的获取锁过程分析,释放锁过程大概可以猜到:头节点是获取锁的线程,先移出队列,再通知后面的节点获取锁。

public void unlock() {
    sync.release(1);
}
           

ReentrantLock的unlock方法很简单地调用了AQS的release:

public final boolean release(int arg) {
    if (tryRelease(arg)) {    //@1
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           

和lock的tryAcquire一样,unlock的tryRelease同样由ReentrantLock中内部类NonFairSync实现:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;   //  @1
            if (Thread.currentThread() != getExclusiveOwnerThread()) //@2
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {  // @3
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);   //@4
            return free;
}
           

代码@1,首先,计算持有锁的次数=当前被持有锁的次数-减去释放的锁的数量;

代码@2,判断当前锁的持有线程释放与释放锁的线程是否相同,否则,直接抛出运行时异常

代码@3,如果释放锁后,占有次数为0,则代表该锁被释放,设置锁的占有线程为null,

代码@4,设置锁的state,如果返回true,表示锁被释放,如果返回false,表示,锁继续被该线程占有(重入了多次,就需要释放多次)。再次回到release方法,如果tryRelease方法返回true,表示可以释放锁,

public final boolean release(int arg) {
        if (tryRelease(arg)) {  @1
            Node h = head;
            if (h != null && h.waitStatus != 0)   // @2
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

 代码@2为什么需要判断 h!=null && h.waitStatus != 0的判断呢?,在讲解获取锁的时候,方法

shouldParkAfterFailedAcquire 中对于代码@6处的讲解,其实不难发现,一个节点在请求锁时,只有当它的前驱节点的waitStatus=Node.SIGNAL时,才会阻塞。如果 head为空,则说明CLH队列为空,压根就不会有线程阻塞,故无需执行unparkSuccessor(h),同样的道理,如果head节点的waitStatus=0,则说明压根就没有head后继节点判断是否要绑定的逻辑,故也没有线程被阻塞这一说。改进后的CLH,head如果不为空,该节点代表获取锁的那个线程对应的Node,请看获取锁代码acquireQueued中的代码@2处,如果获得锁,setHead(node);知道这一点,就不难理解为什么在释放锁时调用unparkSuccessor(h)时,参数为head了。

现在将目光转移到 AbstractQueuedSynchronizer. unparkSuccessor(h)方法中:

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;  
        if (ws < 0)     // @1
            compareAndSetWaitStatus(node, ws, 0);
 
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {  //@2 start
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        } // @2 end
 
        if (s != null) // @3
            LockSupport.unpark(s.thread);
    }
           

代码@1,目前waitStatus > 0表示取消,等于0表示正常(新建),该步骤主要是为了保护,避免重复释放。

代码@2 start-end,此处,主要是从占有锁的节点,往后找,找到第一个没有被取消的节点,然后唤醒它所代表的线程。这里为什么要从尾部寻址呢?

代码@3,唤醒线程,释放锁的逻辑代码已经结束,那调用LockSupport.unpark(s.thread)后,会进入到哪呢?此时,请再次进入获取锁代码的 acquireQueue方法和shouldParkAfterFailedAcquire方法,先解读如下:

    当LockSupport.unpark(s.thread)事,那acquireQueued的代码@3处parkAndCheckInterrupt方法会解除阻塞,继续往下执行,进入到 acquireQueued的for循环处:此时会有两种情况

    1、HEAD --> Node(s)  ... > ...            (Node(s)为  LockSupport.unpark 中的 s)

    2、HEAD --> A Cancel Node -->  Node(s)

    如果为第一种情况,直接进入 @2去尝试获取锁。

    如果为第二种情况,shouldParkAfterFailedAcquire(prev,node)中的prev为一个取消的节点,然后会重构整个CLH链表,删除Node到head节点直接的取消节点,使得被唤醒线程的节点的上一个节点为head,从而满足@2处的条件,进入获取锁方法。至此, lock方法与unlock方法实现了交互。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();    //  @1
                if (p == head && tryAcquire(arg)) {    // @2
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt()  )   //@3
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
与shouldParkAfterFailedAcquire方法:
  */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;   //  @1
        if (ws == Node.SIGNAL)    // @2
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {   // @3
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {   // @4 start
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);   //@4 end
 
            pred.next = node; // @5
        } else { // @6
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
               只有前置节点的状态为 0 或 PROPAGATE,,才能进入到该代码块,表明我们需要一个信号,但暂不挂起线程,调用者需要重试一次,确保它不能获取到锁,从而阻塞该线程。
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           

 为了方便大家理解,给出一个简要的释放锁的流程图:

java并发锁机制----ReentrantLock 原理解析

中断锁

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
           

在acquire里还有最后一句代码调用了selfInterrupt,功能很简单,对当前线程产生一个中断请求。

为什么要这样操作呢?因为LockSupport.park阻塞线程后,有两种可能被唤醒。

第一种情况,前节点是头节点,释放锁后,会调用LockSupport.unpark唤醒当前线程。整个过程没有涉及到中断,最终acquireQueued返回false时,不需要调用selfInterrupt。

第二种情况,LockSupport.park支持响应中断请求,能够被其他线程通过interrupt()唤醒。但这种唤醒并没有用,因为线程前面可能还有等待线程,在acquireQueued的循环里,线程会再次被阻塞。parkAndCheckInterrupt返回的是Thread.interrupted(),不仅返回中断状态,还会清除中断状态,保证阻塞线程忽略中断。最终acquireQueued返回true时,真正的中断状态已经被清除,需要调用selfInterrupt维持中断状态。

因此普通的lock方法并不能被其他线程中断,ReentrantLock是可以支持中断,需要使用lockInterruptibly。

两者的逻辑基本一样,不同之处是parkAndCheckInterrupt返回true时,lockInterruptibly直接throw new InterruptedException()。

ReentrantLock lockInterruptibly 源码分析

public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
           

首先先提一个问题: void lock(),通过该方法去获取锁,如果锁被占用,线程阻塞,如果调用被阻塞线程的interupt()方法,会取消获取锁吗?答案是否定的。

     首先需要知道 LockSupport.park 会响应中断,但不会抛出 InterruptedException。

     接下来,我们就从lockInterruptibly()方法入手,一步一步解析,并分析与lock方法的差异。

     首先进入的是AbstractQueuedSynchronizer的acquireInterruptibly方法。

/**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success.  Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted.  This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);   // @1
    }
    如果尝试获取锁失败后,进入获取锁并等待锁逻辑,doAcquireInterruptibly
/**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);   // @1
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {              // @2
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();   //@3
            }
        } finally {
            if (failed)
                cancelAcquire(node); //@4
        }
    }
           

 整个获取锁的逻辑与 lock方法一样,唯一的区别在于  @3 处,如果parkAndCheckInterrupt如果是通过t.interupt方法,使LockSupport.park取消阻塞的话,会抛出InterruptedException,停止尝试获取锁,然后将添加的节点取消,那重点关注一下cancelAcquire(node);

/**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
 
        node.thread = null;
 
        // Skip cancelled predecessors
        Node pred = node.prev;   
        while (pred.waitStatus > 0)  // @1
            node.prev = pred = pred.prev;
 
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next; //@2
 
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED; 
 
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {   // @3 
            compareAndSetNext(pred, predNext, null);
        } else {  // @4
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {   // @5
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {  // @6
                unparkSuccessor(node);
            }
 
            node.next = node; // help GC
        }
    }
           

代码@1:此处的目的就是, 设置prev的值为从当前取消节点往head节点方向,第一个未取消节点。并将中间的取消节点脱离这条链。

代码@2 Node predNext = pred.next;

代码@3 如果被取消的节点是尾节点的话,那么将pred设置为尾节点,compareAndSetTail(node, pred),如果设置失败,说明,有别的线程在申请锁,使得尾部节点发生了变化,那这样的话,我当前节点取消的工作,就到此可以结束了;如果设置成功了,既然pred是尾节点,那么再次将pred的next域设置为null;当然也可能设置失败,表明又有新的线程在申请锁,创建了节点。所以取消操作,也到此结束。

代码@4,如果取消的节点,不是尾部节点的话,这时需要维护CLH链,请看代码@5

代码@5,首先pred不是head节点,接下来判断是否需要设置pred.next = 当前待取消节点的next。

 如果 pred.waitStatus==Node.SIGNAL, 或者操作pred.waitStatus=Node.SIGNAL状态成功,并且pred.thread 的线程不为空;此时进一步判断待取消的节点的next不为空,并且状态为非取消的时,将pred.next 设置为 node.next;该取消节点被删除

代码@6,如果pred为head,执行一次唤醒操作。

整个加锁释放锁流程中Node.CANCEL状态节点的删除操作有两处,一处在shouldParkAfterFailedAcquire,另一处就发生在cancelAcquire方法。

总结

从ReentrantLock的实现完整分析了AQS的独占同步功能(如果需要更进一步详细了解,可以参考这篇博文https://blog.csdn.net/tyrroo/article/details/92772279),总的来讲并不复杂。别忘了AQS还有共享功能。

继续阅读