天天看点

AQS类源码解读AQS类源码解读

AQS类源码解读

源码作者:Doug Lea

文章作者:我

之前写过有关AQS的源码流程图,但里面没有源码只有方法名,所以看的时候感觉很混乱,想必大家肯定也这么感觉,先谢谢大家没有吐槽我,但弟弟我知错必改,这次我把AQS类从共享锁和排他锁【获取锁】到共享锁和排他锁【释放锁】,从逻辑分析到源码解析都给大家安排的明明白白,由外到内一层层分析源码,

以下内容除源码,其他纯手打(包括所有注释)

,希望对大家有帮助哈!!!

首先了解AQS的Node内部类,AQS队列操作离不开对Node类的操作:

// AQS是一个双向链表,所以其中的节点都有自己的前驱和后继指针
static final class Node {
	// 节点类型 共享锁,如果创建共享锁节点,会创建一个空的Node节点
    static final Node SHARED = new Node();
    // 节点类型 排他锁,如果创建排他锁节点,会放一个null进入队列
    static final Node EXCLUSIVE = null;
    /* 
	     节点状态 取消状态,在释放节点或者出现中断操作时,会把节点置为这个状态,
	     一般遇到这种节点把这个节点的前驱后继指针指向null,便于垃圾回收
    */
    static final int CANCELLED =  1;
    // 节点状态 可被唤醒状态,这种状态是节点的正常状态,说明此节点可被前驱节点唤醒
    static final int SIGNAL    = -1;
    // 节点状态 存在于Condition队列状态,在Condition队列等待被唤醒的节点就是此状态
    static final int CONDITION = -2;
    // 节点状态 属于一个释放共享锁节点的过渡状态(下方有详解)
    static final int PROPAGATE = -3;
    // 节点等待状态 存放上述节点状态的容器
    volatile int waitStatus;
    // 存放前驱节点
    volatile Node prev;
    // 存放后继节点
    volatile Node next;
    /* 
	     存放节点中的任务 一般唤醒某个节点时,要确定这个节点的thread属性不为null,
	     因为头节点已经被唤醒了,所以头节点的这个属性是null
    */
    volatile Thread thread;
    // 存放后继节点(下方有详解)
    Node nextWaiter;
}
           
大家一定会问上面既然有了next属性,为什么还要有nextWaiter属性,这两个属性不一样吗?
  1. next属性用于AQS同步队列
  2. nextWaiter属性用于Condition条件队列
还有个需要注意的地方,代码中的waitStatus会有0状态,也就是waitStatus的默认状态,int类型默认就是0,所以如果一个节点没有干过什么事,那他的waitStatus就会是0了。
对于PROPAGATE状态:
因为在多线程环境少不了多个线程操作一个共享锁节点的情况,一个节点成功获取锁之后状态变为0,但如果发现其他线程已经在释放这个锁了,那就设置为这个状态,让后续线程能继续释放后继的共享锁,避免因多线程操作导致锁传递失败

acquireShared():获取共享锁

public final void acquireShared(int arg) {
	// 使用的是模板方法设计模式,这个会调用子类的具体实现方法来尝试获取共享锁
    if (tryAcquireShared(arg) < 0)
    	// 如果尝试获取共享锁失败,就进入此方法实现阻塞。
        doAcquireShared(arg);
}
           

doAcquireShared()

private void doAcquireShared(int arg) {
	// 创新新的共享节点并插入AQS队列
    final Node node = addWaiter(Node.SHARED);
    // 标识是否入队失败
    boolean failed = true;
    try {
    	// 标识是否被中断
        boolean interrupted = false;
        for (;;) {
        	// 拿到新节点的前驱节点快照
            final Node p = node.predecessor();
            if (p == head) {
            	/* 
	            	如果前驱节点是头节点,有可能到这个方法头节点正好释放锁,所以,
	            	调用子类的tryAcquireShared()方法尝试能不能拿到锁,
	            	根据不同子类的调用返回信号量
            	*/
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                	//如果拿到了锁,那就重新设置头节点并唤醒后继节点执行任务
                    setHeadAndPropagate(node, r);
                    //把原头节点的后继指针指向null,便于GC回收
                    p.next = null; 
                    // 如果中断标识为true,那就中断阻塞当前线程
                    if (interrupted)
                        selfInterrupt();
                    // 标识入队成功
                    failed = false;
                    return;
                }
            }
            /* 
	            如果新节点的前驱节点不是头节点,或者枪锁失败,
	            那判断新节点是不是要进入队列阻塞,如果需要阻塞那就通过中断阻塞节点
            */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 设置中断标识位
                interrupted = true;
        }
    } finally {
    	// 如果入队失败,那就消除该节点
        if (failed)
            cancelAcquire(node);
    }
}
           

