天天看点

Java 并发编程-再谈 AbstractQueuedSynchronizer 2:共享模式与基于 Condition 的等待 / 通知机制实现

共享模式acquire实现流程

上文我们讲解了AbstractQueuedSynchronizer独占模式的acquire实现流程,本文趁热打铁继续看一下AbstractQueuedSynchronizer共享模式acquire的实现流程。连续两篇文章的学习,也可以对比独占模式acquire和共享模式acquire的区别,加深对于AbstractQueuedSynchronizer的理解。

先看一下共享模式acquire的实现,方法为acquireShared和acquireSharedInterruptibly,两者差别不大,区别就在于后者有中断处理,以acquireShared为例:

1

2

3

4

public

final

void

acquireShared(

int

arg) {

if

(tryAcquireShared(arg) <

)

doAcquireShared(arg);

}

这里就能看出第一个差别来了:独占模式acquire的时候子类重写的方法tryAcquire返回的是boolean,即是否tryAcquire成功;共享模式acquire的时候,返回的是一个int型变量,判断是否<0。doAcquireShared方法的实现为:

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

private

void

doAcquireShared(

int

arg) {

final

Node node = addWaiter(Node.SHARED);

boolean

failed =

true

;

try

{

boolean

interrupted =

false

;

for

(;;) {

final

Node p = node.predecessor();

if

(p == head) {

int

r = tryAcquireShared(arg);

if

(r >=

) {

setHeadAndPropagate(node, r);

p.next =

null

;

// help GC

if

(interrupted)

selfInterrupt();

failed =

false

;

return

;

}

}

if

(shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted =

true

;

}

}

finally

{

if

(failed)

cancelAcquire(node);

}

}

我们来分析一下这段代码做了什么:

  1. addWaiter,把所有tryAcquireShared<0的线程实例化出一个Node,构建为一个FIFO队列,这和独占锁是一样的
  2. 拿当前节点的前驱节点,只有前驱节点是head的节点才能tryAcquireShared,这和独占锁也是一样的
  3. 前驱节点不是head的,执行”shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()”,for(;;)循环,”shouldParkAfterFailedAcquire()”方法执行2次,当前线程阻塞,这和独占锁也是一样的

确实,共享模式下的acquire和独占模式下的acquire大部分逻辑差不多,最大的差别在于tryAcquireShared成功之后,独占模式的acquire是直接将当前节点设置为head节点即可,共享模式会执行setHeadAndPropagate方法,顾名思义,即在设置head之后多执行了一步propagate操作。setHeadAndPropagate方法源码为:

private

void

setHeadAndPropagate(Node node,

int

propagate) {

Node h = head;

// Record old head for check below

setHead(node);

/*

* Try to signal next queued node if:

*   Propagation was indicated by caller,

*     or was recorded (as h.waitStatus) by a previous operation

*     (note: this uses sign-check of waitStatus because

*      PROPAGATE status may transition to SIGNAL.)

* and

*   The next node is waiting in shared mode,

*     or we don't know, because it appears null

*

* The conservatism in both of these checks may cause

* unnecessary wake-ups, but only when there are multiple

* racing acquires/releases, so most need signals now or soon

* anyway.

*/

if

(propagate >

|| h ==

null

|| h.waitStatus <

) {

Node s = node.next;

if

(s ==

null

|| s.isShared())

doReleaseShared();

}

}

第3行的代码设置重设head,第2行的代码由于第3行的代码要重设head,因此先定义一个Node型变量h获得原head的地址,这两行代码很简单。

第19行~第23行的代码是独占锁和共享锁最不一样的一个地方,我们再看独占锁acquireQueued的代码:

final

boolean

acquireQueued(

final

Node node,

int

arg) {

boolean

failed =

true

;

try

{

boolean

interrupted =

false

;

for

(;;) {

final

Node p = node.predecessor();

if

(p == head && tryAcquire(arg)) {

setHead(node);

p.next =

null

;

// help GC

failed =

false

;

return

interrupted;

}

if

(shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted =

true

;

}

}

finally

{

if

(failed)

cancelAcquire(node);

}

}

这意味着独占锁某个节点被唤醒之后,它只需要将这个节点设置成head就完事了,而共享锁不一样,某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播。

共享模式release实现流程

上面讲了共享模式下acquire是如何实现的,下面再看一下release的实现流程,方法为releaseShared:

public

final

boolean

releaseShared(

int

arg) {

if

(tryReleaseShared(arg)) {

doReleaseShared();

return

true

;

}

return

false

;

}

tryReleaseShared方法是子类实现的,如果tryReleaseShared成功,那么执行doReleaseShared()方法:

28

29

private

void

doReleaseShared() {

/*

* Ensure that a release propagates, even if there are other

* in-progress acquires/releases.  This proceeds in the usual

* way of trying to unparkSuccessor of head if it needs

* signal. But if it does not, status is set to PROPAGATE to

* ensure that upon release, propagation continues.

* Additionally, we must loop in case a new node is added

* while we are doing this. Also, unlike other uses of

* unparkSuccessor, we need to know if CAS to reset status

* fails, if so rechecking.

*/

for

(;;) {

Node h = head;

if

(h !=

null

&& h != tail) {

int

ws = h.waitStatus;

if

(ws == Node.SIGNAL) {

if

(!compareAndSetWaitStatus(h, Node.SIGNAL,

))

continue

;           

// loop to recheck cases

unparkSuccessor(h);

}

else

if

(ws ==

&&

!compareAndSetWaitStatus(h,

, Node.PROPAGATE))

continue

;               

// loop on failed CAS

}

if

(h == head)                  

// loop if head changed

break

;

}

}

主要是两层逻辑:

  1. 头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0,唤醒头结点的后继节点
  2. 头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播

Condition的await()方法实现原理—-构建等待队列

我们知道,Condition是用于实现通知/等待机制的,和Object的wait()/notify()一样,由于本文之前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很长,加之Condition也是AbstractQueuedSynchronizer的一部分,因此将Condition也放在这里写了。

Condition分为await()和signal()两部分,前者用于等待、后者用于唤醒,首先看一下await()是如何实现的。Condition本身是一个接口,其在AbstractQueuedSynchronizer中的实现为ConditionObject:

public

class

ConditionObject

implements

Condition, java.io.Serializable {

private

static

final

long

serialVersionUID = 1173984872572414699L;

/** First node of condition queue. */

private

transient

Node firstWaiter;

/** Last node of condition queue. */

private

transient

Node lastWaiter;

...

}

这里贴了一些字段定义,后面都是方法就不贴了,会对重点方法进行分析的。从字段定义我们可以看到,ConditionObject全局性地记录了第一个等待的节点与最后一个等待的节点。

像ReentrantLock每次要使用ConditionObject,直接new一个ConditionObject出来即可。我们关注一下await()方法的实现:

public

final

void

await()

throws

InterruptedException {

if

(Thread.interrupted())

throw

new

InterruptedException();

Node node = addConditionWaiter();

int

savedState = fullyRelease(node);

int

interruptMode =

;

while

(!isOnSyncQueue(node)) {

LockSupport.park(

this

);

if

((interruptMode = checkInterruptWhileWaiting(node)) !=

)

break

;

}

if

(acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if

(node.nextWaiter !=

null

)

// clean up if cancelled

unlinkCancelledWaiters();

if

(interruptMode !=

)

reportInterruptAfterWait(interruptMode);

}

第2行~第3行的代码用于处理中断,第4行代码比较关键,添加Condition的等待者,看一下实现:

private

Node addConditionWaiter() {

Node t = lastWaiter;

// If lastWaiter is cancelled, clean out.

if

(t !=

null

&& t.waitStatus != Node.CONDITION) {

unlinkCancelledWaiters();

t = lastWaiter;

}

Node node =

new

Node(Thread.currentThread(), Node.CONDITION);

if

(t ==

null

)

firstWaiter = node;

else

t.nextWaiter = node;

lastWaiter = node;

return

node;

}

首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着第4行的的判断,判断最后一个等待者的waitStatus不是CONDITION的话,执行第5行的代码,解绑取消的等待者,因为通过第8行的代码,我们看到,new出来的Node的状态都是CONDITION的。

那么unlinkCancelledWaiters做了什么?里面的流程就不看了,就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。

接着第8行的代码前面说过了,new出来了一个Node,存储了当前线程,waitStatus是CONDITION,接着第9行~第13行的操作很好理解:

  1. 如果lastWaiter是null,说明FIFO队列中没有任何Node,firstWaiter=Node
  2. 如果lastWaiter不是null,说明FIFO队列中有Node,原lastWaiter的next指向Node
  3. 无论如何,新加入的Node编程lastWaiter,即新加入的Node一定是在最后面

用一张图表示一下构建的数据结构就是:

对比学习,我们总结一下Condition构建出来的队列和AbstractQueuedSynchronizer构建出来的队列的差别,主要体现在2点上:

  1. AbstractQueuedSynchronizer构建出来的队列,头节点是一个没有Thread的空节点,其标识作用,而Condition构建出来的队列,头节点就是真正等待的节点
  2. AbstractQueuedSynchronizer构建出来的队列,节点之间有next与pred相互标识该节点的前一个节点与后一个节点的地址,而Condition构建出来的队列,只使用了nextWaiter标识下一个等待节点的地址

整个过程中,我们看到没有使用任何CAS操作,firstWaiter和lastWaiter也没有用volatile修饰,其实原因很简单:要await()必然要先lock(),既然lock()了就表示没有竞争,没有竞争自然也没必要使用volatile+CAS的机制去保证什么。

Condition的await()方法实现原理—-线程等待

前面我们看了Condition构建等待队列的过程,接下来我们看一下等待的过程,await()方法的代码比较短,再贴一下:

public

final

void

await()

throws

InterruptedException {

if

(Thread.interrupted())

throw

new

InterruptedException();

Node node = addConditionWaiter();

int

savedState = fullyRelease(node);

int

interruptMode =

;

while

(!isOnSyncQueue(node)) {

LockSupport.park(

this

);

if

((interruptMode = checkInterruptWhileWaiting(node)) !=

)

break

;

}

if

(acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if

(node.nextWaiter !=

null

)

// clean up if cancelled

unlinkCancelledWaiters();

if

(interruptMode !=

)

reportInterruptAfterWait(interruptMode);

}

构建完毕队列之后,执行第5行的fullyRelease方法,顾名思义:fullyRelease方法的作用是完全释放Node的状态。方法实现为:

final

int

fullyRelease(Node node) {

boolean

failed =

true

;

try

{

int

savedState = getState();

if

(release(savedState)) {

failed =

false

;

return

savedState;

}

else

{

throw

new

IllegalMonitorStateException();

}

}

finally

{

if

(failed)

node.waitStatus = Node.CANCELLED;

}

}

这里第4行获取state,第5行release的时候将整个state传过去,理由是某线程可能多次调用了lock()方法,比如调用了10次lock,那么此线程就将state加到了10,所以这里要将10传过去,将状态全部释放,这样后面的线程才能重新从state=0开始竞争锁,这也是方法被命名为fullyRelease的原因,因为要完全释放锁,释放锁之后,如果有竞争锁的线程,那么就唤醒第一个,这都是release方法的逻辑了,前面的文章详细讲解过。

接着看await()方法的第7行判断”while(!isOnSyncQueue(node))”:

final

boolean

isOnSyncQueue(Node node) {

if

(node.waitStatus == Node.CONDITION || node.prev ==

null

)

return

false

;

if

(node.next !=

null

)

// If has successor, it must be on queue

return

true

;

/*

* node.prev can be non-null, but not yet on queue because

* the CAS to place it on queue can fail. So we have to

* traverse from tail to make sure it actually made it.  It

* will always be near the tail in calls to this method, and

* unless the CAS failed (which is unlikely), it will be

* there, so we hardly ever traverse much.

*/

return

findNodeFromTail(node);

}

注意这里的判断是Node是否在AbstractQueuedSynchronizer构建的队列中而不是Node是否在Condition构建的队列中,如果Node不在AbstractQueuedSynchronizer构建的队列中,那么调用LockSupport的park方法阻塞。

至此调用await()方法的线程构建Condition等待队列–释放锁–等待的过程已经全部分析完毕。 

Condition的signal()实现原理

上面的代码分析了构建Condition等待队列–释放锁–等待的过程,接着看一下signal()方法通知是如何实现的:

public

final

void

signal() {

if

(!isHeldExclusively())

throw

new

IllegalMonitorStateException();

Node first = firstWaiter;

if

(first !=

null

)

doSignal(first);

}

首先从第2行的代码我们看到,要能signal(),当前线程必须持有独占锁,否则抛出异常IllegalMonitorStateException。

那么真正操作的时候,获取第一个waiter,如果有waiter,调用doSignal方法:

private

void

doSignal(Node first) {

do

{

if

( (firstWaiter = first.nextWaiter) ==

null

)

lastWaiter =

null

;

first.nextWaiter =

null

;

}

while

(!transferForSignal(first) &&

(first = firstWaiter) !=

null

);

}

第3行~第5行的代码很好理解:

  1. 重新设置firstWaiter,指向第一个waiter的nextWaiter
  2. 如果第一个waiter的nextWaiter为null,说明当前队列中只有一个waiter,lastWaiter置空
  3. 因为firstWaiter是要被signal的,因此它没什么用了,nextWaiter置空

接着执行第6行和第7行的代码,这里重点就是第6行的transferForSignal方法:

final

boolean

transferForSignal(Node node) {

/*

* If cannot change waitStatus, the node has been cancelled.

*/

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

return false;

/*

* Splice onto queue and try to set waitStatus of predecessor to

* indicate that thread is (probably) waiting. If cancelled or

* attempt to set waitStatus fails, wake up to resync (in which

* case the waitStatus can be transiently and harmlessly wrong).

*/

Node p = enq(node);

int

ws = p.waitStatus;

if

(ws >

|| !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

LockSupport.unpark(node.thread);

return

true

;

}

方法本意是将一个节点从Condition队列转换为AbstractQueuedSynchronizer队列,总结一下方法的实现:

  1. 尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false
  2. 当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列
  3. 当前节点通过CAS机制将waitStatus置为SIGNAL

最后上面的步骤全部成功,返回true,返回true唤醒等待节点成功。从唤醒的代码我们可以得出一个重要结论:某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它。

代码分析到这里,我想类似的signalAll方法也没有必要再分析了,显然signalAll方法的作用就是将所有Condition队列中等待的节点逐一队列中从移除,由CONDITION状态变为SIGNAL状态并加入AbstractQueuedSynchronizer队列的尾部。

代码示例

可能大家看了我分析半天代码会有点迷糊,这里最后我贴一段我用于验证上面Condition结论的示例代码,首先建立一个Thread,我将之命名为ConditionThread:

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

/**

* @author 五月的仓颉http://www.cnblogs.com/xrq730/p/7067904.html

*/

public

class

ConditionThread

implements

Runnable {

private

Lock lock;

private

Condition condition;

public

ConditionThread(Lock lock, Condition condition) {

this

.lock = lock;

this

.condition = condition;

}

@Override

public

void

run() {

if

(

"线程0"

.equals(JdkUtil.getThreadName())) {

thread0Process();

}

else

if

(

"线程1"

.equals(JdkUtil.getThreadName())) {

thread1Process();

}

else

if

(

"线程2"

.equals(JdkUtil.getThreadName())) {

thread2Process();

}

}

private

void

thread0Process() {

try

{

lock.lock();

System.out.println(

"线程0休息5秒"

);

JdkUtil.sleep(

5000

);

condition.signal();

System.out.println(

"线程0唤醒等待线程"

);

}

finally

{

lock.unlock();

}

}

private

void

thread1Process() {

try

{

lock.lock();

System.out.println(

"线程1阻塞"

);

condition.await();

System.out.println(

"线程1被唤醒"

);

}

catch

(InterruptedException e) {

}

finally

{

lock.unlock();

}

}

private

void

thread2Process() {

try

{

System.out.println(

"线程2想要获取锁"

);

lock.lock();

System.out.println(

"线程2获取锁成功"

);

}

finally

{

lock.unlock();

}

}

}

这个类里面的方法就不解释了,反正就三个方法片段,根据线程名判断,每个线层执行的是其中的一个代码片段。写一段测试代码:

/**

* @author 五月的仓颉http://www.cnblogs.com/xrq730/p/7067904.html

*/

@Test

public

void

testCondition()

throws

Exception {

Lock lock =

new

ReentrantLock();

Condition condition = lock.newCondition();

// 线程0的作用是signal

Runnable runnable0 =

new

ConditionThread(lock, condition);

Thread thread0 =

new

Thread(runnable0);

thread0.setName(

"线程0"

);

// 线程1的作用是await

Runnable runnable1 =

new

ConditionThread(lock, condition);

Thread thread1 =

new

Thread(runnable1);

thread1.setName(

"线程1"

);

// 线程2的作用是lock

Runnable runnable2 =

new

ConditionThread(lock, condition);

Thread thread2 =

new

Thread(runnable2);

thread2.setName(

"线程2"

);

thread1.start();

Thread.sleep(

1000

);

thread0.start();

Thread.sleep(

1000

);

thread2.start();

thread1.join();

}

测试代码的意思是:

  1. 线程1先启动,获取锁,调用await()方法等待
  2. 线程0后启动,获取锁,休眠5秒准备signal()
  3. 线程2最后启动,获取锁,由于线程0未使用完毕锁,因此线程2排队,可以此时由于线程0还未signal(),因此线程1在线程0执行signal()后,在AbstractQueuedSynchronizer队列中的顺序是在线程2后面的

代码执行结果为:

1

线程

1

阻塞

2

线程

休息

5

3

线程

2

想要获取锁

4

线程

唤醒等待线程

5

线程

2

获取锁成功

6

线程

1

被唤醒

符合我们的结论:signal()并不意味着被唤醒的线程立即执行。由于线程2先于线程0排队,因此看到第5行打印的内容,线程2先获取锁。

熬夜不易,点击请老王喝杯烈酒!!!!!!!