天天看点

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

     在开始解读aqs的共享功能前,我们再重温一下countdownlatch,countdownlatch为java.util.concurrent包下的计数器工具类,常被用在多线程环境下,它在初始时需要指定一个计数器的大小,然后可被多个线程并发的实现减1操作,并在计数器为0后调用await方法的线程被唤醒,从而实现多线程间的协作。它在多线程环境下的基本使用方式为:

     注意,线程thread 1,2,3各自调用 countdown后,countdownlatch 的计数为0,await方法返回,控制台输入“over”,在此之前main thread 会一直沉睡。

      可以看到countdownlatch的作用类似于一个“栏栅”,在countdownlatch的计数为0前,调用await方法的线程将一直阻塞,直到countdownlatch计数为0,await方法才会返回,

     而countdownlatch的countdown()方法则一般由各个线程调用,实现countdownlatch计数的减1。

      知道了countdownlatch的基本使用方式,我们就从上述demo的第一行new countdownlatch(3)开始,看看countdownlatch是怎么实现的。     

     首先,看下countdownlatch的构造方法:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

     和reentrantlock类似,countdownlatch内部也有一个叫做sync的内部类,同样也是用它继承了aqs。

     再看下sync:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

     如果你看过本系列的上半部分,你对setstate方法一定不会陌生,它是aqs的一个“状态位”,在不同的场景下,代表不同的含义,比如在reentrantlock中,表示加锁的次数,在countdownlatch中,

    则表示countdownlatch的计数器的初始大小。

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

    设置完计数器大小后countdownlatch的构造方法返回,下面我们再看下countdownlatch的await()方法:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

    调用了sync的acquiresharedinterruptibly方法,因为sync是aqs子类的原因,这里其实是直接调用了aqs的acquiresharedinterruptibly方法:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

    从方法名上看,这个方法的调用是响应线程的打断的,所以在前两行会检查下线程是否被打断。接着,尝试着获取共享锁,小于0,表示获取失败,通过本系列的上半部分的解读,

   我们知道aqs在获取锁的思路是,先尝试直接获取锁,如果失败会将当前线程放在队列中,按照fifo的原则等待锁。

    而对于共享锁也是这个思路,如果和独占锁一致,这里的tryacquireshared应该是个空方法,留给子类去判断:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

    再看看countdownlatch:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

     如果state变成0了,则返回1,表示获取成功,否则返回-1则表示获取失败。

     看到这里,读者可能会发现, await方法的获取方式更像是在获取一个独占锁,那为什么这里还会用tryacquireshared呢?

     回想下countdownlatch的await方法是不是只能在主线程中调用?答案是否定的,countdownlatch的await方法可以在多个线程中调用,当countdownlatch的计数器为0后,调用await的方法都会依次返回。

     也就是说可以多个线程同时在等待await方法返回,所以它被设计成了实现tryacquireshared方法,获取的是一个共享锁,锁在所有调用await方法的线程间共享,所以叫共享锁。

    回到acquiresharedinterruptibly方法:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

   如果获取共享锁失败(返回了-1,说明state不为0,也就是countdownlatch的计数器还不为0),进入调用doacquiresharedinterruptibly方法中,按照我们上述的猜想,应该是要将当前线程放入到队列中去。

  在这之前,我们再回顾一下aqs队列的数据结构:aqs是一个双向链表,通过节点中的next,pre变量分别指向当前节点后一个节点和前一个节点。其中,每个节点中都包含了一个线程和一个类型变量:表示当前节点是独占节点还是共享节点,头节点中的线程为正在占有锁的线程,而后的所有节点的线程表示为正在等待获取锁的线程。如下图所示:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

黄色节点,表示正在获取锁的节点,剩下的蓝色节点(node1、node2、node3)为正在等待锁的节点,他们通过各自的next,pre变量分别指向前后节点,形成了aqs中的双向链表。 

    再看看doacquiresharedinterruptibly方法:

这里有几点需要说明的:

 1. setheadandpropagate方法:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

    首先,使用了cas更换了头节点,然后,将当前节点的下一个节点取出来,如果同样是“shared”类型的,再做一个”releaseshared”操作。看下doreleaseshared方法:

  为什么要这么做呢?这就是共享功能和独占功能最不一样的地方,对于独占功能来说,有且只有一个线程(通常只对应一个节点,拿reentantlock举例,如果当前持有锁的线程重复调用lock()方法,

那根据本系列上半部分我们的介绍,我们知道,会被包装成多个节点在aqs的队列中,所以用一个线程来描述更准确),能够获取锁,但是对于共享功能来说。

共享的状态是可以被共享的,也就是意味着其他aqs队列中的其他节点也应能第一时间知道状态的变化。因此,一个节点获取到共享状态流程图是这样的:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

     比如现在有如下队列:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

     当node1调用tryacquireshared成功后,更换了头节点:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

     node1变成了头节点然后调用unparksuccessor()方法唤醒了node2,node2中持有的线程a出于上面流程图的park node的位置,

     线程a被唤醒后,重复黄色线条的流程,重新检查调用tryacquireshared方法,看能否成功,如果成功,则又更改头结点,重复以上步骤,以实现节点自身获取共享锁成功后,唤醒下一个共享类型结点的操作,实现共享状态的向后传递。

 2.其实对于doacquireshared方法,aqs还提供了集中类似的实现:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

 分别对应了:

 1. 带参数请求共享锁。 (忽略中断)

 2. 带参数请求共享锁,且响应中断。(每次循环时,会检查当前线程的中断状态,以实现对线程中断的响应)

 3. 带参数请求共享锁但是限制等待时间。(第二个参数设置超时时间,超出时间后,方法返回。)

