天天看點

java多線程--AtomicLong和LongAdder

AtomicLong簡要介紹

AtomicLong是作用是對長整形進行原子操作,顯而易見,在java1.8中新加入了一個新的原子類LongAdder,該類也可以保證Long類型操作的原子性,相對于AtomicLong,LongAdder有着更高的性能和更好的表現,可以完全替代AtomicLong的來進行原子操作。

在32位作業系統中,64位的long 和 double 變量由于會被JVM當作兩個分離的32位來進行操作,是以不具有原子性。而使用AtomicLong能讓long的操作保持原子型。

AtomicLong的代碼很簡單,下面僅以incrementAndGet()為例,對AtomicLong的原理進行說明。

incrementAndGet()源碼如下:

public final long incrementAndGet() {
    for (;;) {
        // 擷取AtomicLong目前對應的long值
        long current = get();
        // 将current加1
        long next = current + 1;
        // 通過CAS函數,更新current的值
        if (compareAndSet(current, next))
            return next;
    }
}<span style="font-family: 'Courier New' !important; font-size: 12px !important; line-height: 1.5 !important; color: rgb(0, 0, 0);"></span>
           

說明:

(01) incrementAndGet()首先會根據get()擷取AtomicLong對應的long值。該值是volatile類型的變量,get()的源碼如下:

// value是AtomicLong對應的long值
private volatile long value;
// 傳回AtomicLong對應的long值
public final long get() {
    return value;
}<span style="font-family: 'Courier New' !important; font-size: 12px !important; line-height: 1.5 !important; color: rgb(0, 0, 0);"></span>
           

(02) incrementAndGet()接着将current加1,然後通過CAS函數,将新的值指派給value。

compareAndSet()的源碼如下:

public final boolean compareAndSet(long expect, long update) {
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}<span style="font-family: 'Courier New' !important; font-size: 12px !important; line-height: 1.5 !important; color: rgb(0, 0, 0);"></span>
           

compareAndSet()的作用是更新AtomicLong對應的long值。它會比較AtomicLong的原始值是否與expect相等,若相等的話,則設定AtomicLong的值為update。

比AtomicLong更高效的LongAdder 源碼解析

接觸到AtomicLong的原因是在看guava的LoadingCache相關代碼時,關于LoadingCache,其實思路也非常簡單清晰:用模闆模式解決了緩存不命中時擷取資料的邏輯,這個思路我早前也正好在項目中使用到。

言歸正傳,為什麼說LongAdder引起了我的注意,原因有二:

1. 作者是Doug lea ,地位實在舉足輕重。

2. 他說這個比AtomicLong高效。

我們知道,AtomicLong已經是非常好的解決方案了,涉及并發的地方都是使用CAS操作,在硬體層次上去做 compare and set操作。效率非常高。

是以,我決定研究下,為什麼LongAdder比AtomicLong高效。

首先,看LongAdder的繼承樹:

java多線程--AtomicLong和LongAdder

繼承自Striped64,這個類包裝了一些很重要的内部類和操作。稍候會看到。

正式開始前,強調下,我們知道,AtomicLong的實作方式是内部有個value 變量,當多線程并發自增,自減時,均通過cas 指令從機器指令級别操作保證并發的原子性。

再看看LongAdder的方法:

java多線程--AtomicLong和LongAdder

怪不得可以和AtomicLong作比較,連API都這麼像。我們随便挑一個API入手分析,這個API通了,其他API都大同小異,是以,我選擇了add這個方法。事實上,其他API也都依賴這個方法。

java多線程--AtomicLong和LongAdder

LongAdder中包含了一個Cell 數組,Cell是Striped64的一個内部類,顧名思義,Cell 代表了一個最小單元,這個單元有什麼用,稍候會說道。先看定義:

java多線程--AtomicLong和LongAdder

Cell内部有一個非常重要的value變量,并且提供了一個cas更新其值的方法。

回到add方法:

java多線程--AtomicLong和LongAdder

這裡,我有個疑問,AtomicLong已經使用CAS指令,非常高效了(比起各種鎖),LongAdder如果還是用CAS指令更新值,怎麼可能比AtomicLong高效了? 何況内部還這麼多判斷!!!

這是我開始時最大的疑問,是以,我猜想,難道有比CAS指令更高效的方式出現了? 帶着這個疑問,繼續。

第一if 判斷,第一次調用的時候cells數組肯定為null,是以,進入casBase方法:

java多線程--AtomicLong和LongAdder

原子更新base沒啥好說的,如果更新成功,本地調用開始傳回,否則進入分支内部。

