天天看点

AtomicLong LongAdder 源码解析 Doug Lea太强了!!!一 AtomicLong原理二 LongAdder原理三 LongAdder源码四 总结

文章目录

  • 一 AtomicLong原理
  • 二 LongAdder原理
  • 三 LongAdder源码
  • 四 总结

一 AtomicLong原理

AtomicLong通过cas+自旋更新AtomicLong中的value值,进而保证value的原子性,N个线程同时改变value的值,只能有一个线程更新成功,其他线程这次的cas是失败的

AtomicLong LongAdder 源码解析 Doug Lea太强了!!!一 AtomicLong原理二 LongAdder原理三 LongAdder源码四 总结

由于一次只能有一个线程修改成功,其他线程得要自旋,一直到修改失败,在并发量比较高的情况下,可能会导致这些线程占用长时间修改值失败,导致cas次数过多。

二 LongAdder原理

在高并发场景下LongAdder可以显著的提高性能。使用了空间换时间的思想,咋atomiclong中cas对一个value值的修改,使用了一个数组经行分散修改,这样cas修改失败的概率就会小许多。

例如有一个base值,有三个线程要对执行+1操作,这个时候只有一个线程能成功使用cas对base修改成功,其他线程都会转而去修改cell数组的值,例如cell[2],0,1位置都是0,这个时候要对

三 LongAdder源码

  • add方法 流程图
    AtomicLong LongAdder 源码解析 Doug Lea太强了!!!一 AtomicLong原理二 LongAdder原理三 LongAdder源码四 总结
public void add(long x) {
       //  as表示cells的引用
       // b  表示获取的base的值
       // v 表示期望值
       // m为数组的长度
       // 表示当前线程路由命中的cell单元格
        Cell[] as; long b, v; int m; Cell a;
        //条件1:true 表示数组不为空 当前线程应该将数据写入到对应的cell中
                 false 表示数组未初始化为空,当前线程应该将数据写base中
        //条件2 :true 表示cas改变base失败 可能需要重试 或者 扩容
		
        if ((as = cells) != null || !casBase(b = base, b + x)) {
        //进入代码块的条件:
        1 数组已经出初始化了,当前线程应该将数据写入到对应的cell中
        2  数组为空,但是cas改变base出现了竞争
            boolean uncontended = true;

			 //条件1 2:判断数组是否为空,如果不是空,继续判断条件3
			 //条件3 :getProbe()获取当前线程的hash值(代名词以下都说是hash值),m表示cells长度-1,cells长度 一定是2的次方数,
			 true :表示当前线程经过路由后对应的数组的桶位为空,需要创建对象
			 false:说明当前线程对应的cell 不为空,说明 下一步想要将x值 添加到cell中。
			 //条件4:true 表示cas失败,表明有竞争
			 false:表示cas成功
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //都有哪些情况会调用?
                //1 cells数组位空
                // 2 线程路由后对应下标的cells位空
                // 3  cas 修改cells对应的位置失败
                longAccumulate(x, null, uncontended);
        }
    }
           
  • striped64类中属性
当前机器cpu的数量,用来限定数组的大小
static final int NCPU = Runtime.getRuntime().availableProcessors();
用来减少cas替换次数的数组
 transient volatile Cell[] cells;
 没有发生过竞争时,数据会累加到 base上 | 当cells扩容时,需要将数据写到base中
 transient volatile long base;
乐观锁  初始化cells或者扩容cells都需要获取锁,0 表示无锁状态,1 表示其他线程已经持有锁了
  transient volatile int cellsBusy;
           
  • 重要方法
通过CAS方式获取锁
   final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
    获取当前线程的Hash值(不知道啥值,就代叫hash值,全文都是)
   static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    重置当前线程的hash值
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

           
  • longAccumulate方法,感觉这个方法的判断条件和炼丹一样

流程图:省去了一些细节,要结合代码看

