天天看點

Java Review - 并發程式設計_JDK 8新增的原子操作類LongAdder & LongAccumulator概述小Demo源碼分析重要的方法LongAdder 小結LongAccumulator原子類總結

文章目錄

  • 概述
  • 小Demo
  • 源碼分析
  • 重要的方法
    • long sum()
    • reset
    • sumThenReset
    • longValue()
    • add(long x)
    • longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
  • LongAdder 小結
  • LongAccumulator
    • 概述
    • LongAdder#add vs LongAccumulator#accumulate
    • 小Demo
  • 原子類總結

概述

Java Review - 并發程式設計_原子操作類原理剖析中提到了 AtomicLong通過CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器來說它的性能已經很好了,但是JDK開發組并不滿足于此。使用AtomicLong時,在高并發下大量線程會同時去競争更新同一個原子變量,但是由于同時隻有一個線程的CAS操作會成功,這就造成了大量線程競争失敗後,會通過無限循環不斷進行自旋嘗試CAS的操作,而這會白白浪費CPU資源。

是以JDK 8新增了一個原子性遞增或者遞減類LongAdder用來克服在高并發下使用AtomicLong的缺點。

既然AtomicLong的性能瓶頸是由于過多線程同時去競争一個變量的更新而産生的,那麼如果把一個變量分解為多個變量,讓同樣多的線程去競争多個資源,是不是就解決了性能問題?是的,LongAdder就是這個思路。

下面通過圖來了解兩者設計的不同之處

Java Review - 并發程式設計_JDK 8新增的原子操作類LongAdder & LongAccumulator概述小Demo源碼分析重要的方法LongAdder 小結LongAccumulator原子類總結
Java Review - 并發程式設計_JDK 8新增的原子操作類LongAdder & LongAccumulator概述小Demo源碼分析重要的方法LongAdder 小結LongAccumulator原子類總結
  • 使用LongAdder時,則是在内部維護多個Cell變量,每個Cell裡面有一個初始值為0的long型變量,這樣,在同等并發量的情況下,争奪單個變量更新操作的線程量會減少,這變相地減少了争奪共享資源的并發量。
  • 另外,多個線程在争奪同一個Cell原子變量時如果失敗了,它并不是在目前Cell變量上一直自旋CAS重試,而是嘗試在其他Cell的變量上進行CAS嘗試,這個改變增加了目前線程重試CAS成功的可能性。
  • 最後,在擷取LongAdder目前值時,是把所有Cell變量的value值累加後再加上base傳回的。

LongAdder維護了一個延遲初始化的原子性更新數組(預設情況下Cell數組是null)和一個基值變量base。由于Cells占用的記憶體是相對比較大的,是以一開始并不建立它,而是在需要時建立,也就是惰性加載。

當一開始判斷Cell數組是null并且并發線程較少時,所有的累加操作都是對base變量進行的。保持Cell數組的大小為2的N次方,在初始化時Cell數組中的Cell元素個數為2,數組裡面的變量實體是Cell類型。Cell類型是AtomicLong的一個改進,用來減少緩存的争用,也就是解決僞共享問題。

對于大多數孤立的多個原子操作進行位元組填充是浪費的,因為原子性操作都是無規律地分散在記憶體中的(也就是說多個原子性變量的記憶體位址是不連續的),多個原子變量被放入同一個緩存行的可能性很小。但是原子性數組元素的記憶體位址是連續的,是以數組内的多個元素能經常共享緩存行,是以這裡使用@sun.misc.Contended注解對Cell類進行位元組填充,這防止了數組中多個元素共享一個緩存行,在性能上是一個提升。

小Demo

import java.util.concurrent.atomic.LongAdder;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/11/30 22:52
 * @mark: show me the code , change the world
 */
public class AtomicLongTest {

    //(10)建立Long型原子計數器
  //  private static AtomicLong atomicLong = new AtomicLong();

    private static LongAdder longAdder = new LongAdder();

    //(11)建立資料源
    private static Integer[] arrayOne = new Integer[]{0, 1, 2, 3, 0, 5, 6, 0, 56, 0};

    private static Integer[] arrayTwo = new Integer[]{10, 1, 2, 3, 0, 5, 6, 0, 56, 0};

    public static void main(String[] args) throws InterruptedException {
        //(12)線程one統計數組arrayOne中0的個數
        Thread threadOne = new Thread(() -> {
            int size = arrayOne.length;
            for (int i = 0; i < size; ++i) {
                if (arrayOne[i].intValue() == 0) {
                    longAdder.increment();
                }
            }

        });
        //(13)線程two統計數組arrayTwo中0的個數
        Thread threadTwo = new Thread(() -> {
            int size = arrayTwo.length;
            for (int i = 0; i < size; ++i) {
                if (arrayTwo[i].intValue() == 0) {
                    longAdder.increment();
                }
            }
        });
        //(14)啟動子線程
        threadOne.start();
        threadTwo.start();
        //(15)等待線程執行完畢
        threadOne.join();
        threadTwo.join();
        System.out.println("count 0:" + longAdder.sum());
    }

}           

