天天看點

Java 并發程式設計-再談 AbstractQueuedSynchronizer 3 :基于 AbstractQueuedSynchronizer 的并發類實作

公平模式ReentrantLock實作原理

前面的文章研究了AbstractQueuedSynchronizer的獨占鎖和共享鎖,有了前兩篇文章的基礎,就可以乘勝追擊,看一下基于AbstractQueuedSynchronizer的并發類是如何實作的。

ReentrantLock顯然是一種獨占鎖,首先是公平模式的ReentrantLock,Sync是ReentractLock中的基礎類,繼承自AbstractQueuedSynchronizer,看一下代碼實作:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

abstract

static

class

Sync

extends

AbstractQueuedSynchronizer {

private

static

final

long

serialVersionUID = -5179523762034025860L;

/**

* Performs {@link Lock#lock}. The main reason for subclassing

* is to allow fast path for nonfair version.

*/

abstract

void

lock();

/**

* Performs non-fair tryLock.  tryAcquire is

* implemented in subclasses, but both need nonfair

* try for trylock method.

*/

final

boolean

nonfairTryAcquire(

int

acquires) {

final

Thread current = Thread.currentThread();

int

c = getState();

if

(c ==

) {

if

(compareAndSetState(

, acquires)) {

setExclusiveOwnerThread(current);

return

true

;

}

}

else

if

(current == getExclusiveOwnerThread()) {

int

nextc = c + acquires;

if

(nextc <

)

// overflow

throw

new

Error(

"Maximum lock count exceeded"

);

setState(nextc);

return

true

;

}

return

false

;

}

protected

final

boolean

tryRelease(

int

releases) {

int

c = getState() - releases;

if

(Thread.currentThread() != getExclusiveOwnerThread())

throw

new

IllegalMonitorStateException();

boolean

free =

false

;

if

(c ==

) {

free =

true

;

setExclusiveOwnerThread(

null

);

}

setState(c);

return

free;

}

protected

final

boolean

isHeldExclusively() {

// While we must in general read state before owner,

// we don't need to do so to check if current thread is owner

return

getExclusiveOwnerThread() == Thread.currentThread();

}

final

ConditionObject newCondition() {

return

new

ConditionObject();

}

// Methods relayed from outer class

final

Thread getOwner() {

return

getState() ==

?

null

: getExclusiveOwnerThread();

}

final

int

getHoldCount() {

return

isHeldExclusively() ? getState() :

;

}

final

boolean

isLocked() {

return

getState() !=

;

}

/**

* Reconstitutes this lock instance from a stream.

* @param s the stream

*/

private

void

readObject(java.io.ObjectInputStream s)

throws

java.io.IOException, ClassNotFoundException {

s.defaultReadObject();

setState(

);

// reset to unlocked state

}

}

Sync屬于一個公共類,它是抽象的說明Sync會被繼承,簡單整理一下Sync主要做了哪些事(因為Sync不是ReentrantLock公平鎖的關鍵):

  1. 定義了一個lock方法讓子類去實作,我們平時之是以能調用ReentrantLock的lock()方法,就是因為Sync定義了它
  2. 實作了非公平鎖tryAcquira的方法
  3. 實作了tryRelease方法,比較簡單,狀态-1,獨占鎖的線程置空
  4. 實作了isHeldExclusively方法
  5. 定義了newCondition方法,讓開發者可以利用Condition實作通知/等待

接着,看一下公平鎖的實作,FairSync類,它繼承自Sync:

static

final

class

FairSync

extends

Sync {

private

static

final

long

serialVersionUID = -3000897897090466540L;

final

void

lock() {

acquire(

1

);

}

/**

* Fair version of tryAcquire.  Don't grant access unless

* recursive call or no waiters or is first.

*/

protected

final

boolean

tryAcquire(

int

acquires) {

final

Thread current = Thread.currentThread();

int

c = getState();

if

(c ==

) {

if

(!hasQueuedPredecessors() &&

compareAndSetState(

, acquires)) {

setExclusiveOwnerThread(current);

return

true

;

}

}

else

if

(current == getExclusiveOwnerThread()) {

int

nextc = c + acquires;

if

(nextc <

)

throw

new

Error(

"Maximum lock count exceeded"

);

setState(nextc);

return

true

;

}

return

false

;

}

}

