天天看點

AtomicLong LongAdder 源碼解析 Doug Lea太強了!!!一 AtomicLong原理二 LongAdder原理三 LongAdder源碼四 總結

文章目錄

  • 一 AtomicLong原理
  • 二 LongAdder原理
  • 三 LongAdder源碼
  • 四 總結

一 AtomicLong原理

AtomicLong通過cas+自旋更新AtomicLong中的value值,進而保證value的原子性,N個線程同時改變value的值,隻能有一個線程更新成功,其他線程這次的cas是失敗的

AtomicLong LongAdder 源碼解析 Doug Lea太強了!!!一 AtomicLong原理二 LongAdder原理三 LongAdder源碼四 總結

由于一次隻能有一個線程修改成功,其他線程得要自旋,一直到修改失敗,在并發量比較高的情況下,可能會導緻這些線程占用長時間修改值失敗,導緻cas次數過多。

二 LongAdder原理

在高并發場景下LongAdder可以顯著的提高性能。使用了空間換時間的思想,咋atomiclong中cas對一個value值的修改,使用了一個數組經行分散修改,這樣cas修改失敗的機率就會小許多。

例如有一個base值,有三個線程要對執行+1操作,這個時候隻有一個線程能成功使用cas對base修改成功,其他線程都會轉而去修改cell數組的值,例如cell[2],0,1位置都是0,這個時候要對

三 LongAdder源碼

  • add方法 流程圖
    AtomicLong LongAdder 源碼解析 Doug Lea太強了!!!一 AtomicLong原理二 LongAdder原理三 LongAdder源碼四 總結
public void add(long x) {
       //  as表示cells的引用
       // b  表示擷取的base的值
       // v 表示期望值
       // m為數組的長度
       // 表示目前線程路由命中的cell單元格
        Cell[] as; long b, v; int m; Cell a;
        //條件1:true 表示數組不為空 目前線程應該将資料寫入到對應的cell中
                 false 表示數組未初始化為空,目前線程應該将資料寫base中
        //條件2 :true 表示cas改變base失敗 可能需要重試 或者 擴容
		
        if ((as = cells) != null || !casBase(b = base, b + x)) {
        //進入代碼塊的條件:
        1 數組已經出初始化了,目前線程應該将資料寫入到對應的cell中
        2  數組為空,但是cas改變base出現了競争
            boolean uncontended = true;

			 //條件1 2:判斷數組是否為空,如果不是空,繼續判斷條件3
			 //條件3 :getProbe()擷取目前線程的hash值(代名詞以下都說是hash值),m表示cells長度-1,cells長度 一定是2的次方數,
			 true :表示目前線程經過路由後對應的數組的桶位為空,需要建立對象
			 false:說明目前線程對應的cell 不為空,說明 下一步想要将x值 添加到cell中。
			 //條件4:true 表示cas失敗,表明有競争
			 false:表示cas成功
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //都有哪些情況會調用?
                //1 cells數組位空
                // 2 線程路由後對應下标的cells位空
                // 3  cas 修改cells對應的位置失敗
                longAccumulate(x, null, uncontended);
        }
    }
           
  • striped64類中屬性
目前機器cpu的數量,用來限定數組的大小
static final int NCPU = Runtime.getRuntime().availableProcessors();
用來減少cas替換次數的數組
 transient volatile Cell[] cells;
 沒有發生過競争時,資料會累加到 base上 | 當cells擴容時,需要将資料寫到base中
 transient volatile long base;
樂觀鎖  初始化cells或者擴容cells都需要擷取鎖,0 表示無鎖狀态,1 表示其他線程已經持有鎖了
  transient volatile int cellsBusy;
           
  • 重要方法
通過CAS方式擷取鎖
   final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
    擷取目前線程的Hash值(不知道啥值,就代叫hash值,全文都是)
   static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    重置目前線程的hash值
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

           
  • longAccumulate方法,感覺這個方法的判斷條件和煉丹一樣

流程圖:省去了一些細節,要結合代碼看