addWaiter()

private Node addWaiter(Node mode) {
	// 新创建一个mode类型的节点,分为共享锁、排他锁类型
    Node node = new Node(Thread.currentThread(), mode);
    // 先存储AQS队列队尾节点的快照,用来防止其他线程执行时修改队列的队尾节点
    Node pred = tail;
    // (下方有详解)
    if (pred != null) {
    	// 如果存在队尾节点,就把新创建的共享锁的node结点的前驱指针指向队列的队尾结点
        node.prev = pred;
        /* 
	        为防止有其他现成已经修改了队尾节点了,
	        所以使用CAS来把队列队尾的结点由快照获取的pred修改为新创建的node
        */
        if (compareAndSetTail(pred, node)) {
        	// 如果CAS替换队尾节点成功,那就把原队尾结点pred的后继指针指向新创建的node节点
        	// 并返回新节点
            pred.next = node;
            return node;
        }
    }
    //节点入队死循环逻辑
    enq(node);
    return node;
}
           
if (pred != null)代码行解析:
  1. 这个if逻辑用到了一个【优化前置】的技术,继续往下看大家会看到这个if分支逻辑和enq方法的死循环逻辑的一部分是相同的。
  2. 如果没有线程竞争,那这个if逻辑中就可以把节点加入到队列,那就不用执行下面的enq的死循环方法,这样就能提高系统的执行效率。
  3. 后面代码会有很多用到了这种【优化前置】的技术,大家明白这种代码优化就好,也可以说是【空间换时间】的一种技术表现形式吧。

enq()

private Node enq(final Node node) {
    for (;;) {
    	// 先存储AQS队列队尾节点的快照
        Node t = tail;
        if (t == null) {
        	// 如果没有队尾节点,说明队列中没有结点,那就创建一个空节点并CAS设置为队首结点
            if (compareAndSetHead(new Node()))
            	/* 
	            	CAS成功后,因为队列现在只有一个节点,
	            	所以队首和队尾节点都设置为当前这个新的节点,并执行死循环逻辑
            	*/
                tail = head;
        } else {
        	/**
	        	如果队尾指针有值,说明当前队列不是空队列,
	        	那就把新创建的共享锁的node结点的前驱指针指向队列的队尾结点
        	*/
            node.prev = t;
            /* 
		        为防止有其他现成已经修改了队尾节点了,
		        所以使用CAS来把队列队尾的结点由快照获取的t修改为新创建的node
	        */
            if (compareAndSetTail(t, node)) {
            	// 如果CAS替换队尾节点成功,那就把原队尾结点t的后继指针指向新创建的node节点
        		// 并返回原队尾结点t
                t.next = node;
                return t;
            }
        }
    }
}
           

AQS队列是个双向列表,添加新节点时,有以下流程:

先将新节点的前驱指针指向原队尾节点。

然后再把原队尾节点的后继指针指向新节点。

setHeadAndPropagate()

private void setHeadAndPropagate(Node node, int propagate) {
    // 存放队列头节点的快照
    Node h = head; 
    // 将被唤醒的节点设置成队列的头节点
    setHead(node);
    if (propagate > 0 || 		// 如果传入的信号量大于0
    	h == null ||     		// 如果头节点为空
    	h.waitStatus < 0 ||		// 如果头节点状态不是取消状态
        (h = head) == null || 	// 如果头节点为空
        h.waitStatus < 0) {		// 如果头节点状态不是取消状态
        // 存放要唤醒的头节点的后继节点快照
        Node s = node.next;
        if (s == null || s.isShared())
        	// 如果后继节点为null或者后继节点是共享锁节点,那就唤醒共享锁节点
            doReleaseShared();
    }
}
           

doReleaseShared()

private void doReleaseShared() {
    for (;;) {
   		// 存放头节点快照
        Node h = head;
        /* 
	        如果队列中还有其他排队元素
	        (因为头节点不为空,并且头节点不是尾节点,说明队列现在不只有一个节点)
        */
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            /* 
	            如果头节点状态是正常状态,并且通过CAS把节点状态修改为已被唤醒状态,
	            如果修改失败,那就执行死循环逻辑
            */
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                // 如果修改成功,那就唤醒该头节点执行任务
                unparkSuccessor(h);
            }
            //如果发现该节点已经拿到锁并状态变为0了,那就CAS将该节点设置为PROPAGATE过渡状态
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                // 如果修改失败,那就执行死循环逻辑
                continue;                
        }
        // 如果头节点一直没有改变过 或者 队列为空 或者 队列就一个节点,那就停止唤醒。
        if (h == head)                   
            break;
    }
}
           

