转自:http://blog.csdn.net/chen77716/article/details/6641477
在java.util.concurrent.locks包中有很多lock的实现类,常用的有reentrantlock、readwritelock(实现类reentrantreadwritelock),其实现都依赖java.util.concurrent.abstractqueuedsynchronizer类,实现思路都大同小异,因此我们以reentrantlock作为讲解切入点。
经过观察reentrantlock把所有lock接口的操作都委派到一个sync类上,该类继承了abstractqueuedsynchronizer:
static abstract class sync extends abstractqueuedsynchronizer
sync又有两个子类:
final static class nonfairsync extends sync
final static class fairsync extends sync
显然是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁。
先理一下reentrant.lock()方法的调用过程(默认非公平锁):
这些讨厌的template模式导致很难直观的看到整个调用过程,其实通过上面调用过程及abstractqueuedsynchronizer的注释可以发现,abstractqueuedsynchronizer中抽象了绝大多数lock的功能,而只把tryacquire方法延迟到子类中实现。tryacquire方法的语义在于用具体子类判断请求线程是否可以获得锁,无论成功与否abstractqueuedsynchronizer都将处理后面的流程。
简单说来,abstractqueuedsynchronizer会把所有的请求线程构成一个clh队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态,经过调查线程的显式阻塞是通过调用locksupport.park()完成,而locksupport.park()则调用sun.misc.unsafe.park()本地方法,再进一步,hotspot在linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。
该队列如图:
与synchronized相同的是,这也是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。令人疑惑的是为什么采用clh队列呢?原生的clh队列是用于自旋锁,但doug lea把其改造为阻塞锁。
当有线程竞争锁时,该线程会首先尝试获得锁,这对于那些已经在队列中排队的线程来说显得不公平,这也是非公平锁的由来,与synchronized实现类似,这样会极大提高吞吐量。
如果已经存在running线程,则新的竞争线程会被追加到队尾,具体是采用基于cas的lock-free算法,因为线程并发对tail调用cas可能会导致其他线程cas失败,解决办法是循环cas直至成功。abstractqueuedsynchronizer的实现非常精巧,令人叹为观止,不入细节难以完全领会其精髓,下面详细说明实现过程:
nonfairtryacquire方法将是lock方法间接调用的第一个方法,每次请求锁时都会首先调用该方法。
final boolean nonfairtryacquire(int acquires) {
final thread current = thread.currentthread();
int c = getstate();
if (c == 0) {
if (compareandsetstate(0, acquires)) {
setexclusiveownerthread(current);
return true;
}
}
else if (current == getexclusiveownerthread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new error("maximum lock count exceeded");
setstate(nextc);
return true;
return false;
}
该方法会首先判断当前状态,如果c==0说明没有线程正在竞争该锁,如果不c !=0 说明有线程正拥有了该锁。
如果发现c==0,则通过cas设置该状态值为acquires,acquires的初始调用值为1,每次线程重入该锁都会+1,每次unlock都会-1,但为0时释放锁。如果cas设置成功,则可以预计其他任何线程调用cas都不会再成功,也就认为当前线程得到了该锁,也作为running线程,很显然这个running线程并未进入等待队列。
如果c !=0 但发现自己已经拥有锁,只是简单地++acquires,并修改status值,但因为没有竞争,所以通过setstatus修改,而非cas,也就是说这段代码实现了偏向锁的功能,并且实现的非常漂亮。
addwaiter方法负责把当前无法获得锁的线程包装为一个node添加到队尾:
private node addwaiter(node mode) {
node node = new node(thread.currentthread(), mode);
// try the fast path of enq; backup to full enq on failure
node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareandsettail(pred, node)) {
pred.next = node;
return node;
enq(node);
return node;
其中参数mode是独占锁还是共享锁,默认为null,独占锁。追加到队尾的动作分两步:
如果当前队尾已经存在(tail!=null),则使用cas把当前线程更新为tail
如果当前tail为null或则线程调用cas设置队尾失败,则通过enq方法继续设置tail
下面是enq方法:
private node enq(final node node) {
for (;;) {
node t = tail;
if (t == null) { // must initialize
node h = new node(); // dummy header
h.next = node;
node.prev = h;
if (compareandsethead(h)) {
tail = node;
return h;
}
else {
node.prev = t;
if (compareandsettail(t, node)) {
t.next = node;
return t;
该方法就是循环调用cas,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addwaiter的目的就是通过cas把当前现在追加到队尾,并返回包装后的node实例。
把线程要包装为node对象的主要原因,除了用node构造供虚拟队列外,还用node包装了各种线程状态,这些状态被精心设计为一些数字值:
signal(-1) :线程的后继线程正/已被阻塞,当该线程release或cancel时要重新这个后继线程(unpark)
cancelled(1):因为超时或中断,该线程已经被取消
condition(-2):表明该线程被处于条件队列,就是因为调用了condition.await而被阻塞
propagate(-3):传播共享锁
0:0代表无状态
acquirequeued的主要作用是把已经追加到队列的线程节点(addwaiter方法返回值)进行阻塞,但阻塞前又通过tryaccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回
final boolean acquirequeued(final node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final node p = node.predecessor();
if (p == head && tryacquire(arg)) {
sethead(node);
p.next = null; // help gc
return interrupted;
if (shouldparkafterfailedacquire(p, node) &&
parkandcheckinterrupt())
interrupted = true;
} catch (runtimeexception ex) {
cancelacquire(node);
throw ex;
仔细看看这个方法是个无限循环,感觉如果p == head && tryacquire(arg)条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于第12行的parkandcheckinterrupt会把当前线程挂起,从而阻塞住线程的调用栈。
private final boolean parkandcheckinterrupt() {
locksupport.park(this);
return thread.interrupted();
如前面所述,locksupport.park最终把线程交给系统(linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于cancel状态则没有必要,具体的检查在shouldparkafterfailedacquire中:
private static boolean shouldparkafterfailedacquire(node pred, node node) {
int ws = pred.waitstatus;
if (ws == node.signal)
/*
* this node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (ws > 0) {
* predecessor was cancelled. skip over predecessors and
* indicate retry.
do {
node.prev = pred = pred.prev;
} while (pred.waitstatus > 0);
pred.next = node;
} else {
* waitstatus must be 0 or propagate. indicate that we
* need a signal, but don't park yet. caller will need to
* retry to make sure it cannot acquire before parking.
compareandsetwaitstatus(pred, ws, node.signal);
}
return false;
}
检查原则在于:
规则1:如果前继的节点状态为signal,表明当前节点需要unpark,则返回成功,此时acquirequeued方法的第12行(parkandcheckinterrupt)将导致线程阻塞
规则2:如果前继节点状态为cancelled(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquirequeued方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞
规则3:如果前继节点状态为非signal、非cancelled,则设置前继的状态为signal,返回false后进入acquirequeued的无限循环,与规则2同
总体看来,shouldparkafterfailedacquire就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于cancelled状态,则顺便删除这些节点重新构造队列。
至此,锁住线程的逻辑已经完成,下面讨论解锁的过程。
请求锁不成功的线程会被挂起在acquirequeued方法的第12行,12行以后的代码必须等线程被解锁锁才能执行,假如被阻塞的线程得到解锁,则执行第13行,即设置interrupted = true,之后又进入无限循环。
从无限循环的代码可以看出,并不是得到解锁的线程一定能获得锁,必须在第6行中调用tryaccquire重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。通过之后将要介绍的解锁机制会看到,第一个被解锁的线程就是head,因此p == head的判断基本都会成功。
至此可以看到,把tryacquire方法延迟到子类中实现的做法非常精妙并具有极强的可扩展性,令人叹为观止!当然精妙的不是这个templae设计模式,而是doug lea对锁结构的精心布局。
解锁代码相对简单,主要体现在abstractqueuedsynchronizer.release和sync.tryrelease方法中:
class abstractqueuedsynchronizer
public final boolean release(int arg) {
if (tryrelease(arg)) {
node h = head;
if (h != null && h.waitstatus != 0)
unparksuccessor(h);
class sync
protected final boolean tryrelease(int releases) {
int c = getstate() - releases;
if (thread.currentthread() != getexclusiveownerthread())
throw new illegalmonitorstateexception();
boolean free = false;
free = true;
setexclusiveownerthread(null);
setstate(c);
return free;
tryrelease与tryacquire语义相同,把如何释放的逻辑延迟到子类中。tryrelease语义很明确:如果线程多次锁定,则进行多次释放,直至status==0则真正释放锁,所谓释放锁即设置status为0,因为无竞争所以没有使用cas。
release的语义在于:如果可以释放锁,则唤醒队列第一个线程(head),具体唤醒代码如下:
private void unparksuccessor(node node) {
/*
* if status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. it is ok if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitstatus;
if (ws < 0)
compareandsetwaitstatus(node, ws, 0);
* thread to unpark is held in successor, which is normally
* just the next node. but if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
node s = node.next;
if (s == null || s.waitstatus > 0) {
s = null;
for (node t = tail; t != null && t != node; t = t.prev)
if (t.waitstatus <= 0)
s = t;
if (s != null)
locksupport.unpark(s.thread);
这段代码的意思在于找出第一个可以unpark的线程,一般说来head.next == head,head就是第一个线程,但head.next可能被取消或被置为null,因此比较稳妥的办法是从后往前找第一个可用线程。貌似回溯会导致性能降低,其实这个发生的几率很小,所以不会有性能影响。之后便是通知系统内核继续该线程,在linux下是通过pthread_mutex_unlock完成。之后,被解锁的线程进入上面所说的重新竞争状态。
abstractqueuedsynchronizer通过构造一个基于阻塞的clh队列容纳所有的阻塞线程,而对该队列的操作均通过lock-free(cas)操作,但对已经获得锁的线程而言,reentrantlock实现了偏向锁的功能。
synchronized的底层也是一个基于cas操作的等待队列,但jvm实现的更精细,把等待队列分为contentionlist和entrylist,目的是为了降低线程的出列速度;当然也实现了偏向锁,从数据结构来说二者设计没有本质区别。但synchronized还实现了自旋锁,并针对不同的系统和硬件体系进行了优化,而lock则完全依靠系统阻塞挂起等待线程。
当然lock比synchronized更适合在应用层扩展,可以继承abstractqueuedsynchronizer定义各种实现,比如实现读写锁(readwritelock),公平或不公平锁;同时,lock对应的condition也比wait/notify要方便的多、灵活的多。