複制

Java Review - 并發程式設計_JDK 8新增的原子操作類LongAdder &amp; LongAccumulator概述小Demo源碼分析重要的方法LongAdder 小結LongAccumulator原子類總結

源碼分析

為了解決高并發下多線程對一個變量CAS争奪失敗後進行自旋而造成的降低并發性能問題,LongAdder在内部維護多個Cell元素(一個動态的Cell數組)來分擔對單個變量進行争奪的開銷。

先來思考幾個問題

(1)LongAdder的結構是怎樣的?

(2)目前線程應該通路Cell數組裡面的哪一個Cell元素?

(3)如何初始化Cell數組?

(4)Cell數組如何擴容?

(5)線程通路配置設定的Cell元素有沖突後如何處理?

(6)如何保證線程操作被配置設定的Cell元素的原子性?

Java Review - 并發程式設計_JDK 8新增的原子操作類LongAdder &amp; LongAccumulator概述小Demo源碼分析重要的方法LongAdder 小結LongAccumulator原子類總結

LongAdder類繼承自Striped64類,在Striped64内部維護着三個變量。

LongAdder的真實值其實是base的值與Cell數組裡面所有Cell元素中的value值的累加,base是個基礎值,預設為0。

cellsBusy用來實作自旋鎖,狀态值隻有0和1,當建立Cell元素,擴容Cell數組或者初始化Cell數組時,使用CAS操作該變量來保證同時隻有一個線程可以進行其中之一的操作。

/**
     * Padded variant of AtomicLong supporting only raw accesses plus CAS.
     *
     * JVM intrinsics note: It would be possible to use a release-only
     * form of CAS here, if it were provided.
     */
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }           

複制

  • Cell的構造很簡單,其内部維護一個被聲明為volatile的變量,這裡聲明為volatile是因為線程操作value變量時沒有使用鎖,為了保證變量的記憶體可見性這裡将其聲明為volatile的。
  • cas函數通過CAS操作,保證了目前線程更新時被配置設定的Cell元素中value值的原子性。
  • Cell類使用@sun.misc.Contended修飾是為了避免僞共享。

到這裡我們回答了問題1和問題6。

重要的方法

long sum()

傳回目前的值,内部操作是累加所有Cell内部的value值後再累加base。

/**
     * Returns the current sum.  The returned value is NOT an
     * atomic snapshot; invocation in the absence of concurrent
     * updates returns an accurate result, but concurrent updates that
     * occur while the sum is being calculated might not be
     * incorporated.
     *
     * @return the sum
     */
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }           

複制

由于計算總和時沒有對Cell數組進行加鎖,是以在累加過程中可能有其他線程對Cell中的值進行了修改,也有可能對數組進行了擴容,是以sum傳回的值并不是非常精确的,其傳回值并不是一個調用sum方法時的原子快照值。

reset

重置操作

/**
     * Resets variables maintaining the sum to zero.  This method may
     * be a useful alternative to creating a new adder, but is only
     * effective if there are no concurrent updates.  Because this
     * method is intrinsically racy, it should only be used when it is
     * known that no threads are concurrently updating.
     */
    public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }           

複制

base置為0,如果Cell數組有元素,則元素值被重置為0

sumThenReset

sum的改造版本

/**
     * Equivalent in effect to {@link #sum} followed by {@link
     * #reset}. This method may apply for example during quiescent
     * points between multithreaded computations.  If there are
     * updates concurrent with this method, the returned value is
     * not guaranteed to be the final value occurring before
     * the reset.
     *
     * @return the sum
     */
    public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }           

複制

在使用sum累加對應的Cell值後,把目前Cell的值重置為0, base重置為0。這樣,當多線程調用該方法時會有問題,比如考慮第一個調用線程清空Cell的值,則後一個線程調用時累加的都是0值。

longValue()

等價于sum

/**
     * Equivalent to {@link #sum}.
     *
     * @return the sum
     */
    public long longValue() {
        return sum();
    }           

複制

add(long x)

/**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
      
        if ((as = cells) != null || !casBase(b = base, b + x)) {   // 1 
            boolean uncontended = true;
            
            if (as == null || (m = as.length - 1) < 0 ||  // 2 
                (a = as[getProbe() & m]) == null ||    // 3
                !(uncontended = a.cas(v = a.value, v + x)))  // 4 
                longAccumulate(x, null, uncontended); // 5
        }
    }


  /**
     * CASes the base field.
     */
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }           

