天天看点

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

AQS介绍

AQS是同步锁内实现同步的共同父类,如下UML图能看出,ReentrantLock等锁都是基于AQS。

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

下面主要介绍独占锁(Exclusive)的实现。

(本文参照网上资料和jdk1.8源码加上自己理解,可能会有一些论述错误的地方,望谅解并指正)

一、背景知识

AQS内部会维护一个双向链表,来存储争取锁的线程。

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

(图片来自网络)

Node节点的属性除了nextWaiter都是volatile的。

第二点,解释一下节点的两种状态,独占和共享: Exclusive(独占,只有一个线程能执行,如ReentrantLock),Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

AQS存在一个属性为volatile的state,这个state在不同的实现上可以有不同的含义。

  • ReentrantLock的state表示是否已锁定,当调用lock时,state就会加一(可重入),unlock就会减一。
  • CountdownLatch的state表示倒计时的个数,每次countDown会减一。

第三点,线程的中断是个什么?

Java中断机制是一种协作机制,也就是说通过中断并不能直接终止另一个线程,而需要被中断的线程自己处理中断。这好比是家里的父母叮嘱在外的子女要注意身体,但子女是否注意身体,怎么注意身体则完全取决于自己。这并不是使线程进入阻塞状态。

第四点 waitStatus的状态

源码里面说了很大一段,总结成下面几个意思。

注意,status表示的都是当前节点后面的节点状态

共有以下五种状态:

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒,也就是说后继节点被阻塞了。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):传播。共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。

二、AQS的加锁方法

2.1、acquire(int arg)

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。比如ReentrantLock的lock方法就会调用acquire。

public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           
  • tryAcquire会去获取锁,如果成功就返回true,否则返回false。
  • addWaiter会把当前线程加入队列的尾部,并且是Exclusive。
  • acquireQueued用于获取资源,如果线程被中断过返回true,此时就会执行下面的selfInterrupt()。

以ReentrantLock为例:

//FairSync
final void lock() {
            acquire(1);
        }
           

直接调用acquire。

2.2、tryAcquire(int acquires)

AQS并没有具体的实现tryAcquires,它让子类自行决定实现的方式。

//aqs并没有实现
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
           
这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。

以ReentrantLock的tryAcquire为例。

//FairSync调用的是父类Sync的tryAcquire
//ReentrantLock,lock传入参数qcquires=1
protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //ReentrantLock的state表示上锁的次数
            //0表示未上锁,如果没上锁,cas上锁,然后把独占线程设置为当前的
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前线程就是那个独占的,也就是之前上锁的,现在是重入锁,state(c)加1(acquires)
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //获取不到锁,返回false
            return false;
        }
           

2.3、addWaiter(Node mode)

