天天看點

Java并發程式設計-再談 AbstractQueuedSynchronizer 1 :獨占模式

關于AbstractQueuedSynchronizer

JDK1.5之後引入了并發包java.util.concurrent,大大提高了Java程式的并發性能。關于java.util.concurrent包我總結如下:

  • AbstractQueuedSynchronizer是并發類諸如ReentrantLock、CountDownLatch、Semphore的核心
  • CAS算法是AbstractQueuedSynchronizer的核心

可以說AbstractQueuedSynchronizer是并發類的重中之重。其實之前在ReentrantLock實作原理深入探究一文中已經有結合ReentrantLock詳細解讀過AbstractQueuedSynchronizer,但限于當時水準原因,回看一年半前的此文,感覺對于AbstractQueuedSynchronizer的解讀了解還不夠深,是以這裡更新一篇文章,再次解讀AbstractQueuedSynchronizer的資料結構即相關源碼實作,本文基于JDK1.7版本。 

AbstactQueuedSynchronizer的基本資料結構

AbstractQueuedSynchronizer的基本資料結構為Node,關于Node,JDK作者寫了詳細的注釋,這裡我大緻總結幾點:

  1. AbstractQueuedSynchronizer的等待隊列是CLH隊列的變種,CLH隊列通常用于自旋鎖,AbstractQueuedSynchronizer的等待隊列用于阻塞同步器
  2. 每個節點中持有一個名為”status”的字段用于是否一條線程應當阻塞的追蹤,但是status字段并不保證加鎖
  3. 一條線程如果它處于隊列的頭,那麼他會嘗試去acquire,但是成為頭并不保證成功,它隻是有權利去競争
  4. 要進入隊列,你隻需要自動将它拼接在隊列尾部即可;要從隊列中移除,你隻需要設定header字段

下面我用一張表格總結一下Node中持有哪些變量且每個變量的含義:

關于SIGNAL、CANCELLED、CONDITION、PROPAGATE四個狀态,JDK源碼的注釋中同樣有了詳細的解讀,再用一張表格總結一下:

AbstractQueuedSynchronizer供子類實作的方法

AbstractQueuedSynchzonizer是基于模闆模式的實作,不過它的模闆模式寫法有點特别,整個類中沒有任何一個abstract的抽象方法,取而代之的是,需要子類去實作的那些方法通過一個方法體抛出UnsupportedOperationException異常來讓子類知道。

AbstractQueuedSynchronizer類中一共有五處方法供子類實作,用表格總結一下:

這裡的acquire不好翻譯,是以就直接原詞放上來了,因為acquire是一個動詞,後面并沒有帶賓語,是以不知道具體acquire的是什麼。按照我個人了解,acquire的意思應當是根據狀态字段state去擷取一個執行目前動作的資格。

比如ReentrantLock的lock()方法最終會調用acquire方法,那麼:

  1. 線程1去lock(),執行acquire,發現state=0,是以有資格執行lock()的動作,将state設定為1,傳回true
  2. 線程2去lock(),執行acquire,發現state=1,是以沒有資格執行lock()的動作,傳回false

這種了解我認為應當是比較準确的。

獨占模式acquire實作流程

有了上面的這些基礎,我們看一下獨占式acquire的實作流程,主要是線上程acquire失敗後,是如何建構資料結構的,先看理論,之後再用一個例子畫圖說明。

看一下AbstractQuueuedSynchronizer的acquire方法實作流程,acquire方法是用于獨占模式下進行操作的:

1

2

3

4

5

public

final

void