unparkSuccessor()

private void unparkSuccessor(Node node) {
	// 获取到要唤醒节点的当前状态
    int ws = node.waitStatus;
    // 如果当前状态不是取消状态,说明正常,那就把状态修改为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
	// 获取要唤醒节点的后继节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        /* 
	        如果后继节点为null或者后继节点是取消状态,
	        那就在队列从后往前找要唤醒节点的有效后继节点,
	        并把该节点赋给刚才存放后继节点的容器s
        */
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果找到了有效的后继节点,那就使用LockSupport唤醒该后继节点中的线程对象
    if (s != null)
        LockSupport.unpark(s.thread);
}
           

shouldParkAfterFailedAcquire()

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	/* 
		获取前驱节点的状态,如果是SIGNAL,
		说明还没有被该前驱节点的前驱节点唤醒,那就需要阻塞
	*/
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    // 如果前驱节点是消除状态
    if (ws > 0) {
    	// 	在队列中从后往前寻找有效的前驱节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //将有效的前驱节点的后继指针指向要阻塞的节点
        pred.next = node;
    } else {
    	/* 
	    	如果前驱节点不是取消状态,不是等待被唤醒状态,在AQS里面就剩下PROPAGATE状态了,
	    	那就把这种过渡状态设置为SIGNAL可被唤醒状态,
	    	node就可以等待被唤醒了
    	*/
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 不让节点阻塞
    return false;
}
           

parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
	// 阻塞该线程所在的对象
    LockSupport.park(this);
    // 判断当前线程是否被中断(下方有详解)
    // 此方法会判断指定线程是不是已经被中断,并会重置中断状态为false
    return Thread.interrupted();
}
           
唤醒节点的方式:
  1. unpark(正常唤醒)
  2. interrupt(中断唤醒)

cancelAcquire()