AtomicLong LongAdder 源碼解析 Doug Lea太強了!!!一 AtomicLong原理二 LongAdder原理三 LongAdder源碼四 總結
會執行這個方法的原因
    case 1: cells沒有初始化,線程修改base競争失敗,需要[重試|初始化cells]
    case 2:目前線程路由後對應下标cell為空
    case 3:cas修改目前線程路由後對應的cell桶的值失敗,意味着目前線程對應的cell 有競争[重試|擴容]
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //目前線程的hash值                      
        int h;
        //條件成立說明目前線程還沒有擷取hash值
        if ((h = getProbe()) == 0) {
          給目前線程配置設定hash值
            ThreadLocalRandom.current(); // force initialization
            擷取hash值
            h = getProbe();
            修改為沒有發生競争,為什麼,因為如果目前線程的hash值為0,也就是會預設的寫到數組為0的位置,
            。不能算作真正的競争了值
            wasUncontended = true;
        }
        //表示擴容意向false,一定不會擴容,true可能會擴容
        boolean collide = false;                // True if last slot nonempty
        //自旋
        for (;;) {
        //as  數組引用  a 表示目前線程命中的cell n數組的長度
        //v  表示期望值
            Cell[] as; Cell a; int n; long v;
        //case 1數組 表示cells已經初始化了,目前線程應該将資料寫入到對應的cell中
            if ((as = cells) != null && (n = as.length) > 0) {
				//進入代碼塊的原因: 
				 1.目前線程路由後對應下标cell為空
				 2 cas修改目前線程路由後對應的cell桶的值失敗,意味着目前線程對應的cell 有競争[重試|擴容]

            case 1.1  線程路由桶位位空   需要建立new Cell
                if ((a = as[(n - 1) & h]) == null) {
                  //true->表示目前鎖 未被占用  false->表示鎖被占用
                    if (cellsBusy == 0) {       // Try to attach new Cell
                    //樂觀的建立Cell對象 為啥?
                        Cell r = new Cell(x);   // Optimistically create
                        //擷取鎖
                        if (cellsBusy == 0 && casCellsBusy()) {
                        //是否建立成功标志
                            boolean created = false;
                            try {               // Recheck under lock
                            //rs 數組的引用  m 數組的長度
                             //j 路由位置
                                Cell[] rs; int m, j;
                                //條件 1 ,2恒成立  ,不知道有啥用
                                //條件 3 為了防止其它線程初始化過 該位置,然後目前線程再次初始化該位置  ,導緻資料丢失
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally { 
                            //釋放鎖
                                cellsBusy = 0;
                            }
                            //建立成功,跳出
                            if (created)
                                             break;
                             //初始化桶位失敗,再次自旋
                            continue;           // Slot is now non-empty
                        }
                    }
                    //擴容意向,改為false
                    collide = false;
                }
                case 1.2 隻有cells初始化之後,并且目前線程 競争修改失敗,才會是false
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                 
               case 1.3 目前線程rehash過hash值,然後新命中的cell不為空
				true   修改成功,退出循環
				false 表示rehash之後命中的新的cell也有競争,
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                case 1.4  條件一: n>= NCPU true  擴容意向 改為false
                           條件二: true 表示其他線程擴容了這個數組,擴容意向改為false 目前線程rehash之後重試即可
                else if (n >= NCPU || cells != as)   
                    collide = false;            // At max size or stale
                 //case 1.5 修改擴容意向
                 !collide = true 設定擴容意向 為true 但是不一定真的發生擴容
                else if (!collide)
                    collide = true;

                case 1.6 擴容邏輯
                    //條件一:cellsBusy == 0 true->表示目前無鎖狀态,目前線程可以去競争這把鎖
                    //條件二:casCellsBusy true->表示目前線程 擷取鎖 成功,可以執行擴容邏輯
                    // false->表示目前時刻有其它線程在做擴容相關的操作。
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                    //true 說明 數組沒有被其他線程修改過
                        if (cells == as) {      // Expand table unless stale
                        // 将數組的長度擴容為原數組長度的兩倍
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                    釋放樂觀鎖
                        cellsBusy = 0;
                    }
                    //擴容意向 修改為false
                    collide = false;
                    //再次自旋
                    continue;                   // Retry with expanded table
                }
                //充值目前線程的hash值
                h = advanceProbe(h);
            }
   //case 2 數組還沒初始化
     條件一:true表示目前還沒加鎖
     條件二:確定其他線程沒有初始化cells數組
     條件三:true表示成功擷取鎖,false表示擷取鎖失敗
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                 //防止其它線程已經初始化了,目前線程再次初始化 導緻丢失資料
                    if (cells == as) {
                        //初始化長度為2的數組
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                //釋放鎖
                    cellsBusy = 0;
                }
                //如果初始化成功 也就是寫資料成功了,
                if (init)
                    break;
            }
            // 1.目前cellsBusy加鎖狀态,表示其它線程正在初始化cells,是以目前線程将值累加到base
            // 2  cells被其它線程初始化後,目前線程需要将資料累加到base
            cas替換成功直接退出
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
           
  • 惰性求和

    LongAdder隻有在使用longValue()擷取目前累加值時才會真正的去結算計數的資料,longValue()方法底層就是調用sum()方法,對base和Cell數組的資料累加然後傳回,做到資料寫入和讀取分離,這個方法不存在競争就不貼源碼了

四 總結

LongAdder中最核心的思想就是利用空間來換時間,将熱點value分散成一個Cellss數組來承接并發的CAS,以此來提升性能,但是sum方法可能不能保證明時性,LongAdder的性能全面超越了AtomicLong,而且阿裡巴巴開發手冊也提及到 推薦使用 LongAdder 對象,比 AtomicLong 性能更好(減少樂觀鎖的重試次數).