天天看點

AtomicInteger超級更新版LongAdder詳解

LongAdder原理分析

AtomicLong通過CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器來說它的性能已經很好了,但是JDK開發組并不滿足于此。使用AtomicLong時,在高并發下大量線程會同時去競争更新同一個原子變量,但是由于同時隻有一個線程的CAS操作會成功,這就造成了大量線程競争失敗後,會通過無限循環不斷進行自旋嘗試CAS的操作,而這會白白浪費CPU資源。

是以JDK 8新增了一個原子性遞增或者遞減類LongAdder用來克服在高并發下使用AtomicLong的缺點。既然AtomicLong的性能瓶頸是由于過多線程同時去競争一個變量的更新而産生的,那麼如果把一個變量分解為多個變量,讓同樣多的線程去競争多個資源,是不是就解決了性能問題?是的,LongAdder就是這個思路。下面通過圖來了解兩者設計的不同之處,如下圖所示

AtomicInteger超級更新版LongAdder詳解

如下圖所示,使用AtomicLong時,是多個線程同時競争同一個原子變量

AtomicInteger超級更新版LongAdder詳解

使用LongAdder時,則是在内部維護多個Cell變量,每個Cell裡面有一個初始值為0的long型變量,這樣,在同等并發量的情況下,争奪單個變量更新操作的線程量會減少,這變相地減少了争奪共享資源的并發量。另外,多個線程在争奪同一個Cell原子變量時如果失敗了,它并不是在目前Cell變量上一直自旋CAS重試,而是嘗試在其他Cell的變量上進行CAS嘗試,這個改變增加了目前線程重試CAS成功的可能性。最後,在擷取LongAdder目前值時,是把所有Cell變量的value值累加後再加上base傳回的。

LongAdder維護了一個延遲初始化的原子性更新數組(預設情況下Cell數組是null)和一個基值變量base。由于Cells占用的記憶體是相對比較大的,是以一開始并不建立它,而是在需要時建立,也就是惰性加載。

當一開始判斷Cell數組是null并且并發線程較少時,所有的累加操作都是對base變量進行的。保持Cell數組的大小為2的N次方,在初始化時Cell數組中的Cell元素個數為2,數組裡面的變量實體是Cell類型。Cell類型是AtomicLong的一個改進,用來減少緩存的争用,也就是解決僞共享問題。

對于大多數孤立的多個原子操作進行位元組填充是浪費的,因為原子性操作都是無規律地分散在記憶體中的(也就是說多個原子性變量的記憶體位址是不連續的),多個原子變量被放入同一個緩存行的可能性很小。但是原子性數組元素的記憶體位址是連續的,是以數組内的多個元素能經常共享緩存行,是以這裡使用@sun.misc.Contended注解對Cell類進行位元組填充,這防止了數組中多個元素共享一個緩存行,在性能上是一個提升。

LongAdder代碼分析

為了解決高并發下多線程對一個變量CAS争奪失敗後進行自旋而造成的降低并發性能問題,LongAdder在内部維護多個Cell元素(一個動态的Cell數組)來分擔對單個變量進行争奪的開銷。下面圍繞以下話題從源碼角度來分析LongAdder的實作:

(1)LongAdder的結構是怎樣的?

(2)目前線程應該通路Cell數組裡面的哪一個Cell元素?

(3)如何初始化Cell數組?

(4)Cell數組如何擴容?

(5)線程通路配置設定的Cell元素有沖突後如何處理?

(6)如何保證線程操作被配置設定的Cell元素的原子性?

首先看下LongAdder的類圖結構

AtomicInteger超級更新版LongAdder詳解

由該圖可知,LongAdder類繼承自Striped64類,在Striped64内部維護着三個變量。LongAdder的真實值其實是base的值與Cell數組裡面所有Cell元素中的value值的累加,base是個基礎值,預設為0。cellsBusy用來實作自旋鎖,狀态值隻有0和1,當建立Cell元素,擴容Cell數組或者初始化Cell數組時,使用CAS操作該變量來保證同時隻有一個線程可以進行其中之一的操作。

下面看Cell的構造

/**
     * Padded variant of AtomicLong supporting only raw accesses plus CAS.
     *
     * JVM intrinsics note: It would be possible to use a release-only
     * form of CAS here, if it were provided.
     */
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
           

