天天看點

LongAdder源碼分析

AtomicLong是作用是對長整形進行原子操作,顯而易見,在java1.8中新加入了一個新的原子類LongAdder,該類也可以保證Long類型操作的原子性,相對于AtomicLong,LongAdder有着更高的性能和更好的表現,可以完全替代AtomicLong的來進行原子操作。

AtomicLong的代碼很簡單,下面僅以incrementAndGet()為例,對AtomicLong的原理進行說明。

incrementAndGet()源碼如下:

public final long incrementAndGet() {  
    for (;;) {  
        // 擷取AtomicLong目前對應的long值  
        long current = get();  
        // 将current加1  
        long next = current + 1;  
        // 通過CAS函數,更新current的值  
        if (compareAndSet(current, next))  
            return next;  
    }  
}

// value是AtomicLong對應的long值  
private volatile long value;  
// 傳回AtomicLong對應的long值  
public final long get() {  
    return value;  
}

public final boolean compareAndSet(long expect, long update) {  
return unsafe.compareAndSwapLong(this, valueOffset, expect, update); 
}

compareAndSet()的作用是更新AtomicLong對應的long值。它會比較AtomicLong的原始值是否與expect相等,若相等的話,則設定AtomicLong的值為update。      

 比AtomicLong更高效的LongAdder  

 AtomicLong的實作方式是内部有個value 變量,當多線程并發自增,自減時,均通過cas 指令從機器指令級别操作保證并發的原子性。

AtomicLong的實作方式是内部有個value 變量,當多線程并發自增,自減時,均通過CAS 指令從機器指令級别操作保證并發的原子性。

 LongAdder是jdk8新增的用于并發環境的計數器,目的是為了在高并發情況下,代替AtomicLong/AtomicInt,成為一個用于高并發情況下的高效的通用計數器。

高并發下計數,一般最先想到的應該是AtomicLong/AtomicInt,AtmoicXXX使用硬體級别的指令 CAS 來更新計數器的值,這樣可以避免加鎖,機器直接支援的指令,效率也很高。但是AtomicXXX中的 CAS 操作在出現線程競争時,失敗的線程會白白地循環一次,在并發很大的情況下,因為每次CAS都隻有一個線程能成功,競争失敗的線程會非常多。失敗次數越多,循環次數就越多,很多線程的CAS操作越來越接近 自旋鎖(spin lock)。計數操作本來是一個很簡單的操作,實際需要耗費的cpu時間應該是越少越好,AtomicXXX在高并發計數時,大量的cpu時間都浪費會在 自旋 上了,這很浪費,也降低了實際的計數效率。

// jdk1.8的AtomicLong的實作代碼,這段代碼在sun.misc.Unsafe中
// 當線程競争很激烈時,while判斷條件中的CAS會連續多次傳回false,這樣就會造成無用的循環,循環中讀取volatile變量的開銷本來就是比較高的
// 因為這樣,在高并發時,AtomicXXX并不是那麼理想的計數方式
public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, v + delta));
return v;
}      

說LongAdder比在高并發時比AtomicLong更高效,這麼說有什麼依據呢?LongAdder是根據ConcurrentHashMap這類為并發設計的類的基本原理——鎖分段,來實作的,它裡面維護一組按需配置設定的計數單元,并發計數時,不同的線程可以在不同的計數單元上進行計數,這樣減少了線程競争,提高了并發效率。本質上是用空間換時間的思想,不過在實際高并發情況中消耗的空間可以忽略不計。

現在,在處理高并發計數時,應該優先使用LongAdder,而不是繼續使用AtomicLong。當然,線程競争很低的情況下進行計數,使用Atomic還是更簡單更直接,并且效率稍微高一些。

LongAdder源碼分析
LongAdder源碼分析
LongAdder源碼分析
LongAdder源碼分析

既要看單線程的執行結果,還要看多線程對他的影響。

每次操作時候都要看局部變量是不是等于成員變量(判斷是否沒有别的線程幹擾),最後再把局部變量指派給成員變量完成修改。成員變量的修改都是CAS。 

LongAdder源碼分析

