天天看点

ReentrantLock使用和原理(AQS简述)

在开发过程中,synchronized是最简单的同步控制方法,在通常情况下是够用的,但synchronized时不响应中断,而且有时候,我们需要灵活的来控制加解锁。这时候可以使用ReentrantLock。

在以前的版本中,synchronized效率是远远低于ReentrantLock,后来经过优化,两者性能差距不大了。但ReentrantLock有一些新特性,是synchronized所不具备的。

1、接口

//Lock.java
/* 加锁 */
void lock();
//可响应中断
void lockInterruptibly() throws InterruptedException;
//尝试加锁,失败马上返回
boolean tryLock();
//尝试加锁,包含最大等待时间,最大等待时间范围内未加锁,返回。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解锁,注意不要忘,如果有异常,解锁应放在finally中
void unlock();
//获取锁的condition对象,然后做await,signal来实现等待和通知
Condition newCondition();
           

另外ReentrantLock可以选择是否使用公平锁,公平锁需要维持一个有序队列,按照请求锁的先后顺序来获得锁,因而效率较低,但好处在于不会产生饥饿现象,即一个线程,只要等待,必然能获取到锁。而非公平锁则不是这样,非公平锁除了随机获得锁以外,还有一个隐藏属性,即一个线程会倾向于再次获得已经持有的锁,这样的锁的分配方式比较高效。

再来说说Condition。

//Condition.java
//等待,可响应中断
void await() throws InterruptedException;
//等待,不响应中断
void awaitUninterruptibly();
//按照时间来等待,响应中断
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
//通知
void signal();
void signalAll();
           

2、原理

ReentrantLock使用内部类Sync来实现加解锁,Sync是AbstractQueuedSynchronizer(下面简称AQS)的子类。

AQS,人如其名,抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,是模板模式的典型应用,不光用在ReentrantLock中,也用在Semaphor/CountDownLatch等上。

AQS核心是一个共享资源(volatile int state;)和一个等待队列。共享资源相关的方法有3个。

