天天看點

LongAdder解析

 對

LongAdder

的最初了解是從Coolshell上的一篇文章中獲得的,但是一直都沒有深入的了解過其實作,隻知道它相較于

AtomicLong

來說,更加适合寫多讀少的并發情景。今天,我們就研究一下

LongAdder

的原理,探究一下它如此高效的原因。

基本原理和思想

 Java有很多并發控制機制,比如說以AQS為基礎的鎖或者以CAS為原理的自旋鎖。不了解AQS的朋友可以閱讀我之前的

AQS源碼解析文章

。一般來說,CAS适合輕量級的并發操作,也就是并發量并不多,而且等待時間不長的情況,否則就應該使用普通鎖,進入阻塞狀态,避免CPU空轉。

 是以,如果你有一個Long類型的值會被多線程修改,那麼使用CAS進行并發控制比較好,但是如果你是需要鎖住一些資源,然後進行資料庫操作,那麼還是使用阻塞鎖比較好。

 第一種情況下,我們一般都使用

AtomicLong

AtomicLong

是通過無限循環不停的采取CAS的方法去設定内部的value,直到成功為止。那麼當并發數比較多或出現更新熱點時,就會導緻CAS的失敗機率變高,重試次數更多,越多的線程重試,CAS失敗的機率越高,形成惡性循環,進而降低了效率。

 而LongAdder的原理就是降低對value更新的并發數,也就是将對單一value的變更壓力分散到多個value值上,降低單個value的“熱度”。

 我們知道

LongAdder

的大緻原理之後,再來詳細的了解一下它的具體實作,其中也有很多值得借鑒的并發程式設計的技巧。

LongAdder的成員變量

LongAdder

Striped64

的子類,其有三個比較重要的成員函數,在之後的函數分析中需要使用到,這裡先說明一下。

// CPU的數量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// Cell對象的數組,長度一般是2的指數
transient volatile Cell[] cells;
// 基礎value值,當并發較低時,隻累加該值
transient volatile long base;
// 建立或者擴容Cells數組時使用的自旋鎖變量
transient volatile int cellsBusy;
           

cells

LongAdder

的父類

Striped64

中的

Cell

數組類型的成員變量。每個

Cell

對象中都包含一個value值,并提供對這個value值的CAS操作。

static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
}           

Add操作

 我們首先來看一下

LongAdder

add

函數,其會多次嘗試CAS操作将值進行累加,如果成功了就直接傳回,失敗則繼續執行。代碼比較複雜,而且涉及的情況比較多,我們就以梳理曆次嘗試CAS操作為主線,講清楚這些CAS操作的前提條件和場景。

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // 當cells數組為null時,會進行第一次cas操作嘗試。
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null || 
            !(uncontended = a.cas(v = a.value, v + x)))
            // 當cells數組不為null,并且通過getProbe() & m
            // 定位的Cell對象不為null時進行第二次CAS操作。
            // 如果執行不成功,則進入longAccumulate函數。
            longAccumulate(x, null, uncontended); 
    }
}           

 當并發量較少時,cell數組尚未初始化,是以隻調用

casBase

函數,對base變量進行CAS累加。

 我們來看一下

casBase

函數相關的源碼吧。我們可以認為變量

base

就是第一個value值,也是基礎value變量。先調用casBase函數來cas一下base變量,如果成功了,就不需要在進行下面比較複雜的算法,

final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}           

 當并發量逐漸提高時,

casBase

函數會失敗。如果cells數組為null或為空,就直接調用

longAccumulate

方法。因為cells為null或在為空,說明cells未初始化,是以調用

longAccumulate

進行初始化。否則繼續判斷。

 如果cells中已經初始化,就繼續進行後續判斷。我們先來了解一下

getProbe() & m

的這個操作吧,可以把這個操作當作一次計算"hash"值,然後将cells中這個位置的Cell對象指派給變量a。如果變量a不為null,那麼就調用該對象的cas方法去設定其value值。如果a為null,或在cas指派發生沖突,那麼調用

longAccumulate

方法。

LongAccumulate方法

longAccumulate

函數比較複雜,帶有我的注釋的代碼已經貼在了文章後邊,這裡我們就隻講一下其中比較關鍵的一些技巧和思想。

 首先,我們都知道隻有當對

base

的cas操作失敗之後,

LongAdder

才引入

Cell

數組.是以在

longAccumulate

中就是對

Cell

數組進行操作,分别涉及了數組的初始化,擴容和設定某個位置的Cell對象等操作。

 在這段代碼中,關于

cellBusy

的cas操作構成了一個SpinLock,這就是經典的SpinLock的程式設計技巧,大家可以學習一下。

 我們先來看一下

longAccumulate

的主體代碼,首先是一個無限for循環,然後根據cells數組的狀态來判斷是要進行cells數組的初始化,還是進行對象添加或者擴容。