比较特别的为最后一个doacquiresharednanos方法,我们一起看下它怎么实现超时时间的控制的。

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

因为该方法和其余获取共享锁的方法逻辑是类似的,我用红色框圈出了它所不一样的地方,也就是实现超时时间控制的地方。

可以看到,其实就是在进入方法时,计算出了一个“deadline”,每次循环的时候用当前时间和“deadline”比较,大于“dealine”说明超时时间已到,直接返回方法。

注意,最后一个红框中的这行代码:

    nanostimeout > spinfortimeoutthreshold

从变量的字面意思可知,这是拿超时时间和超时自旋的最小阀值作比较,在这里doug lea把超时自旋的阀值设置成了1000ns,即只有超时时间大于1000ns才会去挂起线程,否则,再次循环,以实现“自旋”操作。这是“自旋”在aqs中的应用之处。

看完await方法,我们再来看下countdown()方法:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

调用了aqs的releaseshared方法,并传入了参数1:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

同样先尝试去释放锁,tryreleaseshared同样为空方法,留给子类自己去实现,以下是countdownlatch的内部类sync的实现:

深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

死循环更新state的值,实现state的减1操作,之所以用死循环是为了确保state值的更新成功。

从上文的分析中可知,如果state的值为0,在countdownlatch中意味:所有的子线程已经执行完毕,这个时候可以唤醒调用await()方法的线程了,而这些线程正在aqs的队列中,并被挂起的,

所以下一步应该去唤醒aqs队列中的头结点了(aqs的队列为fifo队列),然后由头节点去依次唤醒aqs队列中的其他共享节点。如果tryreleaseshared返回true,进入doreleaseshared()方法:

当线程被唤醒后,会重新尝试获取共享锁,而对于countdownlatch线程获取共享锁判断依据是state是否为0,而这个时候显然state已经变成了0,因此可以顺利获取共享锁并且依次唤醒aqs队里中后面的节点及对应的线程。

     本文从countdownlatch入手,深入分析了aqs关于共享锁方面的实现方式:

     如果获取共享锁失败后,将请求共享锁的线程封装成node对象放入aqs的队列中,并挂起node对象对应的线程,实现请求锁线程的等待操作。待共享锁可以被获取后,从头节点开始,依次唤醒头节点及其以后的所有共享类型的节点。实现共享状态的传播。这里有几点值得注意:

1.     与aqs的独占功能一样,共享锁是否可以被获取的判断为空方法,交由子类去实现。

2.     与aqs的独占功能不同,当锁被头节点获取后,独占功能是只有头节点获取锁,其余节点的线程继续沉睡,等待锁被释放后,才会唤醒下一个节点的线程,而共享功能是只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒aqs队列中的下一个节点的线程,每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。

以上的分析都是从aqs子类的角度去看待aqs的部分功能的,而如果直接看待aqs,或许可以这么去解读:

首先,aqs并不关心“是什么锁”,对于aqs来说它只是实现了一系列的用于判断“资源”是否可以访问的api,并且封装了在“访问资源”受限时将请求访问的线程的加入队列、挂起、唤醒等操作, aqs只关心“资源不可以访问时,怎么处理?”、“资源是可以被同时访问,还是在同一时间只能被一个线程访问?”、“如果有线程等不及资源了,怎么从aqs的队列中退出?”等一系列围绕资源访问的问题,而至于“资源是否可以被访问?”这个问题则交给aqs的子类去实现。

当aqs的子类是实现独占功能时,例如reentrantlock,“资源是否可以被访问”被定义为只要aqs的state变量不为0,并且持有锁的线程不是当前线程,则代表资源不能访问。

当aqs的子类是实现共享功能时,例如:countdownlatch,“资源是否可以被访问”被定义为只要aqs的state变量不为0,说明资源不能访问。这是典型的将规则和操作分开的设计思路:规则子类定义,操作逻辑因为具有公用性,放在父类中去封装。当然,正式因为aqs只是关心“资源在什么条件下可被访问”,所以子类还可以同时使用aqs的共享功能和独占功能的api以实现更为复杂的功能。

比如:reentrantreadwritelock,我们知道reentrantreadwritelock的中也有一个叫sync的内部类继承了aqs,而aqs的队列可以同时存放共享锁和独占锁,对于reentrantreadwritelock来说分别代表读锁和写锁,当队列中的头节点为读锁时,代表读操作可以执行,而写操作不能执行,因此请求写操作的线程会被挂起,当读操作依次推出后,写锁成为头节点,请求写操作的线程被唤醒,可以执行写操作,而此时的读请求将被封装成node放入aqs的队列中。如此往复,实现读写锁的读写交替进行。

而本系列文章上半部分提到的futuretask,其实思路也是:封装一个存放线程执行结果的变量a,使用aqs的独占api实现线程对变量a的独占访问,判断规则是,线程没有执行完毕:call()方法没有返回前,不能访问变量a,或者是超时时间没到前不能访问变量a(这就是futuretask的get方法可以实现获取线程执行结果时,设置超时时间的原因)。

综上所述,本系列文章从aqs独占锁和共享锁两个方面深入分析了aqs的实现方式和独特的设计思路,希望对读者有启发,下一篇文章,我们将继续jdk 1.8下 j.u.c (java.util.concurrent)包中的其他工具类,敬请期待。