private void cancelAcquire(Node node) {
    if (node == null)
        return;
    // 先将要消除节点的thread属性设置为null
    node.thread = null;
    Node pred = node.prev;
    // 如果要消除节点的前驱节点是CANCELLED状态那就在队列从该节点往前找有效的前驱节点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 拿到有效前驱节点的后继指针
    Node predNext = pred.next;
    // 将要消除节点的状态设置为CANCELLED状态
    node.waitStatus = Node.CANCELLED;

    // 如果要消除节点是尾节点,那就CAS把尾节点设置为有效的前驱节点
    if (node == tail && compareAndSetTail(node, pred)) {
    	// 修改尾节点成功后,将新尾节点的后继指针设置为null
        compareAndSetNext(pred, predNext, null);
    } else {
    	// 如果要消除节点不是尾节点,或者CAS尾节点失败
        int ws;
        // 有效前驱节点不是头节点
        if (pred != head &&				
        	// 有效前驱节点状态正常						
            ((ws = pred.waitStatus) == Node.SIGNAL ||		
            // 有效前驱节点状态是PROPAGATE或者0并CAS为SIGNAL
             (ws <= 0 && 						
             compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
             //有效前驱节点线程任务不为null
            pred.thread != null) {
            // 总结起来就是说,如果有效的前驱节点是一个不是头结点的正常可被唤醒的节点
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
            	/*
	            	如果要消除节点的后继节点不是消除状态,
	            	那就CAS把有效前驱节点的后继指针指向要消除节点的后继节点
            	*/
                compareAndSetNext(pred, predNext, next);
        } else {
        	// 如果要消除节点是头节点,那就唤醒该节点的后继节点
            unparkSuccessor(node);
        }
		// 消除节点操作完成后,把已消除节点的后继指针指向自己,便于GC垃圾回收
        node.next = node;
    }
}
           

acquire():获取排他锁

public final void acquire(int arg) {
	// 子类尝试获取排他锁
    if (!tryAcquire(arg) &&
    	// 如果获取排他锁失败,那就新增排他锁节点并进行入队操作,并判断是否中断阻塞
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // (下方有详解)
        selfInterrupt();
}
           
selfInterrupt() 方法:
  1. 如果acquireQueued()判断当前状态是已中断状态,那肯定是通过Thread.interrupted()方法拿到的中断状态,Thread.interrupted()之后会重置中断状态,所以需要再通过selfInterrupt()方法来中断线程。

addWaiter()

在上面内容添加共享锁节点时用过此方法,这两个方法的唯一区别就是添加的锁类型不同。

acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
	// 设置入队失败标识
    boolean failed = true;
    try {
    	// 中断标识设置为false
        boolean interrupted = false;
        for (;;) {
        	// 获取新增结点的前驱节点
            final Node p = node.predecessor();
            // 如果新增节点的前驱节点是头结点并获取锁成功
            if (p == head && tryAcquire(arg)) {
            	// 设置新节点为头结点
                setHead(node);
                // 把原头结点后继指针指向null,便于垃圾回收
                p.next = null; 
                // 标识修改为入队成功
                failed = false;
                // 返回中断标识,表明可被直接唤醒,不需要阻塞
                return interrupted;
            }
            // 如果不是头结点或者获取锁失败,那就判断是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
            	// 如果需要阻塞就使用LockSupport进行阻塞并判断是否已被中断
                parkAndCheckInterrupt())
                // 如果需要阻塞并当前已被中断,那就修改中断标志位
                interrupted = true;
        }
    } finally {
    	// 如果入队失败就消除新节点,上面的获取共享锁源码有讲解cancelAcquire方法
        if (failed)
            cancelAcquire(node);
    }
}
           

releaseShared():释放共享锁

public final boolean releaseShared(int arg) {
	// 子类尝试释放共享锁成功,那就执行释放AQS节点逻辑
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
           

doReleaseShared()

private void doReleaseShared() {
    for (;;) {
    	// 获取头节点快照
        Node h = head;
        // 如果队列中有正在阻塞的节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果头节点当前状态是SIGNAL,那说明还没有被唤醒
            if (ws == Node.SIGNAL) {
            	// CAS修改头节点状态为0,如果失败就走死循环逻辑
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                // 如果CAS成功,就释放头结点,此方法在上面有讲解
                unparkSuccessor(h);
            }
            // 如果当前状态是0,那就CAS修改为PROPAGATE过渡状态,失败就进入死循环逻辑
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        // 如果头节点一直没有改变过 或者 队列为空 或者 队列就一个节点,那就停止唤醒。
        if (h == head)                   
            break;
    }
}
           

release():释放排他锁

public final boolean release(int arg) {
	// 子类尝试释放排他锁成功
    if (tryRelease(arg)) {
    	// 如果当前队列头节点不为null并且还未被释放,那就执行唤醒逻辑
        Node h = head;
        if (h != null && h.waitStatus != 0)
        	// 此方法在上面已经讲解过了
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           
讲完AQS之后,有两个问题:
  1. JUC中的锁类,不是都有公平锁和非公平锁之分吗?
  2. 在这个AQS源码里面怎么没有提现公平和非公平呢?
这两个问题我当时就有想过,不知道大家有没有,我一个个给大家讲解一下吧
  1. JUC中有常用锁类Semaphore、CountDownLatch、CyclicBarrier、ReentrantLock、ReentrantReadWriteLock等,这些锁类(CyclicBarrier除外)的构造器都有是否公平的标识参数。
  2. JUC锁类(CyclicBarrier除外)中都有一个继承于AQS类的Sync内部类,以上我讲的AQS源码有tryRelease()、tryReleaseShared()、tryAcquire()、tryAcquireShared()这些方法在AQS类内部用的是模板方法设计模式,是都没有实现的,都是直接抛一个UnsupportedOperationException(没有实现方法)的异常,这些方法就是在JUC各种锁类中的Sync内部类中实现的,里面就有对公平锁和非公平锁的具体判断。

我感觉这个源码解析配上我上一期的【AQS核心流程源码图像解读】,应该就比较易懂了,大家可以试试。

AQS还有一个内部类是ConditionObject类,是Condition类的子类,Condition也是一个队列,但底层是一个单向链表,对与【Condition类及其子类的源码讲解】,我就放到下期了,内容会包含Condition和ConditionObject类详解。

对与【JUC常用锁类的源码讲解】,我就放到下下期了,大家先消化消化AQS的源码,JUC锁类的拿锁和放锁操作都是基于AQS的。

本篇博客要是有什么技术问题,欢迎大家来指正。

如果大家有什么不理解的,也欢迎大家评论,我很高兴和大家一起讨论技术问题。