final void longAccumulate(long x, LongBinaryOperator fn,
                             boolean wasUncontended) {
       int h;
       if ((h = getProbe()) == 0) { 
           //擷取PROBE變量,探針變量,與目前運作的線程相關,不同線程不同
           ThreadLocalRandom.current(); 
       //初始化PROBE變量,和getProbe都使用Unsafe類提供的原子性操作。
           h = getProbe();
           wasUncontended = true;
       }
       boolean collide = false;
       for (;;) { //cas經典無限循環,不斷嘗試
           Cell[] as; Cell a; int n; long v;
           if ((as = cells) != null && (n = as.length) > 0) { 
           // cells不為null,并且數組size大于0,表示cells已經初始化了
           // 初始化Cell對象并設定到數組中或者進行數組擴容
           }
           else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
           //cells數組未初始化,獲得cellsBusy lock,進行cells數組的初始化
           // cells數組初始化操作
           }
          //如果初始化數組失敗了,那就再次嘗試一下直接cas base變量,
          // 如果成功了就直接傳回,這是最後一個進行CAS操作的地方。
           else if (casBase(v = base, ((fn == null) ? v + x :
                                       fn.applyAsLong(v, x))))
               break;
       }
   }           

 進行Cell數組代碼如下所示,它首先調用

casCellsBusy

函數擷取了

cellsBusy

‘鎖’,然後進行數組的初始化操作,最後将

cellBusy

'鎖'釋放掉。

// 注意在進入這段代碼之前已經casCellsBusy獲得cellsBusy這個鎖變量了。
boolean init = false;
try {
    if (cells == as) {
        Cell[] rs = new Cell[2];
        rs[h & 1] = new Cell(x); //設定x的值為cell對象的value值
        cells = rs;
        init = true;
    }
} finally {
    cellsBusy = 0;
}
if (init)
    break;           

 如果Cell數組已經初始化過了,那麼就進行Cell數組的設定或者擴容。這部分代碼有一系列的if else的判斷,如果前一個條件不成立,才會進入下一條判斷。

 首先,當Cell數組中對應位置的cell對象為null時,表明該位置的Cell對象需要進行初始化,是以使用

casCellsBusy

函數擷取'鎖',然後初始化Cell對象,并且設定進cells數組,最後釋放掉'鎖'。

 當Cell數組中對應位置的cell對象不為null,則直接調用其cas操作進行累加。

 當上述操作都失敗後,認為多個線程在對同一個位置的Cell對象進行操作,這個Cell對象是一個“熱點”,是以Cell數組需要進行擴容,将熱點分散。

if ((a = as[(n - 1) & h]) == null) { //通過與操作計算出來需要操作的Cell對象的坐标
    if (cellsBusy == 0) { //volatile 變量,用來實作spinLock,來在初始化和resize cells數組時使用。
    //當cellsBusy為0時,表示目前可以對cells數組進行操作。 
        Cell r = new Cell(x);//将x值直接指派給Cell對象
        if (cellsBusy == 0 && casCellsBusy()) {//如果這個時候cellsBusy還是0
        //就cas将其設定為非0,如果成功了就是獲得了spinLock的鎖.可以對cells數組進行操作.
        //如果失敗了,就會再次執行一次循環
            boolean created = false;
            try {
                Cell[] rs; int m, j;
                //判斷cells是否已經初始化,并且要操作的位置上沒有cell對象.
                if ((rs = cells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                    rs[j] = r; //将之前建立的值為x的cell對象指派到cells數組的響應位置.
                    created = true;
                }
            } finally {
                //經典的spinLock程式設計技巧,先獲得鎖,然後try finally将鎖釋放掉
                //将cellBusy設定為0就是釋放鎖.
                cellsBusy = 0;
            }
            if (created)
                break; //如果建立成功了,就是使用x建立了新的cell對象,也就是新建立了一個分擔熱點的value
            continue; 
        }
    }
    collide = false; //未發生碰撞
}
else if (!wasUncontended)//是否已經發生過一次cas操作失敗
    wasUncontended = true; //設定成true,以便第二次進入下一個else if 判斷
else if (a.cas(v = a.value, ((fn == null) ? v + x :
                            fn.applyAsLong(v, x))))
     //fn是操作類型,如果是空,就是相加,是以讓a這個cell對象中的value值和x相加,然後在cas設定,如果成果
    //就直接傳回
    break;
else if (n >= NCPU || cells != as)
  //如果cells數組的大小大于系統的可獲得處理器數量或在as不再和cells相等.
    collide = false;
else if (!collide)
    collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
  //再次獲得cellsBusy這個spinLock,對數組進行resize
    try {
        if (cells == as) {//要再次檢測as是否等于cells以免其他線程已經對cells進行了操作.
            Cell[] rs = new Cell[n << 1]; //擴容一倍
            for (int i = 0; i < n; ++i)
                rs[i] = as[i];
            cells = rs;//賦予cells一個新的數組對象
        }
    } finally {
        cellsBusy = 0;
    }
    collide = false;
    continue;
}
h = advanceProbe(h);//由于使用目前探針變量無法操作成功,是以重新設定一個,再次嘗試           

後記

 本篇文章寫的不是很好,我寫完之後又看了一遍coolshell上的關于

LongAdder

的文章,感覺自己沒有人家寫的那麼簡潔明了。我對代碼細節的注釋和投入太多了。其實很多代碼大家都可以看懂,并不需要大量的代碼片段加注釋。以後要注意一下。之後會接着研究一下JUC包中的其他類,希望大家多多關注。