整理一下要點:

1. 每次acquire的時候,state+1,如果目前線程lock()之後又lock()了,state不斷+1,相應的unlock()的時候state-1,直到将state減到0為之,說明目前線程釋放完所有的狀态,其它線程可以競争

2. state=0的時候,通過hasQueuedPredecessors方法做一次判斷,hasQueuedPredecessors的實作為”h != t && ((s = h.next) == null || s.thread != Thread.currentThread());”,其中h是head、t是tail,由于代碼中對結果取反,是以取反之後的判斷為”h == t || ((s = h.next) != null && s.thread == Thread.currentThread());”,總結起來有兩種情況可以通過!hasQueuedPredecessors()這個判斷:

  • h==t,h==t的情況為要麼目前FIFO隊列中沒有任何資料,要麼隻建構出了一個head還沒往後面連過任何一個Node,是以head就是tail
  • (s = h.next) != null && s.thread == Thread.currentThread(),目前線程為正在等待的第一個Node中的線程

3. 如果沒有線程比目前線程等待更久去執行acquire操作,那麼通過CAS操作将state從0變為1的線程tryAcquire成功

4. 沒有tryAcquire成功的線程,按照tryAcquire的先後順序,建構為一個FIFO隊列,即第一個tryAcquire失敗的排在head的後一位,第二個tryAcquire失敗的排在head的後二位

5. 當tryAcquire成功的線程release完畢,第一個tryAcquire失敗的線程第一個嘗試tryAcquire,這就是先到先得,典型的公平鎖

非公平模式ReentrantLock實作原理

看完了公平模式ReentrantLock,接着我們看一下非公平模式ReentrantLock是如何實作的。NonfairSync類,同樣是繼承自Sync類,實作為:

static

final

class

NonfairSync

extends

Sync {

private

static

final

long

serialVersionUID = 7316153563782823691L;

/**

* Performs lock.  Try immediate barge, backing up to normal

* acquire on failure.

*/

final

void

lock() {

if

(compareAndSetState(

,

1

))

setExclusiveOwnerThread(Thread.currentThread());

else

acquire(

1

);

}

protected

final

boolean

tryAcquire(

int

acquires) {

return

nonfairTryAcquire(acquires);

}

}

結合nonfairTryAcquire方法一起講解,nonfairTryAcquire方法的實作為:

final

boolean

nonfairTryAcquire(

int

acquires) {

final

Thread current = Thread.currentThread();

int

c = getState();

if

(c ==

) {

if

(compareAndSetState(

, acquires)) {

setExclusiveOwnerThread(current);

return

true

;

}

}

else

if

(current == getExclusiveOwnerThread()) {

int

nextc = c + acquires;

if

(nextc <

)

// overflow

throw

new

Error(

"Maximum lock count exceeded"

);

setState(nextc);

return

true

;

}

return

false

;

}

看到差别就在于非公平鎖lock()的時候會先嘗試通過CAS看看能不能把state從0變為1(即擷取鎖),如果可以的話,直接擷取鎖而不需要排隊。舉個實際例子就很好了解了:

  1. 線程1、線程2、線程3競争鎖,線程1競争成功擷取鎖,線程2、線程3依次排隊
  2. 線程1執行完畢,釋放鎖,state變為0,喚醒了第一個排隊的線程2
  3. 此時線程4來嘗試擷取鎖了,由于線程2被喚醒了,是以線程2與線程4競争鎖
  4. 線程4成功将state從0變為1,線程2競争鎖失敗,繼續park

看到整個過程中,後來的線程4反而比先來的線程2先擷取鎖,相當于是一種非公平的模式,

那為什麼非公平鎖效率會比公平鎖效率高?上面第(3)步如果線程2和線程4不競争鎖就是答案。為什麼這麼說,後面的解釋很重要,希望大家可以了解:

線程1是先将state設為0,再去喚醒線程2,這兩個過程之間是有時間差的。

