天天看點

深讀源碼-java并發包之LongAdder源碼分析

問題

(1)java8中為什麼要新增LongAdder?

(2)LongAdder的實作方式?

(3)LongAdder與AtomicLong的對比?

簡介

LongAdder是java8中新增的原子類,在多線程環境中,它比AtomicLong性能要高出不少,特别是寫多的場景。

它是怎麼實作的呢?讓我們一起來學習吧。

原理

LongAdder的原理是,在最初無競争時,隻更新base的值,當有多線程競争時通過分段的思想,讓不同的線程更新不同的段,最後把這些段相加就得到了完整的LongAdder存儲的值。

深讀源碼-java并發包之LongAdder源碼分析

源碼分析

LongAdder繼承自Striped64抽象類,Striped64中定義了Cell内部類和各重要屬性。

主要内部類

// Striped64中的内部類,使用@sun.misc.Contended注解,說明裡面的值消除僞共享
@sun.misc.Contended static final class Cell {
        // 存儲元素的值,使用volatile修飾保證可見性
        volatile long value;
        Cell(long x) { value = x; }
        // CAS更新value的值
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe執行個體
        private static final sun.misc.Unsafe UNSAFE;
        // value字段的偏移量
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
           

Cell類使用@sun.misc.Contended注解,說明是要避免僞共享的。

使用Unsafe的CAS更新value的值,其中value的值使用volatile修飾,保證可見性。

關于Unsafe的介紹請檢視《深讀源碼-java魔法類之Unsafe解析》。

關于僞共享的介紹請檢視《什麼是僞共享》。

主要屬性

// 這三個屬性都在Striped64中
    // cells數組,存儲各個段的值
    transient volatile Cell[] cells;

    // 最初無競争時使用的,也算一個特殊的段
    transient volatile long base;

    // 标記目前是否有線程在建立或擴容cells,或者在建立Cell
    // 通過CAS更新該值,相當于是一個鎖
    transient volatile int cellsBusy;
           

最初無競争或有其它線程在建立cells數組時使用base更新值,有過競争時使用cells更新值。

最初無競争是指一開始沒有線程之間的競争,但也有可能是多線程在操作,隻是這些線程沒有同時去更新base的值。

有過競争是指隻要出現過競争不管後面有沒有競争都使用cells更新值,規則是不同的線程hash到不同的cell上去更新,減少競争。

add(x)方法

add(x)方法是LongAdder的主要方法,使用它可以使LongAdder中存儲的值增加x,x可為正可為負。

public void add(long x) {
        // as是Striped64中的cells屬性
        // b是Striped64中的base屬性
        // v是目前線程hash到的Cell中存儲的值
        // m是cells的長度減1,hash時作為掩碼使用
        // a是目前線程hash到的Cell
        Cell[] as; long b, v; int m; Cell a;
        // 條件1:cells不為空,說明出現過競争,cells已經建立
        // 條件2:cas操作base失敗,說明其它線程先一步修改了base,正在出現競争
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            // true表示目前競争還不激烈
            // false表示競争激烈,多個線程hash到同一個Cell,可能要擴容
            boolean uncontended = true;
            // 條件1:cells為空,說明正在出現競争,上面是從條件2過來的
            // 條件2:應該不會出現
            // 條件3:目前線程所在的Cell為空,說明目前線程還沒有更新過Cell,應初始化一個Cell
            // 條件4:更新目前線程所在的Cell失敗,說明現在競争很激烈,多個線程hash到了同一個Cell,應擴容
            if (as == null || (m = as.length - 1) < 0 ||
                // getProbe()方法傳回的是線程中的threadLocalRandomProbe字段
                // 它是通過随機數生成的一個值,對于一個确定的線程這個值是固定的
                // 除非刻意修改它
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                // 調用Striped64中的方法處理
                longAccumulate(x, null, uncontended);
        }
    }
           

(1)最初無競争時隻更新base;

(2)直到更新base失敗時,建立cells數組;

(3)當多個線程競争同一個Cell比較激烈時,可能要擴容;

longAccumulate()方法

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        // 存儲線程的probe值
        int h;
        // 如果getProbe()方法傳回0,說明随機數未初始化
        if ((h = getProbe()) == 0) {
            // 強制初始化
            ThreadLocalRandom.current(); // force initialization
            // 重新擷取probe值
            h = getProbe();
            // 都未初始化,肯定還不存在競争激烈
            wasUncontended = true;
        }
        // 是否發生碰撞
        boolean collide = false;
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            // cells已經初始化過
            if ((as = cells) != null && (n = as.length) > 0) {
                // 目前線程所在的Cell未初始化
                if ((a = as[(n - 1) & h]) == null) {
                    // 目前無其它線程在建立或擴容cells,也沒有線程在建立Cell
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        // 建立一個Cell,值為目前需要增加的值
                        Cell r = new Cell(x);   // Optimistically create
                        // 再次檢測cellsBusy,并嘗試更新它為1
                        // 相當于目前線程加鎖
                        if (cellsBusy == 0 && casCellsBusy()) {
                            // 是否建立成功
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                // 重新擷取cells,并找到目前線程hash到cells數組中的位置
                                // 這裡一定要重新擷取cells,因為as并不在鎖定範圍内
                                // 有可能已經擴容了,這裡要重新擷取
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    // 把上面建立的Cell放在cells的j位置處
                                    rs[j] = r;
                                    // 建立成功
                                    created = true;
                                }
                            } finally {
                                // 相當于釋放鎖
                                cellsBusy = 0;
                            }
                            // 建立成功了就傳回
                            // 值已經放在建立的Cell裡面了
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    // 标記目前未出現沖突
                    collide = false;
                }
                // 目前線程所在的Cell不為空,且更新失敗了
                // 這裡簡單地設為true,相當于簡單地自旋一次
                // 通過下面的語句修改線程的probe再重新嘗試
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // 再次嘗試CAS更新目前線程所在Cell的值,如果成功了就傳回
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                // 如果cells數組的長度達到了CPU核心數,或者cells擴容了
                // 設定collide為false并通過下面的語句修改線程的probe再重新嘗試
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                // 上上個elseif都更新失敗了,且上個條件不成立,說明出現沖突了
                else if (!collide)
                    collide = true;
                // 明确出現沖突了,嘗試占有鎖,并擴容
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        // 檢查是否有其它線程已經擴容過了
                        if (cells == as) {      // Expand table unless stale
                            // 新數組為原數組的兩倍
                            Cell[] rs = new Cell[n << 1];
                            // 把舊數組元素拷貝到新數組中
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            // 重新指派cells為新數組
                            cells = rs;
                        }
                    } finally {
                        // 釋放鎖
                        cellsBusy = 0;
                    }
                    // 已解決沖突
                    collide = false;
                    // 使用擴容後的新數組重新嘗試
                    continue;                   // Retry with expanded table
                }
                // 更新失敗或者達到了CPU核心數,重新生成probe,并重試
                h = advanceProbe(h);
            }
            // 未初始化過cells數組,嘗試占有鎖并初始化cells數組
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                // 是否初始化成功
                boolean init = false;
                try {                           // Initialize table
                    // 檢測是否有其它線程初始化過
                    if (cells == as) {
                        // 建立一個大小為2的Cell數組
                        Cell[] rs = new Cell[2];
                        // 找到目前線程hash到數組中的位置并建立其對應的Cell
                        rs[h & 1] = new Cell(x);
                        // 指派給cells數組
                        cells = rs;
                        // 初始化成功
                        init = true;
                    }
                } finally {
                    // 釋放鎖
                    cellsBusy = 0;
                }
                // 初始化成功直接傳回
                // 因為增加的值已經同時建立到Cell中了
                if (init)
                    break;
            }
            // 如果有其它線程在初始化cells數組中,就嘗試更新base
            // 如果成功了就傳回
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
           