LongAdder源碼分析

LongAdder源碼分析
LongAdder源碼分析
LongAdder源碼分析
@SuppressWarnings("serial")
abstract class Striped641 extends Number1 {
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    transient volatile Cell[] cells;// cell數組,長度一樣要是2^n,可以類比為jdk1.7的ConcurrentHashMap中的segments數組
    // 累積器的基本值 ,沒有遇到并發的情況,直接使用base,速度更快;
    transient volatile long base;//cas更新
    // 自旋辨別,在對cells進行初始化,或者後續擴容時,需要通過CAS操作把此辨別設定為1(busy,忙辨別,相當于加鎖), 取消busy時可以直接使用cellsBusy = 0,相當于釋放鎖
    transient volatile int cellsBusy;//旋轉鎖
    Striped641() {
    }
    final boolean casBase(long cmp, long val) {//原子更新base
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
    // 使用CAS将cells自旋辨別更新為1,更新為0時可以不用CAS(指派為0肯定是隻有一個線程在指派為0),直接使用cellsBusy就行
    final boolean casCellsBusy() {//原子更新cellsBusy從0到1,以擷取鎖。
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    // 下面這兩個方法是ThreadLocalRandom中的方法,不過因為包通路關系,這裡又重新寫一遍
     
    // probe翻譯過來是探測/探測器/探針這些,不好了解,它是ThreadLocalRandom裡面的一個屬性,
    // 不過并不影響對Striped64的了解,這裡可以把它了解為線程本身的hash值
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    // 相當于rehash,重新算一遍線程的hash值
    static final int advanceProbe(int probe) {
        probe ^= probe << 13; // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);//CAS設定目前線程的threadLocalRandomProbe
        return probe;
    }

    //x:要增加的數。fn:執行函數。uncontended=false表示更新失敗了,=true表示沒有這個線程的Cell(不可能是更新成功了,更新成功就進不來這裡)。
    final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {//開始分段更新:1.base更新失敗。2.前面分段更新不行或失敗。
        //建立,更新,擴容,初始化,更新base。
        
        int h;//線程hash值
        // 看下ThreadLocalRandom是否初始化。如果目前線程的threadLocalRandomProbe為0,說明目前線程是第一次進入該方法,
        if ((h = getProbe()) == 0) {// 目前線程hash值=0,
            ThreadLocalRandom.current(); //初始化目前線程的PROBE值不為0,
            h = getProbe();
            wasUncontended = true;//下面的cas語句走不走,還是間隔一個for循環在cas  uncontended=false代表存在争用,uncontended=true代表不存在争用。
        }
        
        boolean collide = false; //下面的擴容語句走不走,還是間隔一個for循環在擴容  。collide=true代表cas有沖突,collide=false代表cas無沖突 
        
        for (;;) {
            Cell[] as;//局部變量,線程執行時候,局部變量不會變,成員變量會改變(修改屬性位址不變,重新new位址才改變)。as一進來就指派後面沒有更改過。
            Cell a;//線程對應的cell,a一進來就指派後面沒有更改過。
            //cellsBusy沒有局部變量,直接使用成員變量。是一個鎖。
            int n;
            long v;
            
            //------------------------------------------重要---多線程時候,在一個線程的周期裡面,前一個指令的判斷(被另一個線程修改)現在不一定成立了-----------------------------------------------------//
            /*每次執行真正操作時候,有可能剛才判斷的條件全部不成立了,就要重來,那麼剛才判斷條件有什麼用:不沖突有用。
            執行成功時候:在鎖住期間,剛才進來的判斷都成立,近似于單線程操作,或者别的操作了,但是不影響現在要操作的條件。*/
            /*多線程同時判斷3個if,有可能一個線程判斷第一個if不成立,但是判斷第二個if時候,第一個if條件又成立了。走到後面的判斷,隻能說剛才前面的判斷不成立,現在前面的判斷不一定不成立了。
             是以進入一個if:要看2個判斷,之前判斷是什麼,現在判斷是什麼,才進入這個if條件 */
            /*casCellsBusy()用于鎖住這個cells,别的線程不能擴容和初始化和建立cell(但是可以cas更新存在的cell)。但是casCellsBusy()前後的條件不一定再次成立了,是以鎖住之後要再次判斷剛才的條件
             ,多線程時候上一次的判斷現在不一定成立了。*/
            /*局部變量,線程執行時候,局部變量不會變,成員變量會改變(修改屬性位址不變,重新new位址才改變)。as一進來就指派後面沒有更改過。*/
            /*在一個線程裡面:1.已經有cells,要麼建立(建立時候看是不是空),要麼更新(要看是不是原值),要麼擴容(要看cells有沒有變化)。2.沒有cells就去初始化(初始話時候再看是不是空)。3.初始化搶不赢就去更新base。*/
            //------------------------------------------重要-----多線程時候,在一個線程的判斷周期裡面,前一個指令的判斷(被另一個線程修改)現在不一定成立了---------------------------------------------------//
            
            
            //as == cells,隻有初始化和擴容(因為重新new)才不相等,修改值還是相等的。
            //每次重新來,都會重新擷取as和a,as = cells,a = as[(n - 1) & h],并且更新線程hash。
            
            // 1.已經有分段更新了cells!=null 
            if ((as = cells) != null && (n = as.length) > 0) {
                //1.1 沒有這個線程的Cell,建立
                //重新來:1.建立cell時候有人占用cell。2.建立cell時候位置不為空。 
                if ((a = as[(n - 1) & h]) == null) { 
                    if (cellsBusy == 0) { // cellsBusy=1表示有人在修改Cells數組(修改Cell從null到new Cell,擴容,初始化),CAS更新一個已經存在的Cell不用判斷cellsBusy。
                        Cell r = new Cell(x);  //這期間其他線程可以做很多事
                        if (cellsBusy == 0 && casCellsBusy()) {// cellsBusy是0就進來,然後變成cellsBusy=1,别的進不來。
                            
                            //不可能多個線程同時進這裡面來。
                            
                            /*鎖住cells,但是前面判斷a=null,現在不一定a=null了,因為在前面判斷到鎖住cells期間cells有可能改變了,并且cellsBusy從0變到1又變到0。是以鎖住之後在判斷是不是空。*/
                            
                            /*每次執行真正操作時候,有可能剛才判斷的條件全部不成立了,就要重來,那麼剛才判斷條件有什麼用:不沖突有用。
                            執行成功時候:在鎖住期間,剛才進來的判斷都成立,近似于單線程操作,或者别的操作了,但是不影響現在要操作的條件。*/
                            
                            boolean created = false;
                            try {  
                                Cell[] rs;
                                int m, j;
                                // 再次判斷沒有這個cell, 前面if判斷了是空,走到這裡時候有可能别人放進去了并且cellsBusy從0變到1再變到0了。如果不是null了,就不放,下次再來(直接更新)。
                                if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;//  指派
                                    created = true;// 建立完成,退出,不用重新來了。
                                }
                            } finally {
                                cellsBusy = 0;//釋放鎖
                            }
                            if (created)// 建立完成,退出
                                break;
                            continue; // 這個線程沒有成功建立,肯定重頭再來
                        }
                    }
                    collide = false;// cell不存在,但是有人修改cells,collide = false,
                }
                //1.2有這個線程的cell
                //wasUncontended=false重新來
                else if (!wasUncontended) // wasUncontended=false表示更新失敗了,再來,wasUncontended=true下次不進這裡直接去cas更新,否則先不cas先再來一次。
                    wasUncontended = true; 
                //  1.3有這個線程的cell
                //wasUncontended=true,更新失敗了重新來。
                else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))//這個線程有Cell,去更新。  
                    break;// 更新成功,退出。
                // 1.4有這個線程的cell
                //wasUncontended=true,更新失敗,cells初始化擴容了,重新來
                else if (n >= NCPU || cells != as) // CPU能夠并行的CAS操作的最大數量是它的核心數 ,cells被改變了(擴容了肯定重新來)。
                    collide = false;  
                //  1.5有這個線程的cell
                //wasUncontended=true,更新失敗,cells沒有初始化擴容,collide=false,重新來
                else if (!collide) //=false走這裡,collide=true,下次不走這裡直接去擴容否則先不去擴容先再來一次。
                    collide = true; 
                // 1.6有這個線程的cell
                //wasUncontended=true,更新失敗,cells沒有初始化擴容,collide=true,占用cells,擴容完成,重新來
                //有這個線程的cell,cas失敗,說明2個線程同時更新這個cell,就擴容。既然你不讓我加,競争這麼厲害,那麼擴容試試看。
                else if (cellsBusy == 0 && /* 這期間其他線程可以做很多事 */casCellsBusy()) { 
                    
                    //不可能多個線程同時進這裡面來。
                    
                    try {
                        if (cells == as) { //鎖住cells了,最開始as = cells,但是現在as不一定=cells,是以判斷cells沒變擴容,
                            Cell[] rs = new Cell[n << 1];// 執行2倍擴容
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;// 釋放鎖
                    }
                    collide = false;// 擴容意向為false
                    continue; // 擴容後還沒有設定值(肯定重新來)
                }
                //1.7 有這個線程的cell
                h = advanceProbe(h);// 修改目前線程的hash,降低hash沖突(線程hash改變是無所謂的,關注的是裡面的值,與哪個線程放進去無關),避免下次還映射到這個cell。
            } 
            
            // 2。某個線程執行時候,前一個if判斷:沒有分段更新,cells==null或者cells.length=0。走到這裡時候cells有可能不為空了,但是要進入這if必須:cellsBusy=0,
            //同時cells還是剛才那個as=null(沒有擴容和初始化)并且casCellsBusy()搶成功,就去初始化。
            //線程執行到這裡:之前判斷:【沒有cells】,并且現在【沒人擴容或者初始化,并且cells為空】就初始化。
            else if (cellsBusy == 0 && cells == as && /*這期間其他線程可以做很多事*/casCellsBusy()) {// cellsBusy=1,别的線程就不能動cells
                
                //不可能多個線程同時進這裡面來。
                
                boolean init = false;
                try {  
                    if (cells == as) { //鎖住cells了,但是cells不一定=as=空或者null了, 鎖住之後一定要再檢測一次,如果還是null就初始化
                        Cell[] rs = new Cell[2];// 初始化時隻建立兩個單元
                        rs[h & 1] = new Cell(x);// 對其中一個單元進行累積操作,另一個不管,繼續為null
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;//  釋放鎖
                }
                if (init)//  初始化成功退出,初始化失敗繼續來
                    break;
            }
            
            // 3。走到這裡:前面判斷不成立(不代表現在的前面判斷也不成立),之前判斷:cells=null或者cells.length=0【沒有cells】并且cellsBusy=1或者 cells被改變了【有人正在初始化或擴容】或者casCellsBusy()失敗。
            //有了分段更新,還是可以用base,提高效率。準備去擴容的,但是現在有可能别人已經擴容了(cells != as)或者casCellsBusy()失敗(搶着去擴容沒有搶成功)
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) 
                break; // 更新base,成功就退出。
        }
        
        /*如果Cells表為空,嘗試擷取鎖之後初始化表(初始大小為2);
        如果Cells表非空,對應的Cell為空,自旋鎖未被占用,嘗試擷取鎖,添加新的Cell;
        如果Cells表非空,找到線程對應的Cell,嘗試通過CAS更新該值;
        如果Cells表非空,線程對應的Cell CAS更新失敗,說明存在競争,嘗試擷取自旋鎖之後擴容,将cells數組擴大,降低每個cell的并發量後再試*/
    }

