天天看点

jdk1.8 ReentrantLock以及Condition分析

ReentrantLock

ReentrantLock是可重入锁,所谓可重入锁,就是说线程可以进入任何一个它已经拥有的锁所同步着的代码块,底层是基于AQS实现的并发控制,可以选择是公平的还是不公平的

属性

下面分析Sync及其三种子类

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

   /**
    * Performs {@link Lock#lock}. The main reason for subclassing
    * is to allow fast path for nonfair version.
    */
   abstract void lock();

   /**
    * Performs non-fair tryLock.  tryAcquire is implemented in
    * subclasses, but both need nonfair try for trylock method.
    */
    // 这里实现的是不公平的获取
   final boolean nonfairTryAcquire(int acquires) {
       final Thread current = Thread.currentThread();
       int c = getState();
       // 状态为0代表资源可用
       if (c == 0) {
       		// 使用cas将状态设置为1
       		// 并且设置当前独占资源的线程为当前线程
       		// setExclusiveOwnerThread是实现可重入锁的关键
           if (compareAndSetState(0, acquires)) {
               setExclusiveOwnerThread(current);
               return true;
           }
       }
       // 当前状态为1,代表不能使用资源
       // 首先判断当前线程是否是独占资源的线程,如果是,那么仍然可以使用资源
       // 这里就是可重入的体现
       else if (current == getExclusiveOwnerThread()) {
       		// 更新对应的状态
           int nextc = c + acquires;
           if (nextc < 0) // overflow
               throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
       }
       // 如果既不是独占资源的线程,当前也不能获取资源,那么返回false
       return false;
   }

	// 释放资源
   protected final boolean tryRelease(int releases) {
   		// 更新state
       int c = getState() - releases;
       // 释放资源的线程必须是当前独占资源的线程
       if (Thread.currentThread() != getExclusiveOwnerThread())
           throw new IllegalMonitorStateException();
       boolean free = false;
       // 如果资源量是0,那么代表当前线程不再独占资源
       // 因为存在重入的情况,所以c的值并不是0和1,还有可能是2,3,4,5等等
       if (c == 0) {
           free = true;
           setExclusiveOwnerThread(null);
       }
       // 更新状态
       setState(c);
       return free;
   }

   protected final boolean isHeldExclusively() {
       // While we must in general read state before owner,
       // we don't need to do so to check if current thread is owner
       return getExclusiveOwnerThread() == Thread.currentThread();
   }

   final ConditionObject newCondition() {
       return new ConditionObject();
   }

   // Methods relayed from outer class

   final Thread getOwner() {
       return getState() == 0 ? null : getExclusiveOwnerThread();
   }

   final int getHoldCount() {
       return isHeldExclusively() ? getState() : 0;
   }

   final boolean isLocked() {
       return getState() != 0;
   }

   /**
    * Reconstitutes the instance from a stream (that is, deserializes it).
    */
   private void readObject(java.io.ObjectInputStream s)
       throws java.io.IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
   }
}
           

公平的同步,套路就是在获取资源的时候,都会判断等待队列中是否有等待任务

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

   final void lock() {
       acquire(1);
   }

   /**
    * Fair version of tryAcquire.  Don't grant access unless
    * recursive call or no waiters or is first.
    */
   protected final boolean tryAcquire(int acquires) {
       final Thread current = Thread.currentThread();
       int c = getState();
       // c等于0代表资源可获取
       if (c == 0) {
       		// 没有等待任务,或者当前线程时等待队列中的下一个任务
       		// 如果成功获取资源,将当前线程设置为独占资源的线程
           if (!hasQueuedPredecessors() &&
               compareAndSetState(0, acquires)) {
               setExclusiveOwnerThread(current);
               return true;
           }
       }
       else if (current == getExclusiveOwnerThread()) {
           int nextc = c + acquires;
           if (nextc < 0)
               throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
       }
       return false;
   }
}
           

不公平的同步

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
    	// 不理会等待队列,直接尝试获取资源
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
           

下面我们看一下Node中都要哪些属性,这里用到的Node是AQS中的

volatile Node next;
volatile Thread thread;
Node nextWaiter;
volatile Node prev;
volatile int waitStatus;
           

首先我们先正确区分两个队列

第一个队列,是AQS中维护的等待使用资源的队列,这里我们称呼为sync队列,该队列维护的节点是等待竞争资源的线程,使用上面的next和prev属性,形成一个双向队列

第二队列,就是在Condition对象上调用await(),线程进入的等待队列,这里我们称为condition队列,该队列中维护的节点表示在某个Condition上等待的线程,需要被其他线程使用signal()唤醒,使用nextWaiter形成一个单向队列

另外我们这里再说一下waitStatus的取值,waitStatus代表Node的状态

CANCELLED:因为超时或者中断而取消的节点的状态

CONDITION:代表当前节点在condition队列中

SIGNAL:当前节点在队列中的后继节点将会被阻塞或者已经被阻塞了(使用park),所以当当前节点释放资源或者取消的时候,必须使用unpark方法唤醒后继节点

PROPAGATE:用于共享模式,用于唤醒之后的节点

其中CANCELLED,SIGNAL,PROPAGATE用于sync队列

