天天看點

Java 并發程式設計-再談 AbstractQueuedSynchronizer 2:共享模式與基于 Condition 的等待 / 通知機制實作

共享模式acquire實作流程

上文我們講解了AbstractQueuedSynchronizer獨占模式的acquire實作流程,本文趁熱打鐵繼續看一下AbstractQueuedSynchronizer共享模式acquire的實作流程。連續兩篇文章的學習,也可以對比獨占模式acquire和共享模式acquire的差別,加深對于AbstractQueuedSynchronizer的了解。

先看一下共享模式acquire的實作,方法為acquireShared和acquireSharedInterruptibly,兩者差别不大,差別就在于後者有中斷處理,以acquireShared為例:

1

2

3

4

public

final

void

acquireShared(

int

arg) {

if

(tryAcquireShared(arg) <

)

doAcquireShared(arg);

}

這裡就能看出第一個差别來了:獨占模式acquire的時候子類重寫的方法tryAcquire傳回的是boolean,即是否tryAcquire成功;共享模式acquire的時候,傳回的是一個int型變量,判斷是否<0。doAcquireShared方法的實作為:

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

private

void

doAcquireShared(

int

arg) {

final

Node node = addWaiter(Node.SHARED);

boolean

failed =

true

;

try

{

boolean

interrupted =

false

;

for

(;;) {

final

Node p = node.predecessor();

if

(p == head) {

int

r = tryAcquireShared(arg);

if

(r >=

) {

setHeadAndPropagate(node, r);

p.next =

null

;

// help GC

if

(interrupted)

selfInterrupt();

failed =

false

;

return

;

}

}

if

(shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted =

true

;

}

}

finally

{

if

(failed)

cancelAcquire(node);

}

}

我們來分析一下這段代碼做了什麼:

  1. addWaiter,把所有tryAcquireShared<0的線程執行個體化出一個Node,建構為一個FIFO隊列,這和獨占鎖是一樣的
  2. 拿目前節點的前驅節點,隻有前驅節點是head的節點才能tryAcquireShared,這和獨占鎖也是一樣的
  3. 前驅節點不是head的,執行”shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()”,for(;;)循環,”shouldParkAfterFailedAcquire()”方法執行2次,目前線程阻塞,這和獨占鎖也是一樣的

确實,共享模式下的acquire和獨占模式下的acquire大部分邏輯差不多,最大的差别在于tryAcquireShared成功之後,獨占模式的acquire是直接将目前節點設定為head節點即可,共享模式會執行setHeadAndPropagate方法,顧名思義,即在設定head之後多執行了一步propagate操作。setHeadAndPropagate方法源碼為:

private

void

setHeadAndPropagate(Node node,

int

propagate) {

Node h = head;

// Record old head for check below

setHead(node);

/*

* Try to signal next queued node if:

*   Propagation was indicated by caller,

*     or was recorded (as h.waitStatus) by a previous operation

*     (note: this uses sign-check of waitStatus because

*      PROPAGATE status may transition to SIGNAL.)

* and

*   The next node is waiting in shared mode,

*     or we don't know, because it appears null

*

* The conservatism in both of these checks may cause

* unnecessary wake-ups, but only when there are multiple

* racing acquires/releases, so most need signals now or soon

* anyway.

*/

if

(propagate >

|| h ==

null

|| h.waitStatus <

) {

Node s = node.next;

if

(s ==

null

|| s.isShared())

doReleaseShared();

}

}

第3行的代碼設定重設head,第2行的代碼由于第3行的代碼要重設head,是以先定義一個Node型變量h獲得原head的位址,這兩行代碼很簡單。

第19行~第23行的代碼是獨占鎖和共享鎖最不一樣的一個地方,我們再看獨占鎖acquireQueued的代碼:

final

boolean

acquireQueued(

final

Node node,

int

arg) {

boolean

failed =

true

;

try

{

boolean

interrupted =

false

;

for

(;;) {

final

Node p = node.predecessor();

if

(p == head && tryAcquire(arg)) {

setHead(node);

p.next =

null

;

// help GC

failed =

false

;

return

interrupted;

}

if

(shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted =

true

;

}

}

finally

{

if

(failed)

cancelAcquire(node);

}

}

這意味着獨占鎖某個節點被喚醒之後,它隻需要将這個節點設定成head就完事了,而共享鎖不一樣,某個節點被設定為head之後,如果它的後繼節點是SHARED狀态的,那麼将繼續通過doReleaseShared方法嘗試往後喚醒節點,實作了共享狀态的向後傳播。

共享模式release實作流程

上面講了共享模式下acquire是如何實作的,下面再看一下release的實作流程,方法為releaseShared:

public

final

boolean

releaseShared(

int

arg) {

if

(tryReleaseShared(arg)) {

doReleaseShared();

return

true

;

}

return

false

;

}

tryReleaseShared方法是子類實作的,如果tryReleaseShared成功,那麼執行doReleaseShared()方法:

28

29

private

void

doReleaseShared() {

/*

* Ensure that a release propagates, even if there are other

* in-progress acquires/releases.  This proceeds in the usual

* way of trying to unparkSuccessor of head if it needs

* signal. But if it does not, status is set to PROPAGATE to

* ensure that upon release, propagation continues.

* Additionally, we must loop in case a new node is added

* while we are doing this. Also, unlike other uses of

* unparkSuccessor, we need to know if CAS to reset status

* fails, if so rechecking.

*/

for

(;;) {

Node h = head;

if

(h !=

null

&& h != tail) {

int

ws = h.waitStatus;

if

(ws == Node.SIGNAL) {

if

(!compareAndSetWaitStatus(h, Node.SIGNAL,

))

continue

;           

// loop to recheck cases

unparkSuccessor(h);

}

else

if

(ws ==

&&

!compareAndSetWaitStatus(h,

, Node.PROPAGATE))

continue

;               

// loop on failed CAS

}

if

(h == head)                  

// loop if head changed

break

;

}

}

主要是兩層邏輯:

  1. 頭結點本身的waitStatus是SIGNAL且能通過CAS算法将頭結點的waitStatus從SIGNAL設定為0,喚醒頭結點的後繼節點
  2. 頭結點本身的waitStatus是0的話,嘗試将其設定為PROPAGATE狀态的,意味着共享狀态可以向後傳播

Condition的await()方法實作原理—-建構等待隊列

我們知道,Condition是用于實作通知/等待機制的,和Object的wait()/notify()一樣,由于本文之前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很長,加之Condition也是AbstractQueuedSynchronizer的一部分,是以将Condition也放在這裡寫了。

Condition分為await()和signal()兩部分,前者用于等待、後者用于喚醒,首先看一下await()是如何實作的。Condition本身是一個接口,其在AbstractQueuedSynchronizer中的實作為ConditionObject:

public

class

ConditionObject

implements

Condition, java.io.Serializable {

private

static

final

long

serialVersionUID = 1173984872572414699L;

/** First node of condition queue. */

private

transient

Node firstWaiter;

/** Last node of condition queue. */

private

transient

Node lastWaiter;

...

}

這裡貼了一些字段定義,後面都是方法就不貼了,會對重點方法進行分析的。從字段定義我們可以看到,ConditionObject全局性地記錄了第一個等待的節點與最後一個等待的節點。

像ReentrantLock每次要使用ConditionObject,直接new一個ConditionObject出來即可。我們關注一下await()方法的實作:

public

final

void

await()

throws

InterruptedException {

if

(Thread.interrupted())

throw

new

InterruptedException();

Node node = addConditionWaiter();

int

savedState = fullyRelease(node);

int

interruptMode =

;

while

(!isOnSyncQueue(node)) {

LockSupport.park(

this

);

if

((interruptMode = checkInterruptWhileWaiting(node)) !=

)

break

;

}

if

(acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if

(node.nextWaiter !=

null

)

// clean up if cancelled

unlinkCancelledWaiters();

if

(interruptMode !=

)

reportInterruptAfterWait(interruptMode);

}

第2行~第3行的代碼用于進行中斷,第4行代碼比較關鍵,添加Condition的等待者,看一下實作:

private

Node addConditionWaiter() {

Node t = lastWaiter;

// If lastWaiter is cancelled, clean out.

if

(t !=

null

&& t.waitStatus != Node.CONDITION) {

unlinkCancelledWaiters();

t = lastWaiter;

}

Node node =

new

Node(Thread.currentThread(), Node.CONDITION);

if

(t ==

null

)

firstWaiter = node;

else

t.nextWaiter = node;

lastWaiter = node;

return

node;

}

首先拿到隊列(注意資料結構,Condition建構出來的也是一個隊列)中最後一個等待者,緊接着第4行的的判斷,判斷最後一個等待者的waitStatus不是CONDITION的話,執行第5行的代碼,解綁取消的等待者,因為通過第8行的代碼,我們看到,new出來的Node的狀态都是CONDITION的。

那麼unlinkCancelledWaiters做了什麼?裡面的流程就不看了,就是一些指針周遊并判斷狀态的操作,總結一下就是:從頭到尾周遊每一個Node,遇到Node的waitStatus不是CONDITION的就從隊列中踢掉,該節點的前後節點相連。

接着第8行的代碼前面說過了,new出來了一個Node,存儲了目前線程,waitStatus是CONDITION,接着第9行~第13行的操作很好了解:

  1. 如果lastWaiter是null,說明FIFO隊列中沒有任何Node,firstWaiter=Node
  2. 如果lastWaiter不是null,說明FIFO隊列中有Node,原lastWaiter的next指向Node
  3. 無論如何,新加入的Node程式設計lastWaiter,即新加入的Node一定是在最後面

用一張圖表示一下建構的資料結構就是:

對比學習,我們總結一下Condition建構出來的隊列和AbstractQueuedSynchronizer建構出來的隊列的差别,主要展現在2點上:

  1. AbstractQueuedSynchronizer建構出來的隊列,頭節點是一個沒有Thread的空節點,其辨別作用,而Condition建構出來的隊列,頭節點就是真正等待的節點
  2. AbstractQueuedSynchronizer建構出來的隊列,節點之間有next與pred互相辨別該節點的前一個節點與後一個節點的位址,而Condition建構出來的隊列,隻使用了nextWaiter辨別下一個等待節點的位址

整個過程中,我們看到沒有使用任何CAS操作,firstWaiter和lastWaiter也沒有用volatile修飾,其實原因很簡單:要await()必然要先lock(),既然lock()了就表示沒有競争,沒有競争自然也沒必要使用volatile+CAS的機制去保證什麼。

Condition的await()方法實作原理—-線程等待

前面我們看了Condition建構等待隊列的過程,接下來我們看一下等待的過程,await()方法的代碼比較短,再貼一下:

public

final

void

await()

throws

InterruptedException {

if

(Thread.interrupted())

throw

new

InterruptedException();

Node node = addConditionWaiter();

int

savedState = fullyRelease(node);

int

interruptMode =

;

while

(!isOnSyncQueue(node)) {

LockSupport.park(

this

);

if

((interruptMode = checkInterruptWhileWaiting(node)) !=

)

break

;

}

if

(acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if

(node.nextWaiter !=

null

)

// clean up if cancelled

unlinkCancelledWaiters();

if

(interruptMode !=

)

reportInterruptAfterWait(interruptMode);

}

建構完畢隊列之後,執行第5行的fullyRelease方法,顧名思義:fullyRelease方法的作用是完全釋放Node的狀态。方法實作為:

final

int

fullyRelease(Node node) {

boolean

failed =

true

;

try

{

int

savedState = getState();

if

(release(savedState)) {

failed =

false

;

return

savedState;

}

else

{

throw

new

IllegalMonitorStateException();

}

}

finally

{

if

(failed)

node.waitStatus = Node.CANCELLED;

}

}

這裡第4行擷取state,第5行release的時候将整個state傳過去,理由是某線程可能多次調用了lock()方法,比如調用了10次lock,那麼此線程就将state加到了10,是以這裡要将10傳過去,将狀态全部釋放,這樣後面的線程才能重新從state=0開始競争鎖,這也是方法被命名為fullyRelease的原因,因為要完全釋放鎖,釋放鎖之後,如果有競争鎖的線程,那麼就喚醒第一個,這都是release方法的邏輯了,前面的文章詳細講解過。

接着看await()方法的第7行判斷”while(!isOnSyncQueue(node))”:

final

boolean

isOnSyncQueue(Node node) {

if

(node.waitStatus == Node.CONDITION || node.prev ==

null

)

return

false

;

if

(node.next !=

null

)

// If has successor, it must be on queue

return

true

;

/*

* node.prev can be non-null, but not yet on queue because

* the CAS to place it on queue can fail. So we have to

* traverse from tail to make sure it actually made it.  It

* will always be near the tail in calls to this method, and

* unless the CAS failed (which is unlikely), it will be

* there, so we hardly ever traverse much.

*/

return

findNodeFromTail(node);

}

注意這裡的判斷是Node是否在AbstractQueuedSynchronizer建構的隊列中而不是Node是否在Condition建構的隊列中,如果Node不在AbstractQueuedSynchronizer建構的隊列中,那麼調用LockSupport的park方法阻塞。

至此調用await()方法的線程建構Condition等待隊列–釋放鎖–等待的過程已經全部分析完畢。 

Condition的signal()實作原理

上面的代碼分析了建構Condition等待隊列–釋放鎖–等待的過程,接着看一下signal()方法通知是如何實作的:

public

final

void

signal() {

if

(!isHeldExclusively())

throw

new

IllegalMonitorStateException();

Node first = firstWaiter;

if

(first !=

null

)

doSignal(first);

}

首先從第2行的代碼我們看到,要能signal(),目前線程必須持有獨占鎖,否則抛出異常IllegalMonitorStateException。

那麼真正操作的時候,擷取第一個waiter,如果有waiter,調用doSignal方法:

private

void

doSignal(Node first) {

do

{

if

( (firstWaiter = first.nextWaiter) ==

null

)

lastWaiter =

null

;

first.nextWaiter =

null

;

}

while

(!transferForSignal(first) &&

(first = firstWaiter) !=

null

);

}

第3行~第5行的代碼很好了解:

  1. 重新設定firstWaiter,指向第一個waiter的nextWaiter
  2. 如果第一個waiter的nextWaiter為null,說明目前隊列中隻有一個waiter,lastWaiter置空
  3. 因為firstWaiter是要被signal的,是以它沒什麼用了,nextWaiter置空

接着執行第6行和第7行的代碼,這裡重點就是第6行的transferForSignal方法:

final

boolean

transferForSignal(Node node) {

/*

* If cannot change waitStatus, the node has been cancelled.

*/

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

return false;

/*

* Splice onto queue and try to set waitStatus of predecessor to

* indicate that thread is (probably) waiting. If cancelled or

* attempt to set waitStatus fails, wake up to resync (in which

* case the waitStatus can be transiently and harmlessly wrong).

*/

Node p = enq(node);

int

ws = p.waitStatus;

if

(ws >

|| !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

LockSupport.unpark(node.thread);

return

true

;

}

方法本意是将一個節點從Condition隊列轉換為AbstractQueuedSynchronizer隊列,總結一下方法的實作:

  1. 嘗試将Node的waitStatus從CONDITION置為0,這一步失敗直接傳回false
  2. 目前節點進入調用enq方法進入AbstractQueuedSynchronizer隊列
  3. 目前節點通過CAS機制将waitStatus置為SIGNAL

最後上面的步驟全部成功,傳回true,傳回true喚醒等待節點成功。從喚醒的代碼我們可以得出一個重要結論:某個await()的節點被喚醒之後并不意味着它後面的代碼會立即執行,它會被加入到AbstractQueuedSynchronizer隊列的尾部,隻有前面等待的節點擷取鎖全部完畢才能輪到它。

代碼分析到這裡,我想類似的signalAll方法也沒有必要再分析了,顯然signalAll方法的作用就是将所有Condition隊列中等待的節點逐一隊列中從移除,由CONDITION狀态變為SIGNAL狀态并加入AbstractQueuedSynchronizer隊列的尾部。

代碼示例

可能大家看了我分析半天代碼會有點迷糊,這裡最後我貼一段我用于驗證上面Condition結論的示例代碼,首先建立一個Thread,我将之命名為ConditionThread:

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

/**

* @author 五月的倉颉http://www.cnblogs.com/xrq730/p/7067904.html

*/

public

class

ConditionThread

implements

Runnable {

private

Lock lock;

private

Condition condition;

public

ConditionThread(Lock lock, Condition condition) {

this

.lock = lock;

this

.condition = condition;

}

@Override

public

void

run() {

if

(

"線程0"

.equals(JdkUtil.getThreadName())) {

thread0Process();

}

else

if

(

"線程1"

.equals(JdkUtil.getThreadName())) {

thread1Process();

}

else

if

(

"線程2"

.equals(JdkUtil.getThreadName())) {

thread2Process();

}

}

private

void

thread0Process() {

try

{

lock.lock();

System.out.println(

"線程0休息5秒"

);

JdkUtil.sleep(

5000

);

condition.signal();

System.out.println(

"線程0喚醒等待線程"

);

}

finally

{

lock.unlock();

}

}

private

void

thread1Process() {

try

{

lock.lock();

System.out.println(

"線程1阻塞"

);

condition.await();

System.out.println(

"線程1被喚醒"

);

}

catch

(InterruptedException e) {

}

finally

{

lock.unlock();

}

}

private

void

thread2Process() {

try

{

System.out.println(

"線程2想要擷取鎖"

);

lock.lock();

System.out.println(

"線程2擷取鎖成功"

);

}

finally

{

lock.unlock();

}

}

}

這個類裡面的方法就不解釋了,反正就三個方法片段,根據線程名判斷,每個線層執行的是其中的一個代碼片段。寫一段測試代碼:

/**

* @author 五月的倉颉http://www.cnblogs.com/xrq730/p/7067904.html

*/

@Test

public

void

testCondition()

throws

Exception {

Lock lock =

new

ReentrantLock();

Condition condition = lock.newCondition();

// 線程0的作用是signal

Runnable runnable0 =

new

ConditionThread(lock, condition);

Thread thread0 =

new

Thread(runnable0);

thread0.setName(

"線程0"

);

// 線程1的作用是await

Runnable runnable1 =

new

ConditionThread(lock, condition);

Thread thread1 =

new

Thread(runnable1);

thread1.setName(

"線程1"

);

// 線程2的作用是lock

Runnable runnable2 =

new

ConditionThread(lock, condition);

Thread thread2 =

new

Thread(runnable2);

thread2.setName(

"線程2"

);

thread1.start();

Thread.sleep(

1000

);

thread0.start();

Thread.sleep(

1000

);

thread2.start();

thread1.join();

}

測試代碼的意思是:

  1. 線程1先啟動,擷取鎖,調用await()方法等待
  2. 線程0後啟動,擷取鎖,休眠5秒準備signal()
  3. 線程2最後啟動,擷取鎖,由于線程0未使用完畢鎖,是以線程2排隊,可以此時由于線程0還未signal(),是以線程1線上程0執行signal()後,在AbstractQueuedSynchronizer隊列中的順序是線上程2後面的

代碼執行結果為:

1

線程

1

阻塞

2

線程

休息

5

3

線程

2

想要擷取鎖

4

線程

喚醒等待線程

5

線程

2

擷取鎖成功

6

線程

1

被喚醒

符合我們的結論:signal()并不意味着被喚醒的線程立即執行。由于線程2先于線程0排隊,是以看到第5行列印的内容,線程2先擷取鎖。

熬夜不易,點選請老王喝杯烈酒!!!!!!!