那麼如果線程1将state設定為0的時候,線程4就通過CAS算法擷取到了鎖,且線上程1喚醒線程2之前就已經使用完畢鎖,那麼相當于線程2擷取鎖的時間并沒有推遲,線上程1将state設定為0到線程1喚醒線程2的這段時間裡,反而有線程4擷取了鎖執行了任務,這就增加了系統的吞吐量,相當于機關時間處理了更多的任務。

從這段解釋我們也應該能看出來了,非公平鎖比較适合加鎖時間比較短的任務。這是因為加鎖時間長,相當于線程2将state設為0并去喚醒線程2的這段時間,線程4無法完成釋放鎖,那麼線程2被喚醒由于沒法擷取到鎖,又被阻塞了,這種喚醒-阻塞的操作會引起線程的上下文切換,繼而影響系統的性能。

Semaphore實作原理

Semaphore即信号量,用于控制代碼塊的并發數,将Semaphore的permits設定為1相當于就是synchronized或者ReentrantLock,Semaphore具體用法可見Java多線程19:多線程下的其他元件之CountDownLatch、Semaphore、Exchanger。信号量允許多條線程擷取鎖,顯然它的鎖是一種共享鎖,信号量也有公平模式與非公平模式,相信看懂了上面ReentrantLock的公平模式與非公平模式的朋友應該對Semaphore的公平模式與非公平模式了解起來會更快,這裡就放在一起寫了。

首先還是看一下Semaphore的基礎設施,它和ReentrantLock一樣,也有一個Sync:

abstract

static

class

Sync

extends

AbstractQueuedSynchronizer {

private

static

final

long

serialVersionUID = 1192457210091910933L;

Sync(

int

permits) {

setState(permits);

}

final

int

getPermits() {

return

getState();

}

final

int

nonfairTryAcquireShared(

int

acquires) {

for

(;;) {

int

available = getState();

int

remaining = available - acquires;

if

(remaining <

||

compareAndSetState(available, remaining))

return

remaining;

}

}

protected

final

boolean

tryReleaseShared(

int

releases) {

for

(;;) {

int

current = getState();

int

next = current + releases;

if

(next < current)

// overflow

throw

new

Error(

"Maximum permit count exceeded"

);

if

(compareAndSetState(current, next))

return

true

;

}

}

final

void

reducePermits(

int

reductions) {

for

(;;) {

int

current = getState();

int

next = current - reductions;

if

(next > current)

// underflow

throw

new

Error(

"Permit count underflow"

);

if

(compareAndSetState(current, next))

return

;

}

}

final

int

drainPermits() {

for

(;;) {

int

current = getState();

if

(current ==

|| compareAndSetState(current,

))

return

current;

}

}

}

和ReentrantLock的Sync差不多,Semaphore的Sync定義了以下的一些主要内容:

  1. getPermits方法擷取目前的許可剩餘量還剩多少,即還有多少線程可以同時獲得信号量
  2. 定義了非公平信号量擷取共享鎖的邏輯nonfairTryAcquireShared
  3. 定義了公平模式釋放信号量的邏輯tryReleaseShared,相當于釋放一次信号量,state就向上+1(信号量每次的擷取與釋放都是以1為機關的)

再看下公平信号量的實作,同樣的FairSync,繼承自Sync,代碼為:

static

final

class

FairSync

extends

Sync {

private

static

final

long

serialVersionUID = 2014338818796000944L;

FairSync(

int

permits) {

super

(permits);

}

protected

int

tryAcquireShared(

int

acquires) {

for

(;;) {

if

(hasQueuedPredecessors())

return

-

1

;

int

available = getState();

int

remaining = available - acquires;

if

(remaining <

||

compareAndSetState(available, remaining))

return

remaining;

}

}

}

首先第10行的hasQueuedPredecessors方法,前面已經說過了,如果已經有了FIFO隊列或者目前線程不是FIFO隊列中在等待的第一條線程,傳回-1,表示無法擷取共享鎖成功。

接着擷取available,available就是state,用volatile修飾,是以線程中可以看到最新的state,信号量的acquires是1,每次擷取信号量都對state-1,兩種情況直接傳回:

  1. remaining減完<0
  2. 通過cas設定成功

之後就是和之前說過的共享鎖的邏輯了,如果傳回的是一個<0的數字,那麼建構FIFO隊列,線程阻塞,直到前面的執行完才能喚醒後面的。

