天天看点

并发编程实践二:AbstractQueuedSynchronizerUnsafeAQS的运用同步状态AQS的主体流程等待队列阻塞机制 结束语

abstractqueuedsynchronizer,简称aqs,是java.util.concurrent包的synchronizer的基础框架,其它的synchronizer(包括lock、semaphore、countdownlatch、futuretask等)都是以它作为基础构建的,这篇文章我将对aqs的框架结构作出介绍,包括它对同步状态的管理,功能流程,等待队列的管理等,并涉及到一些实现的细节,但这里不涉及原理性的东西,原理将放到后面具体的实现类中去讲述。

这片文章采用从整体到局部的方式来讲述,从总体框架一步一步细化,但并不涉及所有代码,了解了这些后,你就可以自己阅读源码学习了。doug lea在论文《the java.util.concurrent synchronizer framework》中讲述了aqs的设计理念,有兴趣的可以看看,在这里()你可以看到。

由于文章过长,这篇文章没有涉及到conditionobject,我将在下一篇文章中讲述。

好了,接下来我们就开始了,从unsafe开始。

首先对用到的unsafe类的方法做一下说明:

aqs根据synchronizer的共同特征提供了一套基础框架,这些共同特征包括:一个或者多个acquire操作用于阻塞线程直到存在空闲锁允许线程通过,一个或者多个release操作释放一个或者多个锁使一个或者多个线程唤醒。我们可以使用伪码来表示这些操作,acqure操作可以表示为:

release的操作可以表示为:

下面通过一个实际的例子来看aqs是怎么被应用的,这个是java.util.concurrent.semaphore的一个简化版(只包含非公平设置的semaphore的代码),semaphore的作用是为某些资源的访问提供最大线程数目限制,你可以为它设置一个最大许可值permits,则permits个线程可以同时通过acquire调用,超出的将被阻塞,直到有线程释放锁(即调用release操作)。

semaphore的所有操作都是通过一个内部类来完成,这个内部类是aqs的子类,:

 1)为aqs设置了同步状态(即锁的数量);

 2)继承了aqs的tryacquireshared方法,该方法用于为acquire操作的线程获取锁,并返回剩下的锁的数量,如果剩下的锁为负数,则表示获取锁失败,线程将被阻塞;

 3)继承了aqs的tryreleaseshared方法,该方法用于为release操作的线程释放锁,并返回释放是否成功,true则表示成功,成功后会执行唤醒阻塞线程的操作。

下面看具体的代码:

mysemaphore展示了aqs的运用:同步状态的设置和管理、tryacquireshared和tryreleaseshared的实现,而mysemaphore的acquire和release操作都是通过使用aqs来实现的。当多个线程同时操作acquire和release时,aqs是怎么保证操作的正确性的呢?通过对aqs的内部机制的学习,你就会知道了,下面我们从同步状态的管理开始。

aqs中同步状态的申明和提供的操作方法如下:

同步状态是一个32位的整数,表示锁的数量,之所以使用一个32位整数,主要是考虑到一般情况下锁的数量不会需要那么多(见《the java.util.concurrent synchronizer framework》),子类可以通过aqs提供的方法获取和改变同步状态的值,而在tryacquireshared和tryreleaseshared中将使用同步状态来判断线程是否该阻塞和唤醒。

aqs提供了两种模式,独占模式和共享模式,对应的方法如下:

我们可以把独占模式看作共享模式的一个特例,即独占模式是将锁数量设置为1的共享模式。另外,aqs还提供了一组提供时间限制的方法:

这组就是在比上面的方法增加了一个时间的限制,当时间较短的情况下(小于等于1微秒)使用轮询的方式,否则采用阻塞方式。

我对流程的讲解将以共享模式为主,其它的有兴趣可以查看源代码。从宏观上来看,线程通过acquiresharedinterruptibly获取锁,操作完成后,并通过releaseshared来释放锁,我们先来看获取锁的操作:

线程操作完成后,释放锁的操作:

线程获取锁失败后(通过调用acquiresharedinterruptibly),将进入等待队列,然后进入阻塞状态(经过一些判断之后);当另一个线程释放锁后(通过调用releaseshared),将执行唤醒阻塞线程的操作;被唤醒的线程将把自己移出等待队列,并执行一些其它操作。可以说,操作都是围绕等待队列来做的,下面我们就来看看等待队列。