AtomicLong LongAdder 源码解析 Doug Lea太强了!!!一 AtomicLong原理二 LongAdder原理三 LongAdder源码四 总结
会执行这个方法的原因
    case 1: cells没有初始化,线程修改base竞争失败,需要[重试|初始化cells]
    case 2:当前线程路由后对应下标cell为空
    case 3:cas修改当前线程路由后对应的cell桶的值失败,意味着当前线程对应的cell 有竞争[重试|扩容]
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //当前线程的hash值                      
        int h;
        //条件成立说明当前线程还没有获取hash值
        if ((h = getProbe()) == 0) {
          给当前线程分配hash值
            ThreadLocalRandom.current(); // force initialization
            获取hash值
            h = getProbe();
            修改为没有发生竞争,为什么,因为如果当前线程的hash值为0,也就是会默认的写到数组为0的位置,
            。不能算作真正的竞争了值
            wasUncontended = true;
        }
        //表示扩容意向false,一定不会扩容,true可能会扩容
        boolean collide = false;                // True if last slot nonempty
        //自旋
        for (;;) {
        //as  数组引用  a 表示当前线程命中的cell n数组的长度
        //v  表示期望值
            Cell[] as; Cell a; int n; long v;
        //case 1数组 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
            if ((as = cells) != null && (n = as.length) > 0) {
				//进入代码块的原因: 
				 1.当前线程路由后对应下标cell为空
				 2 cas修改当前线程路由后对应的cell桶的值失败,意味着当前线程对应的cell 有竞争[重试|扩容]

            case 1.1  线程路由桶位位空   需要创建new Cell
                if ((a = as[(n - 1) & h]) == null) {
                  //true->表示当前锁 未被占用  false->表示锁被占用
                    if (cellsBusy == 0) {       // Try to attach new Cell
                    //乐观的创建Cell对象 为啥?
                        Cell r = new Cell(x);   // Optimistically create
                        //获取锁
                        if (cellsBusy == 0 && casCellsBusy()) {
                        //是否创建成功标志
                            boolean created = false;
                            try {               // Recheck under lock
                            //rs 数组的引用  m 数组的长度
                             //j 路由位置
                                Cell[] rs; int m, j;
                                //条件 1 ,2恒成立  ,不知道有啥用
                                //条件 3 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置  ,导致数据丢失
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally { 
                            //释放锁
                                cellsBusy = 0;
                            }
                            //创建成功,跳出
                            if (created)
                                             break;
                             //初始化桶位失败,再次自旋
                            continue;           // Slot is now non-empty
                        }
                    }
                    //扩容意向,改为false
                    collide = false;
                }
                case 1.2 只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                 
               case 1.3 当前线程rehash过hash值,然后新命中的cell不为空
				true   修改成功,退出循环
				false 表示rehash之后命中的新的cell也有竞争,
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                case 1.4  条件一: n>= NCPU true  扩容意向 改为false
                           条件二: true 表示其他线程扩容了这个数组,扩容意向改为false 当前线程rehash之后重试即可
                else if (n >= NCPU || cells != as)   
                    collide = false;            // At max size or stale
                 //case 1.5 修改扩容意向
                 !collide = true 设置扩容意向 为true 但是不一定真的发生扩容
                else if (!collide)
                    collide = true;

                case 1.6 扩容逻辑
                    //条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁
                    //条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑
                    // false->表示当前时刻有其它线程在做扩容相关的操作。
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                    //true 说明 数组没有被其他线程修改过
                        if (cells == as) {      // Expand table unless stale
                        // 将数组的长度扩容为原数组长度的两倍
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                    释放乐观锁
                        cellsBusy = 0;
                    }
                    //扩容意向 修改为false
                    collide = false;
                    //再次自旋
                    continue;                   // Retry with expanded table
                }
                //充值当前线程的hash值
                h = advanceProbe(h);
            }
   //case 2 数组还没初始化
     条件一:true表示当前还没加锁
     条件二:确保其他线程没有初始化cells数组
     条件三:true表示成功获取锁,false表示获取锁失败
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                 //防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据
                    if (cells == as) {
                        //初始化长度为2的数组
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                //释放锁
                    cellsBusy = 0;
                }
                //如果初始化成功 也就是写数据成功了,
                if (init)
                    break;
            }
            // 1.当前cellsBusy加锁状态,表示其它线程正在初始化cells,所以当前线程将值累加到base
            // 2  cells被其它线程初始化后,当前线程需要将数据累加到base
            cas替换成功直接退出
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
           
  • 惰性求和

    LongAdder只有在使用longValue()获取当前累加值时才会真正的去结算计数的数据,longValue()方法底层就是调用sum()方法,对base和Cell数组的数据累加然后返回,做到数据写入和读取分离,这个方法不存在竞争就不贴源码了

四 总结

LongAdder中最核心的思想就是利用空间来换时间,将热点value分散成一个Cellss数组来承接并发的CAS,以此来提升性能,但是sum方法可能不能保证实时性,LongAdder的性能全面超越了AtomicLong,而且阿里巴巴开发手册也提及到 推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观锁的重试次数).