//AbstractQueuedSynchronizer.java
protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}`
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
           

队列是一个FIFO的双向队列,队列相关结构如下。

//注释

     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
//code
private transient volatile Node head;
private transient volatile Node tail;
           

AQS定义了2种资源共享的方式,EXCLUSIVE(独占,即只有一个线程能获取到该资源,比如ReentrantLock),SHARED(共享,多个线程可以共享该资源,Semephore/CountDownLatch),当然也可以兼而有之,比如ReentrantReadWriteLock。

等待队列的每一项为Node,核心结构如下

//AbstractQueueSynchronizer.java
//前一个Node和下一个Node
volatile Node prev;
volatile Node next;
//本Node持有的线程
volatile Thread thread;
//指定本Node的模式,标识共享/独占
Node nextWaiter;
//标识本Node的各种状态,比如被中断,下个节点需要通知,等等等等
volatile int waitStatus;
           

下面通过具体过程来阐述加锁的细节。

2-1、加锁

通常使用的都是非公平锁,我们以这个为例来说明

//ReentrantLock.java
private final Sync sync;
public ReentrantLock() {
        sync = new NonfairSync();//默认是非公平锁
}
public void lock() {
        sync.lock();
}
//NonfairSync内部类,非公平锁加锁执行本方法
final void lock() {
    if (compareAndSetState(, ))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire();
}
           

其中,compareAndSetState(0, 1),使用CAS来设置State为1,成功返回true,并执行

//AbstractOwnableSynchronizer.java
//设置当前执行的线程
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
           

此时表示加锁成功。如果state当时已经为1,即有其他线程已拿到锁,则compareAndSetState(0, 1)马上返回false表示失败。并执行如下代码。

//AbstractQueuedSychronizer.java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
           

首先,继续尝试拿锁。

protected final boolean tryAcquire(int acquires) {
  return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == ) {
        //state为9表示可以加锁,马上CAS操作设置State为1
        if (compareAndSetState(, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果本线程已经拿到锁,state也不为0,此时,state+=1
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < ) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

上一步加锁成功,直接返回,失败则将本线程进行加队列操作。

private Node addWaiter(Node mode) {
        //创建当前线程的Node,独占模式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //尝试使用快速方式增加节点到队尾
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
   private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
           

先使用快速方式增加节点,一次成功可以马上返回,失败则执行enq方法。

private Node enq(final Node node) {
       //经典的CAS自旋volatile变量
        for (;;) {
            Node t = tail;
            //队列为空,创建头结点,即尾节点
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            //CAS加入队尾,成功返回,失败继续自旋
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

加队列成功后,可以以等待状态休息了,直到其他线程释放资源后唤醒本线程,下面来看源码

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
        //记录是否被中断
            boolean interrupted = false;
            //自旋拿锁,即拿state
            for (;;) {
                //队列前一个节点
                final Node p = node.predecessor();
                //如果前一个节点是head节点,则可以尝试进行拿锁,即CAS设置state,快速返回成功或失败。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //除非上面作为第二个节点拿到锁返回了,其他情况执行下面代码
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           

如果本线程的Node作为队列中第二个节点拿到锁成功,否则会执行shouldParkAfterFailedAcquire,该函数其实是将Node往前插队,前面的Node可能因为各种原因已经死去(中断等等),直到找到确实在等待拿锁的,然后通过park进入waiting状态。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//前驱的状态
        if (ws == Node.SIGNAL)
            //如果已经告诉前驱拿到锁通知自己了,直接返回,可以马上休息
            return true;
        if (ws > ) {
            //其他情况,循环遍历队列,如果前驱放弃了,就继续往前找,直到找到正常的节点,并排在他后面。那些被放弃的节点,由于引用消失,后续会被GC掉。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > );
            pred.next = node;
        } else {
            //前驱状态正常,就设置前驱状态为SIGNAL,表示,前驱拿到锁后通知自己
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           

上述函数如果返回成功,则执行park,直到被unpark或中断唤醒。这里特意注意下,如果park时收到中断,并不会抛异常,而是通过Thread.interrupted()获得中断标记,并清除掉中断标记。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
           

如果是中断,会在acquire中执行中断,整个拿锁流程如下图所示

1、入队-》2、是否二号&拿锁-》3、找正常前前节点-》4、park等待-》2
2-》成功拿锁/中断
           

公平锁的区别只有下面的获取锁的方法有区别

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == ) {
        //state为0可以加锁。hasQueuedPredecessors表示前面是否有Node,具体代码见下
        if (!hasQueuedPredecessors() &&
            compareAndSetState(, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < )
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

其中hasQueuedPredecessors表示是否前面有节点

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
           

另外还有一些tryLock(只使用乐观加锁尝试一次,直接返回)。 lockInteruptbly(基本流程相同,把最终的Thread.interupt换为throw InteruptException)

2-2、解锁

unlock流程如下,执行unlock表示已经拿到lock,因而不需要考虑线程安全的问题,直接将state-1即可,唯一需要注意的是多次lock需要多次unlock,这里要判断是否存在未完全释放资源的情况。

public void unlock() {
    sync.release();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
    Node h = head;//找到头结点
    if (h != null && h.waitStatus != )
        unparkSuccessor(h);//通知下一个Node
    return true;
}
return false;
}
           

首先执行tryRelease,每次都把state-=1;

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //
    if (c == ) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
           

再来看unparkSuccessor

private void unparkSuccessor(Node node) {
        //清零当前node(头结点)的waitState,允许失败
        int ws = node.waitStatus;
        if (ws < )
            compareAndSetWaitStatus(node, ws, );

        Node s = node.next;//下一个节点
        if (s == null || s.waitStatus > ) {//为空或取消
            s = null;
            //从队列尾部往前找,waitStatus <0即为有效的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= )
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//对该节点进行unpark唤醒
    }
           

unlock的流程主要在于对下一个等待线程的通知上。

2-3 Condition.await

首先是lock.newCondition()

//ReentrantLock.java
final ConditionObject newCondition() {
   return new ConditionObject();
}
           

ConditionObject是AQS的内部类,主要结构如下

//双向等待队列的首节点
/** First node of condition queue. */
private transient Node firstWaiter;
//尾节点
/** Last node of condition queue. */
private transient Node lastWaiter;
           

关于await,基本猜想就是往上面的队列中加,然后阻塞等,基本逻辑跟lock换汤不换药。下面来验证我们的猜想。下面来看await的代码

//AbastractQueuedSynchronizer.java
public final void await() throws InterruptedException {
    if (Thread.interrupted())//先判断中断
        throw new InterruptedException();
    //加入等待队列,就是上面ConditionObject中的那个。
    Node node = addConditionWaiter();
    //释放锁将lock等待队列的下一个节点进行unpark通知
    int savedState = fullyRelease(node);
    int interruptMode = ;
    //循环判断是否在AQS的等待队列中,不在就进行park动作。之后不论是被unpark唤醒,还是中断,均会跳出次循环。
    while (!isOnSyncQueue(node)) {
        //这里的park是conditionObject的this,即只有signal或中断的会唤醒本线程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != )
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != )
        reportInterruptAfterWait(interruptMode);
}
           

先看addConditionWaiter的代码,代码的意义在于加入等待队列。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    //如果尾节点取消了等待,清除队列上无效的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //本节点添加到队列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
           

看unlinkCancelledWaiters,其实就是遍历conditionObject的整个队列,将非等待状态的节点摘除。

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;//头结点
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        //不是等待节点,就将该节点摘链
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            //到尾部了
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        //往后遍历
        t = next;
    }
}
           

再看fullyRelease,其实就是unlock操作,并找出lock队列(AQS的队列)中下一个节点(下一个状态不正确, 会从头结点开始寻找)进行unpark通知

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != )
        unparkSuccessor(h);//参考unlock流程的unpark,这里不展开
    return true;
}
return false;
}
           

其中tryRelease取决于何种子类,对于ReentrantLock来说,就是state-1。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == ) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
           

再回过头看下一个流程,判断是否在等待lock的等待队列中,即AQS的队列

final boolean isOnSyncQueue(Node node) {
    //正常等待condition节点可以进行unpark
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //其他时候需要下一个节点在AQS的等待队列中,即等待拿锁lock中,这里判断不在lock中,执行下一步动作
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //从尾部往前找,找到本节点返回true
    return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
           if (t == node)
               return true;
           if (t == null)
               return false;
           t = t.prev;
    }
}
           

回过头来梳理这个流程,简单来理解,就是入ConditionObject的队列进行park等待,直到被唤醒或中断,这两种都会跳出while循环。下面来看后面的流程。

这里刚从await的park中出来,要么被signal唤醒,要么被中断唤醒。下面会重新拿锁,并返回。

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
if (interruptMode != )
    reportInterruptAfterWait(interruptMode);
           

await的核心其实就是释放锁,并通过park等待signal。后面被唤醒时,再拿锁并返回。

2-4 Condition.signal

signal的核心是将线程从await的等待状态(park)中唤醒。

public final void signal() {
    //当前非本线程执行,抛异常
   if (!isHeldExclusively())
       throw new IllegalMonitorStateException();
   Node first = firstWaiter;
   if (first != null)
       doSignal(first);
}
protected final boolean isHeldExclusively() {
   return getExclusiveOwnerThread() == Thread.currentThread();
}
           

来看核心流程,通知Condition队列的头结点。

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    //当前Condition状态置为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, ))
        return false;

    //节点入AQS锁的等待队列
    Node p = enq(node);
    int ws = p.waitStatus;
    //设置为Sinal状态,并进行unpark
    if (ws >  || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
           

signal的效果是将头结点加入Lock的等待队列,并通知那个线程启动。那个线程在执行await()会由于while (!isOnSyncQueue(node)) 而跳出循环。

signalAll会把ConditionObject队列中的所有节点都移入AQS的等待队列并唤醒他们。