複制

  • 代碼(1)首先看cells是否為null,如果為null則目前在基礎變量base上進行累加,這時候就類似AtomicLong的操作。 如果cells不為null或者線程執行代碼(1)的CAS操作失敗了,則會去執行代碼(2)。
  • 代碼(2)(3)決定目前線程應該通路cells數組裡面的哪一個Cell元素,如果目前線程映射的元素存在則執行代碼(4),使用CAS操作去更新配置設定的Cell元素的value值,如果目前線程映射的元素不存在或者存在但是CAS操作失敗則執行代碼(5)。

其實将代碼(2)(3)(4)合起來看就是擷取目前線程應該通路的cells數組的Cell元素,然後進行CAS更新操作,隻是在擷取期間如果有些條件不滿足則會跳轉到代碼(5)執行。

另外目前線程應該通路cells數組的哪一個Cell元素是通過

getProbe() & m

進行計算的,其中m是目前cells數組元素個數-1,

getProbe()

則用于擷取目前線程中變量

threadLocalRandomProbe

的值,這個值一開始為0,在代碼(5)裡面會對其進行初始化。并且目前線程通過配置設定的Cell元素的cas函數來保證對Cell元素value值更新的原子性,到這裡我們回答了問題2和問題6。

longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)

cells數組被初始化和擴容的地方

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //(6) 初始化目前線程的變量threadLocalRandomProbe的值
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); //
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;
        for (; ; ) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) ! = null && (n = as.length) > 0) {//(7)
                if ((a = as[(n -1) & h]) == null) {//(8)
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) ! = null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m -1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (! wasUncontended)       // CAS already known to fail
                    wasUncontended = true;
                //目前Cell存在,則執行CAS設定(9)
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                              fn.applyAsLong(v, x))))
                    break;
                //目前Cell數組元素個數大于CPU個數(10)
                else if (n >= NCPU || cells ! = as)
                    collide = false;            // At max size or stale
                //是否有沖突(11)
                else if (! collide)
                    collide = true;
                //如果目前元素個數沒有達到CPU個數并且有沖突則擴容(12)
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            //12.1
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        //12.2
                        cellsBusy = 0;
 
                    }
                    //12.3
                    collide = false;
                    continue; // Retry with expanded table
                }
                //(13)為了能夠找到一個空閑的Cell,重新計算hash值,xorshift算法生成随機數
                h = advanceProbe(h);
            }
            //初始化Cell數組(14)
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {
                    if (cells == as) {
                        //14.1
                        Cell[] rs = new Cell[2];
                        //14.2
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    //14.3
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }           

複制

上面代碼比較複雜,這裡我們主要關注問題3、問題4和問題5。

當每個線程第一次執行到代碼(6)時,會初始化目前線程變量

threadLocalRandomProbe

的值,上面也說了,這個變量在計算目前線程應該被配置設定到cells數組的哪一個Cell元素時會用到。

cells數組的初始化是在代碼(14)中進行的,其中cellsBusy是一個标示,為0說明目前cells數組沒有在被初始化或者擴容,也沒有在建立Cell元素,為1則說明cells數組在被初始化或者擴容,或者目前在建立新的Cell元素、通過CAS操作來進行0或1狀态的切換,這裡使用casCellsBusy函數。假設目前線程通過CAS設定cellsBusy為1,則目前線程開始初始化操作,那麼這時候其他線程就不能進行擴容了。

如代碼(14.1)初始化cells數組元素個數為2,然後使用h&1計算目前線程應該通路celll數組的哪個位置,也就是使用目前線程的threadLocalRandomProbe變量值&(cells數組元素個數-1),然後标示cells數組已經被初始化,最後代碼(14.3)重置了cellsBusy标記。

顯然這裡沒有使用CAS操作,卻是線程安全的,原因是cellsBusy是volatile類型的,這保證了變量的記憶體可見性,另外此時其他地方的代碼沒有機會修改cellsBusy的值。在這裡初始化的cells數組裡面的兩個元素的值目前還是null。這裡回答了問題3,知道了cells數組如何被初始化。

cells數組的擴容是在代碼(12)中進行的,對cells擴容是有條件的,也就是代碼(10)(11)的條件都不滿足的時候。具體就是目前cells的元素個數小于目前機器CPU個數并且目前多個線程通路了cells中同一個元素,進而導緻沖突使其中一個線程CAS失敗時才會進行擴容操作。

這裡為何要涉及CPU個數呢?隻有當每個CPU都運作一個線程時才會使多線程的效果最佳,也就是當cells數組元素個數與CPU個數一緻時,每個Cell都使用一個CPU進行處理,這時性能才是最佳的。

代碼(12)中的擴容操作也是先通過CAS設定cellsBusy為1,然後才能進行擴容。假設CAS成功則執行代碼(12.1)将容量擴充為之前的2倍,并複制Cell元素到擴容後數組。

另外,擴容後cells數組裡面除了包含複制過來的元素外,還包含其他新元素,這些元素的值目前還是null。這裡回答了問題4。

在代碼(7)(8)中,目前線程調用add方法并根據目前線程的随機數

threadLocalRandomProbe

和cells元素個數計算要通路的Cell元素下标,然後如果發現對應下标元素的值為null,則新增一個Cell元素到cells數組,并且在将其添加到cells數組之前要競争設定cellsBusy為1。

代碼(13)對CAS失敗的線程重新計算目前線程的随機值threadLocalRandomProbe,以減少下次通路cells元素時的沖突機會。這裡回答了問題5。

LongAdder 小結

JDK 8中新增的LongAdder原子性操作類,該類通過内部cells數組分擔了高并發下多線程同時對一個原子變量進行更新時的競争量,讓多個線程可以同時對cells數組裡面的元素進行并行的更新操作。

另外,數組元素Cell使用@sun.misc.Contended注解進行修飾,這避免了cells數組内多個原子變量被放入同一個緩存行,也就是避免了僞共享,這對性能也是一個提升。

LongAccumulator

概述

LongAdder類是LongAccumulator的一個特例, LongAccumulator比LongAdder的功能更強大。

例如下面的構造函數,其中accumulatorFunction是一個雙目運算器接口,其根據輸入的兩個參數傳回一個計算值,identity則是LongAccumulator累加器的初始值。

/**
     * Creates a new instance using the given accumulator function
     * and identity element.
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @param identity identity (initial value) for the accumulator function
     */
    public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }           