    // double更long的邏輯基本上是一樣的
    final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false; // True if last slot nonempty
        for (;;) {
            Cell[] as;
            Cell a;
            int n;
            long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) { // Try to attach new Cell
                        Cell r = new Cell(Double.doubleToRawLongBits(x));
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try { // Recheck under lock
                                Cell[] rs;
                                int m, j;
                                if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue; // Slot is now non-empty
                        }
                    }
                    collide = false;
                } else if (!wasUncontended) // CAS already known to fail
                    wasUncontended = true; // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? Double.doubleToRawLongBits(Double.longBitsToDouble(v) + x)
                        : Double.doubleToRawLongBits(fn.applyAsDouble(Double.longBitsToDouble(v), x)))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false; // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) { // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue; // Retry with expanded table
                }
                h = advanceProbe(h);
            } else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try { // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            } else if (casBase(v = base, ((fn == null) ? Double.doubleToRawLongBits(Double.longBitsToDouble(v) + x)
                    : Double.doubleToRawLongBits(fn.applyAsDouble(Double.longBitsToDouble(v), x)))))
                break; // Fall back on using base
        }
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped641.class;
            BASE = UNSAFE.objectFieldOffset(sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset(sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    // 一個Cell裡面一個value,可以看成是一個簡化的AtomicLong,通過cas操作來更新value的值
    // @sun.misc.Contended是一個高端的注解,代表使用緩存行填來避免僞共享
    @sun.misc.Contended
    static final class Cell {
        volatile long value;// cas更新其值
        Cell(long x) {
            value = x;
        }
        final boolean cas(long cmp, long val) {// cas更新
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

}      
public class LongAdder1 extends Striped641 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    public LongAdder1() {
    }
    
    /*看到這裡我想應該有很多人明白為什麼LongAdder會比AtomicLong更高效了,
    沒錯,唯一會制約AtomicLong高效的原因是高并發,高并發意味着CAS的失敗幾率更高,
    重試次數更多,越多線程重試,CAS失敗幾率又越高,變成惡性循環,AtomicLong效率降低。 
    那怎麼解決?** LongAdder給了我們一個非常容易想到的解決方案:減少并發,将單一value的更新壓力分擔到多個value中去
    (每個線程更新value數組裡面的自己value【多個線程通路的同一個數組(也隻有一個數組)但是cas更新的是隻是其中一個value】
    【因為數組有限,是以不同的線程也會出現同時cas更新一個value的情況】【會出現:多個線程同時通路這個數組的同一個cell】,
    不再是多個線程更新同一個value導緻cas經常失敗), 降低單個value的 “熱度”,分段更新 */
    /*這樣,線程數再多也會分擔到多個value上去更新,隻需要增加value就可以降低 value的 “熱度”  
    AtomicLong中的 惡性循環不就解決了嗎? cells 就是這個 “段” cell中的value 就是存放更新值的, 
    這樣,當我需要總數時,把cells 中的value都累加一下不就可以了麼!!*/
    /*當然,聰明之處遠遠不僅僅這裡,在看看add方法中的代碼,casBase方法可不可以不要,直接分段更新,上來就計算 索引位置,然後更新value?
    答案是不好,不是不行,因為,casBase操作等價于AtomicLong中的CAS操作,要知道,LongAdder這樣的處理方式是有壞處的,
    分段操作必然帶來空間上的浪費,可以空間換時間,但是,能不換就不換,看空間時間都節約~! 
    是以,casBase操作保證了在低并發時,不會立即進入分支做分段更新操作,因為低并發時,
    casBase操作基本都會成功,隻有并發高到一定程度了,才會進入分支,
    是以,Doug Lea對該類的說明是:** 低并發時LongAdder和AtomicLong性能差不多,高并發時LongAdder更高效!***/
    /*因為低并發時候,使用的是base的原子更新,沒有啟用分段更新(cells=null,并且casBase成功),高并發才啟用分段更新。*/
    /*如此,longAccumulate中做了什麼事,也基本略知一二了,因為cell中的value都更新失敗(說明該索引到這個cell的線程也很多
    ,并發也很高時) 或者cells數組為空時才會調用longAccumulate,*/
    
    // +x,并發計數器LongAdder加X。要麼在base+x更新要麼在Cell[]數組裡面找到對應的Cell+x更新。
    public void add(long x) {//base和cells隻有一個,并且是LongAdder的屬性。
        Cell[] as;
        long b, v;
        int m;
        Cell a;
        //cells!=null不用判斷後面進去(表明已經啟用了分段更新),cells=null并且base的cas更新失敗進去(表示沒有啟用分段更新但是高并發了,
        //需要啟用分段更新),cells=null并且base的cas更新成功就退出(沒有啟用分段更新,并且不是高并發,此時跟AotomicLong是一樣的)。
        //并發時候更新失敗,AtomicLong的處理方式是死循環嘗試更新,直到成功才傳回,而LongAdder則是進入這個分支。
        if ((as = cells) != null || !casBase(b = base, b + x)/*cas把base的值從b變成b+x*/) {
            //進來:1.已經啟用分段更新了。2.沒有啟用分段更新但是cas失敗了表示高并發了。否則:沒有啟用分段更新并且不是高并發,就不進來。
            boolean uncontended = true;
            if (as == null //cells=null進去,沒有啟用分段更新(進來了)表示高并發了。
                || (m = as.length - 1) < 0  //cells.length<=0,沒有啟用分段更新(進來了)表示高并發了。
                || (a = as[getProbe() & m]) == null  //對as的長度取餘,從as中擷取這個線程對應的a Cell。=null表示還沒有這個線程對應的cell,
                || !(uncontended = a.cas(v = a.value, v + x)))  //a這個Cell裡面的value增加x失敗, 更新成功就不會進下面了。
                
                //1.cells=null。2.cells!=null但沒有這個線程的Cell。2.有這個線程的Cell但是更新失敗了。
                longAccumulate(x, null, uncontended); //uncontended=false表示更新失敗了,=true表示沒有這個線程的Cell(不可能是更新成功了,更新成功就進不來這裡)。
        }
    }

    public void increment() {
        add(1L);
    }

    public void decrement() {
        add(-1L);
    }

    //将多個cell數組中的值加起來的和就類似于AtomicLong中的value
    // 此傳回值可能不是絕對準确的,因為調用這個方法時還有其他線程可能正在進行計數累加,
    //     方法的傳回時刻和調用時刻不是同一個點,在有并發的情況下,這個值隻是近似準确的計數值
    // 高并發時,除非全局加鎖,否則得不到程式運作中某個時刻絕對準确的值,但是全局加鎖在高并發情況下是下下策
    // 在很多的并發場景中,計數操作并不是核心,這種情況下允許計數器的值出現一點偏差,此時可以使用LongAdder
    // 在必須依賴準确計數值的場景中,應該自己處理而不是使用通用的類。
    public long sum() {
        Cell[] as = cells;
        Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    public void reset() {
        Cell[] as = cells;
        Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }

    public long sumThenReset() {
        Cell[] as = cells;
        Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

    public String toString() {
        return Long.toString(sum());
    }

    public long longValue() {
        return sum();
    }

    public int intValue() {
        return (int) sum();
    }

    public float floatValue() {
        return (float) sum();
    }

    public double doubleValue() {
        return (double) sum();
    }

    private static class SerializationProxy implements Serializable {
        private static final long serialVersionUID = 7249069246863182397L;

        private final long value;//LongAdder1的總和

        SerializationProxy(LongAdder1 a) {
            value = a.sum();
        }

        private Object readResolve() {
            LongAdder1 a = new LongAdder1();
            a.base = value;
            return a;
        }
    }

    private Object writeReplace() {
        return new SerializationProxy(this);
    }

    private void readObject(java.io.ObjectInputStream s) throws java.io.InvalidObjectException {
        throw new java.io.InvalidObjectException("Proxy required");
    }

}      
public abstract class Number1 implements java.io.Serializable {
    
    public abstract int intValue();

    public abstract long longValue();

    public abstract float floatValue();

    public abstract double doubleValue();

    /*
     System.out.println((byte)127);//127
    System.out.println((byte)128);//-128
    System.out.println((byte)129);//-127
    System.out.println((byte)255);//-1
    System.out.println((byte)256);//0
    System.out.println((byte)257);//1
     */
    public byte byteValue() {
        return (byte) intValue();
    }

    public short shortValue() {
        return (short) intValue();
    }

    private static final long serialVersionUID = -8742448824652078965L;
}