什麼時候會更新失敗? 沒錯,并發的時候,好戲開始了,AtomicLong的處理方式是死循環嘗試更新,直到成功才傳回,而LongAdder則是進入這個分支。

分支内部,通過一個Threadlocal變量threadHashCode 擷取一個HashCode對象,該HashCode對象依然是Striped64類的内部類,看定義:

java多線程--AtomicLong和LongAdder

有個code變量,儲存了一個非0的随機數随機值。

回到add方法:

java多線程--AtomicLong和LongAdder

拿到該線程相關的HashCode對象後,擷取它的code變量,as[(n-1)h] 這句話相當于對h取模,隻不過比起取摸,因為是 與 的運算是以效率更高。

計算出一個在Cells 數組中當先線程的HashCode對應的 索引位置,并将該位置的Cell 對象拿出來更新cas 更新它的value值。

當然,如果as 為null 并且更新失敗,才會進入retryUpdate方法。

看到這裡我想應該有很多人明白為什麼LongAdder會比AtomicLong更高效了,沒錯,唯一會制約AtomicLong高效的原因是高并發,高并發意味着CAS的失敗幾率更高, 重試次數更多,越多線程重試,CAS失敗幾率又越高,變成惡性循環,AtomicLong效率降低。 那怎麼解決? LongAdder給了我們一個非常容易想到的解決方案: 減少并發,将單一value的更新壓力分擔到多個value中去,降低單個value的 “熱度”,分段更新!!!   這樣,線程數再多也會分擔到多個value上去更新,隻需要增加value就可以降低 value的 “熱度”  AtomicLong中的 惡性循環不就解決了嗎? cells 就是這個 “段” cell中的value 就是存放更新值的, 這樣,當我需要總數時,把cells 中的value都累加一下不就可以了麼!!

當然,聰明之處遠遠不僅僅這裡,在看看add方法中的代碼,casBase方法可不可以不要,直接分段更新,上來就計算 索引位置,然後更新value?

答案是不好,不是不行,因為,casBase操作等價于AtomicLong中的cas操作,要知道,LongAdder這樣的處理方式是有壞處的,分段操作必然帶來空間上的浪費,可以空間換時間,但是,能不換就不換,看空間時間都節約~! 是以,casBase操作保證了在低并發時,不會立即進入分支做分段更新操作,因為低并發時,casBase操作基本都會成功,隻有并發高到一定程度了,才會進入分支,是以,Doug Lead對該類的說明是: 低并發時LongAdder和AtomicLong性能差不多,高并發時LongAdder更高效!

java多線程--AtomicLong和LongAdder

但是,Doung Lea 還是沒這麼簡單,聰明之處還沒有結束……

如此,retryUpdate中做了什麼事,也基本略知一二了,因為cell中的value都更新失敗(說明該索引到這個cell的線程也很多,并發也很高時) 或者cells數組為空時才會調用retryUpdate,

是以,retryUpdate裡面應該會做兩件事:

1. 擴容,将cells數組擴大,降低每個cell的并發量,同樣,這也意味着cells數組的rehash動作。

2. 給空的cells變量賦一個新的Cell數組。

是不是這樣呢? 繼續看代碼:

代碼比較長,變成文本看看,為了友善大家看if else 分支,對應的  { } 我用相同的顔色标注出來

可以看到,這個時候Doug Lea才願意使用死循環保證更新成功~!

01

final

void