(1)如果cells數組未初始化,目前線程會嘗試占有cellsBusy鎖并建立cells數組;

(2)如果目前線程嘗試建立cells數組時,發現有其它線程已經在建立了,就嘗試更新base,如果成功就傳回;

(3)通過線程的probe值找到目前線程應該更新cells數組中的哪個Cell;

(4)如果目前線程所在的Cell未初始化,就占有cellsBusy鎖并在相應的位置建立一個Cell;

(5)嘗試CAS更新目前線程所在的Cell,如果成功就傳回,如果失敗說明出現沖突;

(5)目前線程更新Cell失敗後并不是立即擴容,而是嘗試更新probe值後再重試一次;

(6)如果在重試的時候還是更新失敗,就擴容;

(7)擴容時目前線程占有cellsBusy鎖,并把數組容量擴大到兩倍,再遷移原cells數組中元素到新數組中;

(8)cellsBusy在建立cells數組、建立Cell、擴容cells數組三個地方用到;

sum()方法

sum()方法是擷取LongAdder中真正存儲的值的大小,通過把base和所有段相加得到。

public long sum() {
        Cell[] as = cells; Cell a;
        // sum初始等于base
        long sum = base;
        // 如果cells不為空
        if (as != null) {
            // 周遊所有的Cell
            for (int i = 0; i < as.length; ++i) {
                // 如果所在的Cell不為空,就把它的value累加到sum中
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        // 傳回sum
        return sum;
    }
           

可以看到sum()方法是把base和所有段的值相加得到,那麼,這裡有一個問題,如果前面已經累加到sum上的Cell的value有修改,不是就沒法計算到了麼?

答案确實如此,是以LongAdder可以說不是強一緻性的,它是最終一緻性的。

LongAdder VS AtomicLong

直接上代碼:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * @author suidd
 * @name LongAdderTest
 * @description LongAdder是java8中新增的原子類,在多線程環境中,它比AtomicLong性能要高出不少,特别是寫多的場景
 * @date 2020/5/21 11:41
 * Version 1.0
 **/
public class LongAdderTest {
    public static void main(String[] args) {
        testAtomicLongVSLongAdder(1, 10000000);
        testAtomicLongVSLongAdder(10, 10000000);
        testAtomicLongVSLongAdder(20, 10000000);
        testAtomicLongVSLongAdder(40, 10000000);
        testAtomicLongVSLongAdder(80, 10000000);
    }

    /**
     * @param
     * @return change notes
     * @author suidd
     * @description //AtomicLong和LongAdder 性能比較
     * @date 2020/5/21 15:33
     **/
    static void testAtomicLongVSLongAdder(final int threadCount, final int times) {
        System.out.println(String.format("threadCount:%s,times:%s", threadCount, times));
        try {
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);//LongAdder
            System.out.println(String.format("LongAdder elapse:%s", (System.currentTimeMillis() - start)));

            long start2 = System.currentTimeMillis();
            testAtomicLong(threadCount, times);//AtomicLong
            System.out.println(String.format("AtomicLong elapse:%s", (System.currentTimeMillis() - start2)));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * @param
     * @return change notes
     * @author suidd
     * @description //使用AtomicLong實作自增
     * @date 2020/5/21 15:24
     **/
    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        AtomicLong atomicLong = new AtomicLong();
        List<Thread> list = new ArrayList<>(threadCount);
        //封裝threadcount個線程
        for (int i = 0; i < threadCount; i++) {
            list.add(new Thread(() -> {
                for (int j = 0; j < times; j++) {
                    //根據times,遞增AtomicLong
                    atomicLong.incrementAndGet();
                }
            }));
        }

        //啟動線程
        for (int i = 0; i < threadCount; i++) {
            list.get(i).start();
        }

        //等待所有線程執行完畢
        for (int i = 0; i < threadCount; i++) {
            list.get(i).join();
        }
    }

    /**
     * @param
     * @return change notes
     * @author suidd
     * @description //使用LongAdder實作自增
     * @date 2020/5/21 15:24
     **/
    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        LongAdder longAdder = new LongAdder();
        List<Thread> list = new ArrayList<>(threadCount);
        //封裝threadcount個線程
        for (int i = 0; i < threadCount; i++) {
            list.add(new Thread(() -> {
                for (int j = 0; j < times; j++) {
                    //根據times,遞增AtomicLong
                    longAdder.add(1);
                }
            }));
        }

        //啟動線程
        for (int i = 0; i < threadCount; i++) {
            list.get(i).start();
        }

        //等待所有線程執行完畢
        for (int i = 0; i < threadCount; i++) {
            list.get(i).join();
        }
    }
}
           

運作結果如下:

threadCount:1,times:10000000
LongAdder elapse:178
AtomicLong elapse:60
threadCount:10,times:10000000
LongAdder elapse:205
AtomicLong elapse:2246
threadCount:20,times:10000000
LongAdder elapse:416
AtomicLong elapse:3728
threadCount:40,times:10000000
LongAdder elapse:834
AtomicLong elapse:8807
threadCount:80,times:10000000
LongAdder elapse:1378
AtomicLong elapse:15257
           

可以看到當隻有一個線程的時候,AtomicLong反而性能更高,随着線程越來越多,AtomicLong的性能急劇下降,而LongAdder的性能影響很小。

總結

(1)LongAdder通過base和cells數組來存儲值;

(2)不同的線程會hash到不同的cell上去更新,減少了競争;

(3)LongAdder的性能非常高,最終會達到一種無競争的狀态;

彩蛋

在longAccumulate()方法中有個條件是 

n>=NCPU

就不會走到擴容邏輯了,而n是2的倍數,那是不是代表cells數組最大隻能達到大于等于NCPU的最小2次方?

答案是明确的。因為同一個CPU核心同時隻會運作一個線程,而更新失敗了說明有兩個不同的核心更新了同一個Cell,這時會重新設定更新失敗的那個線程的probe值,這樣下一次它所在的Cell很大機率會發生改變,如果運作的時間足夠長,最終會出現同一個核心的所有線程都會hash到同一個Cell(大機率,但不一定全在一個Cell上)上去更新,是以,這裡cells數組中長度并不需要太長,達到CPU核心數足夠了。

比如,筆者的電腦是8核的,是以這裡cells的數組最大隻會到8,達到8就不會擴容了。

深讀源碼-java并發包之LongAdder源碼分析

原文連結:https://mp.weixin.qq.com/s?__biz=Mzg2ODA0ODM0Nw==&mid=2247483892&idx=1&sn=2a19a6c714e987fc0ed30ab7407e23ef&scene=21#wechat_redirect