接着看一下非公平信号量的實作,NonfairSync繼承Sync:

static

final

class

NonfairSync

extends

Sync {

private

static

final

long

serialVersionUID = -2694183684443567898L;

NonfairSync(

int

permits) {

super

(permits);

}

protected

int

tryAcquireShared(

int

acquires) {

return

nonfairTryAcquireShared(acquires);

}

}

nonfairTryAcquireShared在父類已經實作了,再貼一下代碼:

final

int

nonfairTryAcquireShared(

int

acquires) {

for

(;;) {

int

available = getState();

int

remaining = available - acquires;

if

(remaining <

||

compareAndSetState(available, remaining))

return

remaining;

}

}

看到這裡和公平Semaphore隻有一點差别:不會前置進行一次hasQueuedPredecessors()判斷。即目前有沒有建構為一個FIFO隊列,隊列裡面第一個等待的線程是不是自身都無所謂,對于非公平Semaphore都一樣,反正線程調用Semaphore的acquire方法就将目前state-1,如果得到的remaining設定成功或者CAS操作成功就傳回,這種操作沒有遵循先到先得的原則,即非公平信号量。

至于非公平信号量對比公平信号量的優點,和ReentrantLock的非公平鎖對比ReentrantLock的公平鎖一樣,就不說了。

CountDownLatch實作原理

CountDownLatch即計數器自減的一種閉鎖,某線程阻塞,對一個計數器自減到0,此線程被喚醒,CountDownLatch具體用法可見Java多線程19:多線程下的其他元件之CountDownLatch、Semaphore、Exchanger。

CountDownLatch是一種共享鎖,通過await()方法與countDown()兩個方法實作自身的功能,首先看一下await()方法的實作:

public

void

await()

throws

InterruptedException {

sync.acquireSharedInterruptibly(

1

);

}

acquireSharedInterruptibly最終又回到tryAcquireShared方法上,直接貼整個Sync的代碼實作:

private

static

final

class

Sync

extends

AbstractQueuedSynchronizer {

private

static

final

long

serialVersionUID = 4982264981922014374L;

Sync(

int

count) {

setState(count);

}

int

getCount() {

return

getState();

}

protected

int

tryAcquireShared(

int

acquires) {

return

(getState() ==

) ?

1

: -

1

;

}

protected

boolean

tryReleaseShared(

int

releases) {

// Decrement count; signal when transition to zero

for

(;;) {

int

c = getState();

if

(c ==

)

return

false

;

int

nextc = c-

1

;

if

(compareAndSetState(c, nextc))

return

nextc ==

;

}

}

}

其實看到tryAcquireShared方法,了解AbstractQueuedSynchronizer共享鎖原理的,不用看countDown方法應該都能猜countDown方法是如何實作的。我這裡總結一下:

  1. 傳入一個count,state就等于count,await的時候判斷是不是0,是0傳回1表示成功,不是0傳回-1表示失敗,建構FIFO隊列,head頭隻連接配接一個Node,Node中的線程就是調用CountDownLatch的await()方法的線程
  2. 每次countDown的時候對state-1,直到state減到0的時候才算tryReleaseShared成功,tryReleaseShared成功,喚醒被挂起的線程

為了驗證(2),看一下上面Sync的tryReleaseShared方法就可以了,确實是這麼實作的。

再了解獨占鎖與共享鎖的差別

本文詳細分析了ReentrantLock、Semaphore、CountDownLatch的實作原理,第一個是基于獨占鎖的實作,後兩個是基于共享鎖的實作,從這三個類我們可以再總結一下獨占鎖與共享鎖的差別,主要在兩點上:

  1. 獨占鎖同時隻有一條線程可以acquire成功,獨占鎖同時可能有多條線程可以acquire成功,Semaphore是典型例子;
  2. 獨占鎖每次隻能喚醒一個Node,共享鎖每次喚醒的時候可以将狀态向後傳播,即可能喚醒多個Node,CountDownLatch是典型例子。

帶着這兩個結論再看ReentrantLock、Semaphore、CountDownLatch,你一定會對獨占鎖與共享鎖了解更深。 

熬夜不易,點選請老王喝杯烈酒!!!!!!!