天天看點

【多線程】LongAdder實作原理

前言

AtomicInteger、AtomicLong使用非阻塞的CAS算法原子性地更新某一個變量,比synchronized這些阻塞算法擁有更好的性能,但是在高并發情況下,大量線程同時去更新一個變量,由于同一時間隻有一個線程能夠成功,絕大部分的線程在嘗試更新失敗後,會通過自旋的方式再次進行嘗試,嚴重占用了CPU的時間片。

AtomicLong的實作原理圖:

【多線程】LongAdder實作原理

LongAdder是JDK8新增的原子操作類,它提供了一種新的思路,既然AtomicLong的性能瓶頸是由于大量線程同時更新一個變量造成的,那麼能不能把這個變量拆分出來,變成多個變量,然後讓線程去競争這些變量,最後合并即可?LongAdder的設計精髓就在這裡,通過将變量拆分成多個元素,降低該變量的并發度,最後進行合并元素,變相的減少了CAS的失敗次數。

LongAdder的實作原理圖:

【多線程】LongAdder實作原理

常用方法

public class LongAdder extends Striped64 implements Serializable {
    //構造方法
    public LongAdder() {
    }
    //加1操作
    public void increment();
    //減1操作
    public void decrement();
    //擷取原子變量的值
    public long longValue();

}      

下面給出一個簡單的例子,模拟50線程同時進行更新

package com.xue.testLongAdder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;

public class Main {
    public static void main(String[] args) {
        LongAdder adder = new LongAdder();
        ExecutorService threadPool = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 50; i++) {
            Runnable r = () -> {
                adder.add(1);
            };
            threadPool.execute(r);
        }
        threadPool.shutdown();
        //若關閉線程池後,所有任務執行完畢,則isTerminated()傳回true
        while (!threadPool.isTerminated()) {
            System.out.println(adder.longValue());
            break;
        }
    }
}      

輸出結果是50

其中,如果對線程池不熟悉的同學,可以先參考我的另外一篇文章​​說說線程池​​

原了解析

類圖

【多線程】LongAdder實作原理

LongAdder内部維護了一個Cell類型的數組,其中Cell是Striped64中的一個靜态内部類。

Cell類

abstract class Striped64 extends Number {
  
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
    }

}      

Cell用來封裝被拆分出來的元素,内部用一個value字段儲存目前元素的值,等到需要合并時,則累加所有Cell數組中的value。Cell内部使用CAS操作來更新value值,對CAS操作不熟悉的同學,可以參考我的另外一篇文章​​淺探CAS實作原理​​

可以注意到,Cell類被 @sun.misc.Contended注解修飾了,這個注解是為了解決僞共享問題的,什麼是僞共享?

一個緩存行可以存儲多個變量(存滿目前緩存行的位元組數);而CPU對緩存的修改又是以緩存行為最小機關的,在多線程情況下,如果需要修改“共享同一個緩存行的變量”,就會無意中影響彼此的性能,這就是​

​僞共享(False Sharing)。​

對僞共享還不了解的同學,可以參考這位大佬的文章​​僞共享(False Sharing)底層原理及其解決方式​​

而LongAdder采用的是Cell數組,而數組元素是連續的,是以多個Cell對象共享一個緩存行的情況非常普遍,是以這裡@sun.misc.Contended注解對單個Cell元素進行位元組填充,確定一個Cell對象占據一個緩存行,即填充至64位元組。

關于如何确定一個對象的大小,可以參考我的另外一篇文章​​對象的記憶體布局,怎樣确定對象的大小​​,這樣可以算出來,還需要填充多少位元組。

longValue()

longValue()傳回累加後的值

public long longValue() {
        return sum();
    }

    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        //當Cell數組不為null時,進行累加後傳回,否則直接傳回基準數base
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }      

這可能是LongAdder中最簡單的方法了,就不進行贅述了。什麼,你要看複雜的?好的,這就來了。

increment()