private Node addWaiter(Node mode) {
    //mode模式有独占和共享两种
    //Node.EXCLUSIVE for exclusive, Node.SHARED for shared
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 先试试最快速的直接插到队尾,如果失败,走下面的enq
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
           

2.4、enq(final Node node)

能看出是CAS自旋加入队尾。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize,队列为空,还没初始化,需要初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
                // 初始化之后还会再进行一遍循环
                //这个链表的创建方式是先创建了一个伪头节点,然后从下一个节点开始才是真正的node
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

2.5、acquireQueued(final Node node, int arg)

tryAcquire失败,addWaiter加入队列。那么下一步就应该是让他休息,等待别人唤醒他。

注意,正常情况下,该方法会一直阻塞,直到拿到锁才会返回(parkAndCheckInterrupt方法就是干这个的),但是如果抛出异常,就会移除节点,继续上抛异常。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//标记是否成功拿到资源
        try {
            boolean interrupted = false;//标记是否被中断过
            //注意自旋
            for (;;) {
                final Node p = node.predecessor();//拿到前驱节点
                if (p == head && tryAcquire(arg)) {
                    // 如果前一个节点是head,head是伪头结点,那么当前节点就是真正地第一个节点
                    // 再tryAcquire拿到资源,才能进入这里
                    setHead(node);
                    p.next = null; // help GC
                    // setHead把prev断了,现在要让head的next断掉,方便gc
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //没得拿到资源,那应该让他wait了
                    // 如果被中断过,要修改状态
                    interrupted = true;
            }
        } finally {
            if (failed)
            // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
                cancelAcquire(node);
        }
    }
           

先看shouldParkAfterFailedAcquire和parkAndCheckInterrupt。在这之前,建议了解中断什么意思,开篇就提到了。(本人到这里时候已经被中断整蒙了,所以去查了一下,写在了开头)

2.5.1、 shouldParkAfterFailedAcquire(Node,Node)

检查是否可以进入waiting状态。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // pred前驱节点和node当前节点
    // 当前节点的状态保存在前驱节点当中
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
             // 前驱节点已经是signal,也就是前驱节点释放时会通知自己,可以安全的park
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
             // 大于0只有一个状态,为cancelled。
             //前驱节点取消了,那就顺着链向前找到一个非cancelled的状态(正常等待),然后排在他后面
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             //如果前驱正常,那就把前驱的状态设置成SIGNAL,因为前驱节点可能是new状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           

总之,前一个节点必须是signal,自己才能去休息。

2.5.2、 parkAndCheckInterrupt()

检查过可以休息了,现在真正要去休息了。此方法会让线程真正的waiting。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
           

park会进入waiting状态。此时,只有unpark和interrupt可以唤醒他。

2.5.3、小结

acquiredQueued()分为下面几步:

  • 节点进入队尾,找到一个安全休息点
  • 调用park进入waiting,只有unpark和interrupt能唤醒
  • 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1

2.6.总结

再回到acquire()

public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           
  • 先调用tryAcquire尝试获取,成功就直接返回了
  • 如果获取失败,需要加入队列,调用addWaiter
  • acquireQueued让线程休息,轮到自己获取,并且获取到资源后才返回,如果被中断过,返回true,否则是false
  • 线程在等待过程中被中断过,不响应,只有在获取资源后,才进行selfInterrupt,把中断补上

下面是流程图

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

(图源网络)

三、解锁

3.1、 release(int arg)

与acquire对应,release是解锁时候会调用的。以ReentrantLock为例:

public void unlock() {
        sync.release(1);
    }
           

下面是release源码:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            //tryRelease需要子类实现
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

用tryRelease释放锁,tryRelease需要实现AQS的类自己重写。

3.2、 tryRelease(int arg)

AQS的tryRelease同tryAcquire一样都只是抛出异常。下面以ReentrantLock的具体实现为例。

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
           

ReentrantLock的state是加锁的层数,传入参数releases是1,所以第一行把锁的层数减一。如果c为0,说明没有锁了,返回true,并且设置当前的独占线程是null。

3.3、 unparkSuccessor(Node)

唤醒当前节点的后继节点。

private void unparkSuccessor(Node node) {
    //node是当前节点
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)//更新head状态为0,head,状态为0说明下一个可以被唤醒了
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;//下一个要唤醒的节点
        if (s == null || s.waitStatus > 0) {
            //如果为空或者被取消了
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)//从后向前找
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒
    }
           

unpark会唤醒等待队列中最靠前的,未被取消的线程。

3.4、 继续acquireQueued

另一个线程被唤醒之后,会从acquireQueued的阻塞位置恢复。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // 2.在这里tryAcquire能够获取到锁了,所以进来
                    setHead(node);
                    // 3.把当前节点设置为头结点
                    p.next = null; // help GC
                    failed = false;
                    // 4.返回
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
                // 1.从这里恢复,然后继续循环
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           

四、示例

假设ABC三个线程竞争,以ReentrantLock的公平锁为例,下面是队列的变化关系:

step1

假设最开始A线程先获取锁,没有发生竞争,不需要等待,所以队列此时是空的。

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

step2

现在B线程来获取锁,发现state是1,即已经加锁了。AQS会先创建一个伪头节点,然后把B进入队列。

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

step3

最后C线程获取,同样需要排队。

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

step4

A线程释放锁,会调用unparkSuccessor()把头结点的waitStatus置0,表示后续节点可以为唤醒了。

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

step5

B开始执行。首先tryAcquire能获取到锁,setHead(B),并把B的waitStatus设为-1,表示下一个被阻塞。此时队列状态如下:

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

step6

B释放锁,unparkSuccessor()唤醒首节点。

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

step7

C获取锁同理。

AQS源码解析AQS介绍一、背景知识二、AQS的加锁方法三、解锁四、示例五、总结

step8

C释放锁,什么也不用做,因为C是最后一个节点,C离开之后,队列为空。

五、总结

对于独占锁来说,自己实现自定义锁继承AQS基本只用实现tryAcquire和tryRelease两个方法(还有isHeldExclusively(),用condition时候才必须重写)。这两个方法内容是加锁和删除锁时候的逻辑部分,交由程序员自己编写。其他的对队列的方法,AQS都已经实现完了,可以直接拿来用。并且,AQS实现的队列是一种双向队列,当前节点的状态由前一节点保持,而且还利用一个伪头节点。

本篇文章为记录自己学习AQS的过程,如有不对,还请指正

参考文章:

https://segmentfault.com/a/1190000015804888

https://www.cnblogs.com/waterystone/p/4920797.html

https://blog.csdn.net/sscout/article/details/102616722