CANCELLED,CONDITION用于condition队列

获取和释放

public void lock() {
    sync.lock();
}
           
public void unlock() {
    sync.release(1);
}
           

Condition

下面分析下Condition

Condition内部也会维护一个等待队列,会将所有在当前Condition上调用await方法的线程加入到等待队列中

首先放一张原理示意图

jdk1.8 ReentrantLock以及Condition分析
public Condition newCondition() {
   return sync.newCondition();
}
           

实际上调用的sync的newCondition方法,再看sync的newCondition方法

final ConditionObject newCondition() {
    return new ConditionObject();
}
           

从源码中可以知道ConditionObject实现了Condition接口

属性

// 等待队列中的第一个结点
private transient Node firstWaiter;
// 等待队列中的最后一个节点
private transient Node lastWaiter;
           

await

public final void await() throws InterruptedException {
   if (Thread.interrupted())
       throw new InterruptedException();
   // 删除队列中Cancelled状态的节点,并且将一个代表当前线程的节点添加到队列的末尾
   Node node = addConditionWaiter();
   // 释放当前线程获取到的ReentrantLock锁
   int savedState = fullyRelease(node);
   int interruptMode = 0;
   // 判断当前节点是否在aqs的等待队列中
   // 如果不在就阻塞并且就算被唤醒如果不在aqs的等待队列中也会接着阻塞
   // 如果在等待的过程被中断了,那么就会跳出循环
   // 如果加入到aqs的等待队列中了,也会跳出循环
   // 这里的流程是这样的
   // 当线程调用condition.await时,会加入到这个condition的condition 队列中,注意这个队列和aqs的等待队列是不同,condition 队列中的节点表示等待被其他线程唤醒的线程
   // 而aqs的等待队列中是等待获取锁的线程
   while (!isOnSyncQueue(node)) {
       LockSupport.park(this);
       if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
           break;
   }
   // 下面的步骤就是竞争锁
   // 如果竞争到了锁,那么线程就会跳出await方法继续执行
   if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
       interruptMode = REINTERRUPT;
   if (node.nextWaiter != null) // clean up if cancelled
       unlinkCancelledWaiters();
   if (interruptMode != 0)
       reportInterruptAfterWait(interruptMode);
}
           
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // 将队列中所有状态为Cancelled的节点从队列中删除,同时更新队列尾节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建一个代表当前线程的节点添加到等待队列的末尾
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
           

addConditionWaiter就是将当前线程封装成一个Node,然后加入到等待队列的末尾

这里的Node就是AQS中的Node,不了解的可以看之前关于AQS源码的分析

private void unlinkCancelledWaiters() {
	// 当前节点
    Node t = firstWaiter;
    // 上一个遍历的节点
    Node trail = null;
    // 从头往后遍历
    while (t != null) {
    	// 下一个节点
        Node next = t.nextWaiter;
        // 如果当前节点的状态是CANCELLED
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            // 如果前一个节点是Null,代表当前节点是头结点
            // 更新头结点为下一个节点
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
           

unlinkCancelledWaiters方法就是将队列中所有已经取消的任务从队列中删除

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 释放锁,因为能够执行到condition的await方法,之前一定获取到了ReentrantLock的锁
        // 这里底层调用的是ReentrantLock的tryRelease
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
           
final boolean isOnSyncQueue(Node node) {
	// 如果当前节点的状态是CONDITION或者当前节点是头结点,那么判断为不在同步队列中
	// prev是在sync队列中Node用来维护双向队列的
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // next不为null,代表当前节点在sync队列中有后继节点
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
     // 从aqs的等待队列中从后往前判断当前节点是否在aqs的等待队列中
     // 对于ReentrantLock来说,aqs就是它的属性sync
    return findNodeFromTail(node);
}
           

signal

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // 从头遍历condition 等待队列
    // 将第一个没有取消的任务节点从conditiion 等待队列中删除,并且添加到aqs的等待队列中,代表该节点已经被其他线程唤醒,去竞争aqs的锁了
    // 每遇到一个取消的任务节点,就直接从等待队列中删除了
    if (first != null)
        doSignal(first);
}
           
private void doSignal(Node first) {
    do {
    	// 将condition 等待队列的头结点从队列中取出,令下一个节点为队列的头结点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
           
final boolean transferForSignal(Node node) {
   	// 如果无法更改节点的状态为0,那么代表节点对应的任务已经被取消了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
     // 将当前节点添加到aqs的等待队列的末尾,并且返回在aqs的等待队列中当前节点前面一个节点
     // 运行到这里该节点已经从condition的等待队列中移除了,并且该节点对应的任务并没有取消
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果在aqs队列中,当前节点的前一个节点取消了,或者将前一个节点的状态设置为SINGAL失败了,那么直接唤醒当前节点,节点被唤醒之后,就会执行await方法跳出循环之后竞争锁的代码
    // 如果之前的节点被取消或者不能设置为SINGAL,那么代表当前节点在aqs队列中不会被唤醒,所以我们就需要直接唤醒
    // 如果之前的节点的状态不是取消,那么就代表当前节点可以被之前的节点唤醒,所以我们就不用再这里进行唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}