aqs中使用了一个fifo队列来管理等待线程,而底层锁的实现使用clh锁的一个变种,clh锁通常被用于自旋锁,我们可以通过一个例子看了解一下clh锁,可以帮助我们理解aqs的队列算法:

clhlock的思想就是使用一个锁队列,后面的线程反复查看前面线程的状态,如果状态为锁定,则自旋等待,否则通过。

采用队列的方式可以带来性能上的好处,在现代的处理器架构中,每个处理器自身都有一个cache,用于存储处理器感兴趣的数据,处理器从cache中获取数据的效率要远远高于从内存中获取数据,处理器之间通过总线进行通信,而总线是独占设备,也就是说,每次只能有一个处理器使用总线,基于这样的架构,为了提高性能,我们应该尽量使用本地cache,尽量避免通过总线存取数据,或者尽量少的通过总线存取数据。采用队列就可以带来这样的好处,每个线程修改自己的locked只会影响后续的线程,而其它线程不会受到影响,这样就只会有一个线程会因为数据的改变而去内存中取数据,减少了数据竞争,从而提高性能。

aqs的队列算法也是基于这样的理念,但实现要比clhlock复杂的多,为了尽量减少数据竞争,每个节点的状态只和它后续的节点相关,等待队列中的线程依次唤醒,减少获取锁的竞争。下面我们就开始详细讲述。

aqs使用了一个非阻塞队列来保存数据(如果想更多的了解无阻塞队列,可以参考我的“并发编程实践一”),我们先来看队列节点的定义:

nextwaiter表示了两种不同的模式:

 1)共享模式(shared,允许多个线程通过);

 2)排它模式(exclusive,只允许一个线程通过)。

我们的讲解只涉及到共享模式。

waitstatus表示了节点的状态:

 1)cancelled:节点对应的线程已经被取消;

 2)signal:节点的下一个节点的线程等待被唤醒;

 3)condition:节点对应的线程等待在condition队列中,在后面讲condition的时候会涉及到;

 4)propagate:该节点不需要处理,直接越过。

下面我们看看算法的主要流程:

 1)入队列:线程获取锁失败后,创建一个节点,并将节点添加到等待队列尾,然后将线程阻塞,等待唤醒;

 2)唤醒:另一个线程释放锁,取队列的第一个节点,将节点对应线程唤醒;

 3)出队列:唤醒后的线程将尝试获取锁,成功后将自己移出队列,同时判断是否任然存在空闲的锁,如果存在则继续唤醒下一个节点。

每次只会唤醒第一个节点,如果同时释放多个锁,后续的节点将由前面被唤醒的节点来唤醒,尽量减少数据竞争。

下面我们来看具体的代码。从总体流程中,线程通过acquiresharedinterruptibly请求锁,当尝试获取锁(tryacquireshared)失败后,将进入doacquiresharedinterruptibly处理:

入队列的操作是由addwaiter来完成的,它首先尝试入队列一次,失败后再在enq中循环尝试,直到成功:

节点进入等待队列后,如果节点的前续节点不是head,线程将在parkandcheckinterrupt中进入阻塞状态。

线程阻塞直到另一个线程调用releaseshared释放锁,然后在doreleaseshared中将把等待队列中的第一个节点唤醒:

线程被唤醒后,将重新尝试获取锁,成功后,将开始将自己移出队列,移出队列的代码在setheadandpropagate中:

这里执行唤醒操作和release时执行的操作一致,由于多个线程同时调用release操作的情况下,虽然释放了多个锁,但可能只会执行一次doreleaseshared的操作,这里就做了弥补,在队列的第一个线程被唤醒,获取锁后,将再次调用doreleaseshared唤醒下一个线程,一直往下,直到锁全部用完或者队列为空。这样就能做到线程一个一个唤醒,依次获取锁,尽量减少了数据竞争(如果有新增线程请求锁,任然会存在数据竞争,但这里减少了已阻塞线程的数据竞争)。

aqs的阻塞操作使用locksupport类,最终使用unsafe的park和unpark操作实现,如下:

aqs向我们充分展现了并发编程的复杂性,在多个线程的交互下,情况将变得非常复杂,你往往需要将整个流程作为一个整体来分析,因此建议和源代码结合来看这篇文章,充分考虑每一个交互的环节可能出现的问题。

这是我自己对代码的分析结果,给你作为学习参考,可能存在错误,发现问题麻烦给我留言,我将非常感谢。

谢谢。