retryUpdate(

long

x, HashCode hc, 

boolean

wasUncontended) {

02

int

h = hc.code;

03

boolean

collide = 

false

;                

// True if last slot nonempty

04

for

(;;) {

05

Cell[] as; Cell a; 

int

n; 

long

v;

06

if

((as = cells) != 

null

&amp;&amp; (n = as.length) &gt; 

) {

// 分支1

07

if

((a = as[(n - 

1

) &amp; h]) == 

null

) {

08

if

(busy == 

) {            

// Try to attach new Cell

09

Cell r = 

new

Cell(x);   

// Optimistically create

10

if

(busy == 

&amp;&amp; casBusy()) {

11

boolean

created = 

false

;

12

try

{               

// Recheck under lock

13

Cell[] rs; 

int

m, j;

14

if

((rs = cells) != 

null

&amp;&amp;

15

(m = rs.length) &gt; 

&amp;&amp;

16

rs[j = (m - 

1

) &amp; h] == 

null

) {

17

rs[j] = r;

18

created = 

true

;

19

}

20

finally

{

21

busy = 

;

22

}

23

if

(created)

24

break

;

25

continue

;           

// Slot is now non-empty

26

}

27

}

28

collide = 

false

;

29

}

30

else

if

(!wasUncontended)       

// CAS already known to fail

31

wasUncontended = 

true

;      

// Continue after rehash

32

else

if

(a.cas(v = a.value, fn(v, x)))

33

break

;

34

else

if

(n &gt;= NCPU || cells != as)

35

collide = 

false

;            

// At max size or stale

36

else

if

(!collide)

37

collide = 

true

;

38

else

if

(busy == 

&amp;&amp; casBusy()) {

39

try

{

40

if

(cells == as) {      

// Expand table unless stale

41

Cell[] rs = 

new

Cell[n &lt;&lt; 

1

];

42

for

(

int

i = 

; i &lt; n; ++i)

43

rs[i] = as[i];

44

cells = rs;

45

}

46

finally

{

47

busy = 

;

48

}

49

collide = 

false

;

50

continue

;                   

// Retry with expanded table

51

}

52

h ^= h &lt;&lt; 

13

;                   

// Rehash                 h ^= h &gt;&gt;&gt; 17;

53

h ^= h &lt;&lt; 

5

;

54

}

55

else

if

(busy == 

&amp;&amp; cells == as &amp;&amp; casBusy()) {

//分支2

56

boolean

init = 

false

;

57

try

{                           

// Initialize table

58

if

(cells == as) {

59

Cell[] rs = 

new

Cell[

2

];

60

rs[h &amp; 

1

] = 

new

Cell(x);

61

cells = rs;

62

init = 

true

;

63

}

64

finally

{

65

busy = 

;

66

}

67

if

(init)

68

break

;

69

}

70

else

if

(casBase(v = base, fn(v, x)))

71

break

;                          

// Fall back on using base

72

}

73

hc.code = h;                            

// Record index for next time

74

}

分支2中,為cells為空的情況,需要new 一個Cell數組。

分支1分支中,略複雜一點點:

注意,幾個分支中都提到了busy這個方法,這個可以了解為一個cas實作的鎖,隻有在需要更新cells數組的時候才會更新該值為1,如果更新失敗,則說明目前有線程在更新cells數組,目前線程需要等待。重試。

回到分支1中,這裡首先判斷目前cells數組中的索引位置的cell元素是否為空,如果為空,則添加一個cell到數組中。

否則更新 标示沖突的标志位wasUncontended 為 true ,重試。

否則,再次更新cell中的value,如果失敗,重試。

。。。。。。。一系列的判斷後,如果還是失敗,下下下策,reHash,直接将cells數組擴容一倍,并更新目前線程的hash值,保證下次更新能盡可能成功。

可以看到,LongAdder确實用了很多心思減少并發量,并且,每一步都是在”沒有更好的辦法“的時候才會選擇更大開銷的操作,進而盡可能的用最最簡單的辦法去完成操作。追求簡單,但是絕對不粗暴。

————————————————分割線——————————————————————-

昨天和左耳朵耗子簡單讨論了下,發現左耳朵耗子對讀者思維的引導還是非常不錯的,在第一次發現這個類後,對裡面的實作又提出了更多的問題,引導大家思考,值得學習。

我們 發現的問題有這麼幾個:

1. jdk 1.7中是不是有這個類?

我确認後,結果如下:    jdk-7u51 版本上還沒有  但是jdk-8u20版本上已經有了。代碼基本一樣 ,增加了對double類型的支援和删除了一些備援的代碼。有興趣的同學可以去下載下傳下JDK 1.8看看

2. base有沒有參與彙總?

base在調用intValue等方法的時候是會彙總的:

java多線程--AtomicLong和LongAdder

3. base的順序可不可以調換?

左耳朵耗子,提出了這麼一個問題: 在add方法中,如果cells不會為空後,casBase方法一直都沒有用了?

是以,我想可不可以調換add方法中的判斷順序,比如,先做casBase的判斷,結果是 不調換可能更好,調換後每次都要CAS一下,在高并發時,失敗幾率非常高,并且是惡性循環,比起一次判斷,後者的開銷明顯小很多,還沒有副作用。是以,不調換可能會更好。

4. AtomicLong可不可以廢掉?

我的想法是可以廢掉了,因為,雖然LongAdder在空間上占用略大,但是,它的性能已經足以說明一切了,無論是從節約空的角度還是執行效率上,AtomicLong基本沒有優勢了,具體看這個測試(感謝coolshell讀者Lemon的回複):http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/