複制

看看 LongBinaryOperator

Java Review - 并發程式設計_JDK 8新增的原子操作類LongAdder &amp; LongAccumulator概述小Demo源碼分析重要的方法LongAdder 小結LongAccumulator原子類總結

LongAccumulator相比于LongAdder,可以為累加器提供非0的初始值,後者隻能提供預設的0值。另外,前者還可以指定累加規則,比如不進行累加而進行相乘,隻需要在構造LongAccumulator時傳入自定義的雙目運算器即可,後者則内置累加的規則。

LongAdder#add vs LongAccumulator#accumulate

LongAdder#add方法

Java Review - 并發程式設計_JDK 8新增的原子操作類LongAdder &amp; LongAccumulator概述小Demo源碼分析重要的方法LongAdder 小結LongAccumulator原子類總結

LongAccumulator#accumulate方法

Java Review - 并發程式設計_JDK 8新增的原子操作類LongAdder &amp; LongAccumulator概述小Demo源碼分析重要的方法LongAdder 小結LongAccumulator原子類總結

LongAccumulator相比于LongAdder的不同在于,在調用casBase時後者傳遞的是

b+x

,前者則使用了

r = function.applyAsLong(b = base, x)

來計算。

另外,前者在調用longAccumulate時傳遞的是function,而後者是null。從下面的代碼可知,當fn為null時就使用v+x加法運算,這時候就等價于LongAdder,當fn不為null時則使用傳遞的fn函數計算。

Java Review - 并發程式設計_JDK 8新增的原子操作類LongAdder &amp; LongAccumulator概述小Demo源碼分析重要的方法LongAdder 小結LongAccumulator原子類總結

LongAdder類是LongAccumulator的一個特例,隻是後者提供了更加強大的功能,可以讓使用者自定義累加規則。

小Demo

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.LongBinaryOperator;
import java.util.stream.IntStream;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/1 0:02
 * @mark: show me the code , change the world
 */
public class LongAccumulator1 {

    public static void main(String[] args) {
        testAccumulate();
    }

    private static void testAccumulate() {
        LongBinaryOperator op = (x, y) -> 2 * x + y;
        LongAccumulator accumulator = new LongAccumulator(op, 1L);

        ExecutorService executor = Executors.newFixedThreadPool(2);

        IntStream.range(0, 100)
                .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));



        System.out.format("Add: %d\n", accumulator.getThenReset());

        executor.shutdown();
    }
}           

複制

原子類總結

并發包中的原子性操作類都是使用非阻塞算法CAS實作的,這相比使用鎖實作原子性操作在性能上有很大提高。

梳理了AtomicLong類的實作原理,然後JDK 8中新增的LongAdder類和LongAccumulator類的原理。