可以看到,Cell的構造很簡單,其内部維護一個被聲明為volatile的變量,這裡聲明為volatile是因為線程操作value變量時沒有使用鎖,為了保證變量的記憶體可見性這裡将其聲明為volatile的。另外cas函數通過CAS操作,保證了目前線程更新時被配置設定的Cell元素中value值的原子性。另外,Cell類使用@sun.misc.Contended修飾是為了避免僞共享。到這裡我們回答了問題1和問題6。

long sum()傳回目前的值,内部操作是累加所有Cell内部的value值後再累加base。例如下面的代碼,由于計算總和時沒有對Cell數組進行加鎖,是以在累加過程中可能有其他線程對Cell中的值進行了修改,也有可能對數組進行了擴容,是以sum傳回的值并不是非常精确的,其傳回值并不是一個調用sum方法時的原子快照值。

/**
     * Returns the current sum.  The returned value is <em>NOT</em> an
     * atomic snapshot; invocation in the absence of concurrent
     * updates returns an accurate result, but concurrent updates that
     * occur while the sum is being calculated might not be
     * incorporated.
     *
     * @return the sum
     */
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
           

下面主要看下add方法的實作,從這個方法裡面就可以找到其他問題的答案。

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        //如果cells不為null或者線程執行CAS操作失敗了
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            //決定目前線程應該通路cells數組裡面的哪一個Cell元素
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                //目前線程映射的元素存在則執行下面這行代碼
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
           

代碼首先看cells是否為null,如果為null則目前在基礎變量base上進行累加,這時候就類似AtomicLong的操作。如果cells不為null或者線程執行CAS操作失敗了,則會進入第一個if判斷。第二個if判斷決定目前線程應該通路cells數組裡面的哪一個Cell元素

下面重點研究longAccumulate的代碼邏輯,這是cells數組被初始化和擴容的地方。

/**
     * Handles cases of updates involving initialization, resizing,
     * creating new Cells, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value
     * @param fn the update function, or null for add (this convention
     * avoids the need for an extra field or function in LongAdder).
     * @param wasUncontended false if CAS failed before call
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //(6) 初始化目前線程變量ThreadLocalRandomProbe得值
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) { //(7)
                if ((a = as[(n - 1) & h]) == null) { //(8)
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                    //目前cell存在,執行cas操作(9)
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                    //目前Cell數組元素個數大于CPU個數(10)
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) { //初始化Cell數組(14)
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
           

當每個線程第一次執行到代碼(6)時,會初始化目前線程變量threadLocalRandomProbe的值,上面也說了,這個變量在計算目前線程應該被配置設定到cells數組的哪一個Cell元素時會用到。

cells數組的初始化是在代碼(14)中進行的,其中cellsBusy是一個标示,為0說明目前cells數組沒有在被初始化或者擴容,也沒有在建立Cell元素,為1則說明cells數組在被初始化或者擴容,或者目前在建立新的Cell元素、通過CAS操作來進行0或1狀态的切換,這裡使用casCellsBusy函數。假設目前線程通過CAS設定cellsBusy為1,則目前線程開始初始化操作,那麼這時候其他線程就不能進行擴容了

如代碼(14)初始化cells數組元素個數為2,然後使用h&1計算目前線程應該通路celll數組的哪個位置,也就是使用目前線程的threadLocalRandomProbe變量值&(cells數組元素個數-1),然後标示cells數組已經被初始化,最後代碼重置了cellsBusy标記。顯然這裡沒有使用CAS操作,卻是線程安全的,原因是cellsBusy是volatile類型的,這保證了變量的記憶體可見性,另外此時其他地方的代碼沒有機會修改cellsBusy的值。在這裡初始化的cells數組裡面的兩個元素的值目前還是null。這裡回答了問題3,知道了cells數組如何被初始化。

在代碼(7)(8)中,目前線程調用add方法并根據目前線程的随機數threadLocalRandomProbe和cells元素個數計算要通路的Cell元素下标,然後如果發現對應下标元素的值為null,則新增一個Cell元素到cells數組,并且在将其添加到cells數組之前要競争設定cellsBusy為1。

總結:

  1. 類通過内部cells數組分擔了高并發下多線程同時對一個原子變量進行更新時的競争量,讓多個線程可以同時對cells數組裡面的元素進行并行的更新操作。另外,數組元素Cell使用@sun.misc.Contended注解進行修飾,這避免了cells數組内多個原子變量被放入同一個緩存行,也就是避免了僞共享,這對性能也是一個提升。
  2. 由于SUM() 不具有原子性,是以需要嚴格原子自增場景不适用LongAdapter,如果可以最終一緻得自增場景就非常适用了

繼續閱讀