public void increment() {
          add(1L);
       }

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        /**
         * 如果一下兩種條件則繼續執行if内的語句
         * 1. cells數組不為null(不存在争用的時候,cells數組一定為null,一旦對base的cas操作失敗,
         * 才會初始化cells數組)
         * 2. 如果cells數組為null,如果casBase執行成功,則直接傳回,如果casBase方法執行失敗
         * (casBase失敗,說明第一次争用沖突産生,需要對cells數組初始化)進入if内;
         * casBase方法很簡單,就是通過UNSAFE類的cas設定成員變量base的值為base+要累加的值
         * casBase執行成功的前提是無競争,這時候cells數組還沒有用到為null,可見在無競争的情況下是
         * 類似于AtomticInteger處理方式,使用cas做累加。
         */
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //uncontended判斷cells數組中,目前線程要做cas累加操作的某個元素是否#不#存在争用,
            //如果cas失敗則存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。
            boolean uncontended = true;
            /**
             *1. as == null : cells數組未被初始化,成立則直接進入if執行cell初始化
             *2. (m = as.length - 1) < 0: cells數組的長度為0
             *條件1與2都代表cells數組沒有被初始化成功,初始化成功的cells數組長度為2;
             *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的長度不為0,
             * 則通過getProbe方法擷取目前線程Thread的threadLocalRandomProbe變量的值,初始為0,
             * 然後執行threadLocalRandomProbe&(cells.length-1 ),相當于m%cells.length;
             * 如果cells[threadLocalRandomProbe%cells.length]的位置為null,
             * 這說明這個位置從來沒有線程做過累加,
             * 需要進入if繼續執行,在這個位置建立一個新的Cell對象;
             *4. !(uncontended = a.cas(v = a.value, v + x)):
             * 嘗試對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value值做累加操作,
             * 并傳回操作結果,如果失敗了則進入if,重新計算一個threadLocalRandomProbe;
             如果進入if語句執行longAccumulate方法,有三種情況
             1. 前兩個條件代表cells沒有初始化,
             2. 第三個條件指目前線程hash到的cells數組中的位置還沒有其它線程做過累加操作,
             3. 第四個條件代表産生了沖突,uncontended=false
             **/
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }      