acquire(

int

arg) {

if

(!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

tryAcquire方法前面說過了,是子類實作的一個方法,如果tryAcquire傳回的是true(成功),即表明目前線程獲得了一個執行目前動作的資格,自然也就不需要建構資料結構進行阻塞等待。

如果tryAcquire方法傳回的是false,那麼目前線程沒有獲得執行目前動作的資格,接着執行”acquireQueued(addWaiter(Node.EXCLUSIVE), arg))”這句代碼,這句話很明顯,它是由兩步構成的:

  1. addWaiter,添加一個等待者
  2. acquireQueued,嘗試從等待隊列中去擷取執行一次acquire動作

分别看一下每一步做了什麼。

addWaiter

先看第一步,addWaiter做了什麼,從傳入的參數Node.EXCLUSIVE我們知道這是獨占模式的:

6

7

8

9

10

11

12

13

14

private

Node addWaiter(Node mode) {

Node node =

new

Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node prev = tail;

if

(prev !=

null

) {

node.prev = prev;

if

(compareAndSetTail(prev, node)) {

prev.next = node;

return

node;

}

}

enq(node);

return

node;

}

首先看第4行~第11行的代碼,獲得目前資料結構中的尾節點,如果有尾節點,那麼先擷取這個節點認為它是前驅節點prev,然後:

  • 新生成的Node的前驅節點指向prev
  • 并發下隻有一條線程可以通過CAS算法讓自己的Node成為尾節點,此時将此prev的next指向該線程對應的Node

是以在資料結構中有節點的情況下,所有新增節點都是作為尾節點插入資料結構。從注釋上來看,這段邏輯的存在的意義是以最短路徑O(1)的效果完成快速入隊,以最大化減小開銷。

假如目前節點沒有被設定為尾節點,那麼執行enq方法:

15

private

Node enq(

final

Node node) {

for

(;;) {

Node t = tail;

if

(t ==

null

) {

// Must initialize

if

(compareAndSetHead(

new

Node()))

tail = head;

}

else

{

node.prev = t;

if

(compareAndSetTail(t, node)) {

t.next = node;

return

t;

}

}

}

}

這段代碼的邏輯為:

  1. 如果尾節點為空,即目前資料結構中沒有節點,那麼new一個不帶任何狀态的Node作為頭節點
  2. 如果尾節點不為空,那麼并發下使用CAS算法将目前Node追加成為尾節點,由于是一個for(;;)循環,是以所有沒有成功acquire的Node最終都會被追加到資料結構中

看完了代碼,用一張圖表示一下AbstractQueuedSynchronizer的整體資料結構(比較簡單,就不自己畫了,網上随便找了一張圖):

acquireQueued

隊列建構好了,下一步就是在必要的時候從隊列裡面拿出一個Node了,這就是acquireQueued方法,顧名思義,從隊列裡面acquire。看下acquireQueued方法的實作:

16

17

18

19

20

21

final

boolean

acquireQueued(

final

Node node,

int

arg) {

boolean

failed =

true

;

try

{

boolean

interrupted =

false

;

for

(;;) {

final

Node p = node.prevecessor();

if

(p == head && tryAcquire(arg)) {

setHead(node);

p.next =

null

;

// help GC

failed =

false

;

return

interrupted;

}

if

(shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted =

true

;

}

}

finally

{

if

(failed)

cancelAcquire(node);

}

}

這段代碼描述了幾件事:

  1. 從第6行的代碼擷取節點的前驅節點p,第7行的代碼判斷p是前驅節點并tryAcquire我們知道,隻有目前第一個持有Thread的節點才會嘗試acquire,如果節點acquire成功,那麼setHead方法,将目前節點作為head、将目前節點中的thread設定為null、将目前節點的prev設定為null,這保證了資料結構中頭結點永遠是一個不帶Thread的空節點
  2. 如果目前節點不是前驅節點或者tryAcquire失敗,那麼執行第13行~第15行的代碼,做了兩步操作,首先判斷在acquie失敗後是否應該park,其次park并檢查中斷狀态

看一下第一步shouldParkAfterFailedAcquire代碼做了什麼:

22

23

24

25

26

27

private

static

boolean

shouldParkAfterFailedAcquire(Node prev, Node node) {

int

ws = prev.waitStatus;

if

(ws == Node.SIGNAL)

/*

* This node has already set status asking a release

* to signal it, so it can safely park.

*/

return true;

if (ws > 0) {

/*

* prevecessor was cancelled. Skip over prevecessors and

* indicate retry.

*/

do {

node.prev = prev = prev.prev;

} while (prev.waitStatus > 0);

prev.next = node;

} else {

/*

* waitStatus must be 0 or PROPAGATE.  Indicate that we

* need a signal, but don't park yet.  Caller will need to

* retry to make sure it cannot acquire before parking.

*/

compareAndSetWaitStatus(prev, ws, Node.SIGNAL);

}

return

false

;

}

這裡每個節點判斷它前驅節點的狀态,如果:

  1. 它的前驅節點是SIGNAL狀态的,傳回true,表示目前節點應當park
  2. 它的前驅節點的waitStatus>0,相當于CANCELLED(因為狀态值裡面隻有CANCELLED是大于0的),那麼CANCELLED的節點廢棄,目前節點不斷向前找并重新連接配接為雙向隊列,直到找到一個前驅節點waitStats不是CANCELLED的為止
  3. 它的前驅節點不是SIGNAL狀态且waitStatus<=0,此時執行第24行代碼,利用CAS機制,如果waitStatus的前驅節點是0那麼更新為SIGNAL狀态

如果判斷判斷應當park,那麼parkAndCheckInterrupt方法:

private

final

boolean

parkAndCheckInterrupt() {

LockSupport.park(

this

);

return

Thread.interrupted();

}

利用LockSupport的park方法讓目前線程阻塞。 

獨占模式release流程

上面整理了獨占模式的acquire流程,看到了等待的Node是如何建構成一個資料結構的,下面看一下釋放的時候做了什麼,release方法的實作為:

public

final

boolean

release(

int

arg) {

if

(tryRelease(arg)) {

Node h = head;

if

(h !=

null

&& h.waitStatus !=

)

unparkSuccessor(h);

return

true

;

}

return

false

;

}

tryRelease同樣是子類去實作的,表示目前動作我執行完了,要釋放我執行目前動作的資格,講這個資格讓給其它線程,然後tryRelease釋放成功,擷取到head節點,如果head節點的waitStatus不為0的話,執行unparkSuccessor方法,顧名思義unparkSuccessor意為unpark頭結點的繼承者,方法實作為:

private

void

unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling.  It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* Thread to unpark is held in successor, which is normally

* just the next node.  But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

Node s = node.next;

if

(s ==

null

|| s.waitStatus >

) {

s =

null

;

for

(Node t = tail; t !=

null

&& t != node; t = t.prev)

if

(t.waitStatus <=

)

s = t;

}

if

(s !=

null

)

LockSupport.unpark(s.thread);

}

這段代碼比較好了解,整理一下流程:

  1. 頭節點的waitStatus<0,将頭節點的waitStatus設定為0
  2. 拿到頭節點的下一個節點s,如果s==null或者s的waitStatus>0(被取消了),那麼從隊列尾巴開始向前尋找一個waitStatus<=0的節點作為後繼要喚醒的節點

最後,如果拿到了一個不等于null的節點s,就利用LockSupport的unpark方法讓它取消阻塞。

實戰舉例:資料結構建構

上面的例子講解地過于理論,下面利用ReentrantLock舉個例子,但是這裡不講ReentrantLock實作原理,隻是利用ReentrantLock研究AbstractQueuedSynchronizer的acquire和release。示例代碼為:

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

/**

* @author 五月的倉颉http://www.cnblogs.com/xrq730/p/7056614.html

*/

public

class

AbstractQueuedSynchronizerTest {

@Test

public

void

testAbstractQueuedSynchronizer() {

Lock lock =

new

ReentrantLock();

Runnable runnable0 =

new

ReentrantLockThread(lock);

Thread thread0 =

new

Thread(runnable0);

thread0.setName(

"線程0"

);

Runnable runnable1 =

new

ReentrantLockThread(lock);

Thread thread1 =

new

Thread(runnable1);

thread1.setName(

"線程1"

);

Runnable runnable2 =

new

ReentrantLockThread(lock);

Thread thread2 =

new

Thread(runnable2);

thread2.setName(

"線程2"

);

thread0.start();

thread1.start();

thread2.start();

for

(;;);

}

private

class

ReentrantLockThread

implements

Runnable {

private

Lock lock;

public

ReentrantLockThread(Lock lock) {

this

.lock = lock;

}

@Override

public

void

run() {

try

{

lock.lock();

for

(;;);

}

finally

{

lock.unlock();

}

}

}

}

全部是死循環,相當于第一條線程(線程0)acquire成功之後,後兩條線程(線程1、線程2)阻塞,下面的代碼就不考慮後兩條線程誰先誰後的問題,就一條線程(線程1)流程執行到底、另一條線程(線程2)流程執行到底這麼分析了。

這裡再把addWaiter和enq兩個方法源碼貼一下:

private

Node addWaiter(Node mode) {

Node node =

new

Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node prev = tail;

if

(prev !=

null

) {

node.prev = prev;

if

(compareAndSetTail(prev, node)) {

prev.next = node;

return

node;

}

}

enq(node);

return

node;

}

private

Node enq(

final

Node node) {

for

(;;) {

Node t = tail;

if

(t ==

null

) {

// Must initialize

if

(compareAndSetHead(

new

Node()))

tail = head;

}

else

{

node.prev = t;

if

(compareAndSetTail(t, node)) {

t.next = node;

return

t;

}

}

}

}

首先第一個acquire失敗的線程1,由于此時整個資料結構中麼沒有任何資料,是以addWaiter方法第4行中拿到的prev=tail為空,執行enq方法,首先第3行擷取tail,第4行判斷到tail是null,是以頭結點new一個Node出來通過CAS算法設定為資料結構的head,tail同樣也是這個Node,此時資料結構為:

為了友善描述,prev和next,我給每個Node随便加了一個位址。接着繼續enq,因為enq内是一個死循環,是以繼續第3行擷取tail,new了一個空的Node之後tail就有了,執行else判斷,通過第8行~第10行代碼将目前線程對應的Node追加到資料結構尾部,那麼目前建構的資料結構為:

這樣,線程1對應的Node被加入資料結構,成為資料結構的tail,而資料結構的head是一個什麼都沒有的空Node。

接着線程2也acquire失敗了,線程2既然acquire失敗,那也要準備被加入資料結構中,繼續先執行addWaiter方法,由于此時已經有了tail,是以不需要執行enq方法,可以直接将目前Node添加到資料結構尾部,那麼目前建構的資料結構為:

至此,兩個阻塞的線程建構的三個Node已經全部歸位。

實戰舉例:線程阻塞

上述流程隻是描述了建構資料結構的過程,并沒有描述線程1、線程2阻塞的流程,是以接着繼續用實際例子看一下線程1、線程2如何阻塞。貼一下acquireQueued、shouldParkAfterFailedAcquire兩個方法源碼:

final

boolean

acquireQueued(

final

Node node,

int

arg) {

boolean

failed =

true

;

try

{

boolean

interrupted =

false

;

for

(;;) {

final

Node p = node.prevecessor();

if

(p == head && tryAcquire(arg)) {

setHead(node);

p.next =

null

;

// help GC

failed =

false

;

return

interrupted;

}

if

(shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted =

true

;

}

}

finally

{

if

(failed)

cancelAcquire(node);

}

}

private

static

boolean

shouldParkAfterFailedAcquire(Node prev, Node node) {

int

ws = prev.waitStatus;

if

(ws == Node.SIGNAL)

/*

* This node has already set status asking a release

* to signal it, so it can safely park.

*/

return true;

if (ws > 0) {

/*

* prevecessor was cancelled. Skip over prevecessors and

* indicate retry.

*/

do {

node.prev = prev = prev.prev;

} while (prev.waitStatus > 0);

prev.next = node;

} else {

/*

* waitStatus must be 0 or PROPAGATE.  Indicate that we

* need a signal, but don't park yet.  Caller will need to

* retry to make sure it cannot acquire before parking.

*/

compareAndSetWaitStatus(prev, ws, Node.SIGNAL);

}

return

false

;

}

首先是線程1,它的前驅節點是head節點,在它tryAcquire成功的情況下,執行第8行~第11行的代碼。做幾件事情:

  1. head為線程1對應的Node
  2. 線程1對應的Node的thread置空
  3. 線程1對應的Node的prev置空
  4. 原head的next置空,這樣原head中的prev、next、thread都為空,對象内沒有引用指向其他地方,GC可以認為這個Node是垃圾,對這個Node進行回收,注釋”Help GC”就是這個意思
  5. failed=false表示沒有失敗

是以,如果線程1執行tryAcquire成功,那麼資料結構将變為:

從上述流程可以總結到:隻有前驅節點為head的節點會嘗試tryAcquire,其餘都不會,結合後面的release選繼承者的方式,保證了先acquire失敗的線程會優先從阻塞狀态中解除去重新acquire。這是一種公平的acquire方式,因為它遵循”先到先得”原則,但是我們可以動動手腳讓這種公平變為非公平,比如ReentrantLock預設的非公平模式,這個留在後面說。

那如果線程1執行tryAcquire失敗,那麼要執行shouldParkAfterFailedAcquire方法了,shouldParkAfterFailedAcquire拿線程1的前驅節點也就是head節點的waitStatus做了一個判斷,因為waitStatus=0,是以執行第18行~第20行的邏輯,将head的waitStatus設定為SIGNAL即-1,然後方法傳回false,資料結構變為:

看到這裡就一個變化:head的waitStatus從0變成了-1。既然shouldParkAfterFailedAcquire傳回false,acquireQueued的第13行~第14行的判斷自然不通過,繼續走for(;;)循環,如果tryAcquire失敗顯然又來到了shouldParkAfterFailedAcquire方法,此時線程1對應的Node的前驅節點head節點的waitStatus已經變為了SIGNAL即-1,是以執行第4行~第8行的代碼,直接傳回true出去。

shouldParkAfterFailedAcquire傳回true,parkAndCheckInterrupt直接調用LockSupport的park方法:

private

final

boolean

parkAndCheckInterrupt() {

LockSupport.park(

this

);

return

Thread.interrupted();

}

至此線程1阻塞,線程2阻塞的流程與線程1阻塞的流程相同,可以自己分析一下。

另外再提一個問題,不知道大家會不會想:

  1. 為什麼線程1對應的Node建構完畢不直接調用LockSupport的park方法進行阻塞?
  2. 為什麼不直接把head的waitStatus直接設定為Signal而要從0設定為Signal?

我認為這是AbstractQueuedSynchronizer開發人員做了類似自旋的操作。因為很多時候擷取acquire進行操作的時間很短,阻塞會引起上下文的切換,而很短時間就從阻塞狀态解除,這樣相對會比較耗費性能。

是以我們看到線程1自建構完畢Node加入資料結構到阻塞,一共嘗試了兩次tryAcquire,如果其中有一次成功,那麼線程1就沒有必要被阻塞,提升了性能。

熬夜不易,點選請老王喝杯烈酒!!!!!!!