天天看点

doubleAdder的性能为何比Atomic好

1.引言

       JDK8之前,我们对于简单类型在高并发下的原子性,多数情况下,都会使用Atomic类型来控制,比如AtomicInteger、AtomicLong等,

       其原理是通过CAS(compare and swap)来进行原子性控制,该方法是调用Unsafe类控制的,是通过JNI调用相关的DLL进行CPU

       级别的操作。但是如果真是在高并发的情况下,针对单一变量的多次自旋,性能的消耗也是很严重的。那下面来介绍下面的;

2.DoubleAdder和LongAdder

       但是在JDK8里有两个类,DoubleAdder和LongAdder,这两个类都继承Striped64类,我们先来看Striped64这个类官方给的解释:

       Apackage-local class holding common representation and mechanics for classessupporting dynamic striping on 64bit

       values.The class extends Number so that concrete subclasses must publicly do so.

       翻译:一个包含通用表示和机制的包本地类对于支持64位值的动态条带的类。类扩展数字,以便具体的子类必须公开这样做。

       然后这个类下面有个内部类,Cell,源码如下,

doubleAdder的性能为何比Atomic好

再看上面有个注解@sun.misc.Contended,这个就是对数据进行缓存填充,专业术语

       就是内存填充,知名框架Disruptor下也使用了类似的方式,但是当时JDK8还未出来,使用的是8个long,为什么是64位,自行补充java内存模型;

       下面我们直接分析方法,下面是Striped64的方法,类似的还有个是long的,原理类似就不做分析了

   final void doubleAccumulate(double x, DoubleBinaryOperator fn,

                                booleanwasUncontended) {

       int h;

              //初始化指针地址

       if ((h = getProbe()) == 0) {

           ThreadLocalRandom.current(); // force initialization

           h = getProbe();

           wasUncontended = true;

       }

              //冲突标记

       boolean collide = false;               // True if last slot nonempty

       for (;;) {//线程自旋

           Cell[] as; Cell a; int n; long v;

           if ((as = cells) != null && (n = as.length) > 0) {//判定存储数组有值,讲cells赋值给as

                if ((a = as[(n - 1) & h])== null) {\\当前指针位置数据是否为空

                    if (cellsBusy == 0) {       // Try to attach new Cell

                        Cell r = newCell(Double.doubleToRawLongBits(x));

                        if (cellsBusy == 0&& casCellsBusy()) {//加锁

                            boolean created =false;

                            try {               // Recheck under lock

                                Cell[] rs; int m, j;

                                if ((rs =cells) != null &&

                                    (m =rs.length) > 0 &&

                                    rs[j = (m -1) & h] == null) {

                                    rs[j] = r;

                                    created =true;

                                }

                            } finally {

                                cellsBusy = 0;

                            }

                            if (created)

                                break;

                            continue;           // Slot is now non-empty

                        }

                    }

                    collide = false;

                }

                else if (!wasUncontended)       //CAS already known to fail

                    wasUncontended = true;      // Continue after rehash

                else if (a.cas(v = a.value,

                               ((fn == null) ?

                               Double.doubleToRawLongBits

                               (Double.longBitsToDouble(v) + x) :

                               Double.doubleToRawLongBits

                               (fn.applyAsDouble

                                (Double.longBitsToDouble(v), x)))))

                   break;

                else if (n >= NCPU || cells!= as)

                    collide = false;            // At max size or stale

                else if (!collide)

                    collide = true;

                else if (cellsBusy == 0&& casCellsBusy()) {//指针达到最后,未锁,抢占锁

                    try {

                        if (cells == as) {      // 数组扩张一倍

                            Cell[] rs = newCell[n << 1];

                            for (int i = 0; i< n; ++i)

                                rs[i] = as[i];

                            cells = rs;

                        }

                    } finally {

                        cellsBusy = 0;

                    }

                    collide = false;

                    continue;                   // Retry with expanded table

                }

                h = advanceProbe(h);//指针往后偏移

           }

                     //第一次添加数据,未被抢占锁,占锁

           else if (cellsBusy == 0 && cells == as &&casCellsBusy()//加锁) {

                boolean init = false;

                try {                           // Initialize table

                    if (cells == as) {

                        Cell[] rs = newCell[2];

                                          //数据存入

                        rs[h & 1] = newCell(Double.doubleToRawLongBits(x));

                       cells = rs;

                        init = true;

                    }

                } finally {

                                   //解锁

                    cellsBusy = 0;

                }

                if (init)

                    break;

           }

                     //高并发下初始情况下,cells还未来得及填充,通过和atomic一样的方式询问处理

           else if (casBase(v = base,

                             ((fn == null) ?

                             Double.doubleToRawLongBits

                             (Double.longBitsToDouble(v) + x) :

                              Double.doubleToRawLongBits

                              (fn.applyAsDouble

                              (Double.longBitsToDouble(v), x)))))

                break;                          // Fall back on usingbase

       }

}

3.小结

Strip64通过cell数组和内存填充(Contented),分减少多线程针对同一变量的资源并发竞争和较少不同线程针对CPU针对内存缓存最小颗粒度的内存共享问题,这样就相对于Atomic提高了性能

4.总结

DoubleAdder数据存在一个数组里,我们看到,在DoubleAdder的sum方法里,对于cell数组进行了循环遍历累加,doubleValue也是调用的此方法,下面是源码

doubleAdder的性能为何比Atomic好

5.思维扩展

本身DoubleAdder的设计思想其实在诸多方面都可以应用,比如电商行业的库存管理,在商品的下单过程中,如果商品很热门,短时间内该商品下单量剧增,比如双11,双12,通过将高竞争资源分布式管理,能显著提升库存扣减的性能