其中longAccumulate()的解析如下:

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
        //擷取目前線程的threadLocalRandomProbe值作為hash值,如果目前線程的threadLocalRandomProbe為0,
        // 說明目前線程是第一次進入該方法,則強制設定線程的threadLocalRandomProbe為ThreadLocalRandom類的成員
        // 靜态私有變量probeGenerator的值,後面會詳細将hash值的生成;
        //另外需要注意,如果threadLocalRandomProbe=0,代表新的線程開始參與cell争用的情況
        //1.目前線程之前還沒有參與過cells争用(也許cells數組還沒初始化,進到目前方法來就是為了初始化cells數組
        //後争用的),
        // 是第一次執行base的cas累加操作失敗;
        //2.或者是在執行add方法時,對cells某個位置的Cell的cas操作第一次失敗,則将wasUncontended設定為false,
        // 那麼這裡會将其重新置為true;第一次執行操作失敗;
        //凡是參與了cell争用操作的線程threadLocalRandomProbe都不為0;
        int h;
        if ((h = getProbe()) == 0) {
            //初始化ThreadLocalRandom;
            ThreadLocalRandom.current(); // force initialization
            //将h設定為0x9e3779b9
            h = getProbe();
            //設定未競争标記為true
            wasUncontended = true;
        }
        //cas沖突标志,表示目前線程hash到的Cells數組的位置,做cas累加操作時與其它線程發生了沖突,cas失敗;
        // collide=true代表有沖突,collide=false代表無沖突
        boolean collide = false;
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //這個主幹if有三個分支
            //1.主分支一:處理cells數組已經正常初始化了的情況(這個if分支處理add方法的四個條件中的3和4)
            //2.主分支二:處理cells數組沒有初始化或者長度為0的情況;(這個分支處理add方法的四個條件中的1和2)
            //3.主分支三:處理如果cell數組沒有初始化,并且其它線程正在執行對cells數組初始化的操作,
            // 及cellbusy=1;
            // 則嘗試将累加值通過cas累加到base上
            //先看主分支一
            if ((as = cells) != null && (n = as.length) > 0) {
                /**
                 *内部小分支一:這個是處理add方法内部if分支的條件3:如果被hash到的位置為null,
                 * 說明沒有線程在這個位置設定過值,
                 * 沒有競争,可以直接使用,則用x值作為初始值建立一個新的Cell對象,
                 * 對cells數組使用cellsBusy加鎖,
                 * 然後将這個Cell對象放到cells[m%cells.length]位置上
                 */
                if ((a = as[(n - 1) & h]) == null) {
                    //cellsBusy == 0 代表目前沒有線程cells數組做修改
                    if (cellsBusy == 0) {
                        //将要累加的x值作為初始值建立一個新的Cell對象,
                        Cell r = new Cell(x);
                        //如果cellsBusy=0無鎖,則通過cas将cellsBusy設定為1加鎖
                        if (cellsBusy == 0 && casCellsBusy()) {
                            //标記Cell是否建立成功并放入到cells數組被hash的位置上
                            boolean created = false;
                            try {
                                Cell[] rs; int m, j;
                                //再次檢查cells數組不為null,且長度不為空,且hash到的位置的Cell為null
                                if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                    //将新的cell設定到該位置
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //去掉鎖
                                cellsBusy = 0;
                            }
                            //生成成功,跳出循環
                            if (created)
                                break;
                            //如果created為false,說明上面指定的cells數組的位置cells[m%cells.length]
                            // 已經有其它線程設定了cell了,
                            // 繼續執行循環。
                            continue;
                        }
                    }
                    //如果執行的目前行,代表cellsBusy=1,有線程正在更改cells數組,代表産生了沖突,将collide設定為false
                    collide = false;

                    /**
                     *内部小分支二:如果add方法中條件4的通過cas設定cells[m%cells.length]位置的Cell對象中的
                     * value值設定為v+x失敗,
                     * 說明已經發生競争,将wasUncontended設定為true,跳出内部的if判斷,
                     * 最後重新計算一個新的probe,然後重新執行循環;
                     */
                } else if (!wasUncontended)
                    //設定未競争标志位true,繼續執行,後面會算一個新的probe值,然後重新執行循環。
                    wasUncontended = true;
                /**
                 *内部小分支三:新的争用線程參與争用的情況:處理剛進入目前方法時threadLocalRandomProbe=0的情況,
                 * 也就是目前線程第一次參與cell争用的cas失敗,這裡會嘗試将x值加到cells[m%cells.length]
                 * 的value ,如果成功直接退出
                 */
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                        fn.applyAsLong(v, x))))
                    break;
                /**
                 *内部小分支四:分支3處理新的線程争用執行失敗了,這時如果cells數組的長度已經到了最大值
                 * (大于等于cup數量),
                 * 或者是目前cells已經做了擴容,則将collide設定為false,後面重新計算prob的值*/
                else if (n >= NCPU || cells != as)
                    collide = false;
                /**
                 *内部小分支五:如果發生了沖突collide=false,則設定其為true;會在最後重新計算hash值後,
                 * 進入下一次for循環
                 */
                else if (!collide)
                    //設定沖突标志,表示發生了沖突,需要再次生成hash,重試。
                    // 如果下次重試任然走到了改分支此時collide=true,!collide條件不成立,則走後一個分支
                    collide = true;
                /**
                 *内部小分支六:擴容cells數組,新參與cell争用的線程兩次均失敗,且符合庫容條件,會執行該分支
                 */
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        //檢查cells是否已經被擴容
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //為目前線程重新計算hash值
                h = advanceProbe(h);

                //這個大的分支處理add方法中的條件1與條件2成立的情況,如果cell表還未初始化或者長度為0,
                // 先嘗試擷取cellsBusy鎖。
            }else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {  // Initialize table
                    //初始化cells數組,初始容量為2,并将x值通過hash&1,放到0個或第1個位置上
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    //解鎖
                    cellsBusy = 0;
                }
                //如果init為true說明初始化成功,跳出循環
                if (init)
                    break;
            }
            /**
             *如果以上操作都失敗了,則嘗試将值累加到base上;
             */
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) 
                // Fall back on using base
                break;
        }
    }      

以上2個方法的解析搬自于​​源碼閱讀:全方位講解LongAdder​​,此處對代碼做了微調,友善閱讀。

總結