天天看点

Java 原子性CAS 原理 ,atomic包 详解

java.util.concurrent.atomic包类的原理

atomic包底层原理都是Unsafe.compareAndSwapXXX()方法,也就是通常说的CAS,compareAndSwapXXX方法是native标识的方法,是Java底层代码,不是Java代码实现的,

public final class Unsafe {
	//object
    public final native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);
	//int
    public final native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
	//long
    public final native boolean compareAndSwapLong(Object obj, long offset, long expect, long update);

}
           

都是有4个参数:

第一个参数 obj:要更新的变量名

第二个参数 offset:主内存中该变量的值

第三个参数 expect:期待的该变量的值

第四个参数 update:如果offset==expect,则更新obj为update值

java.util.concurrent.atomic包结构

Java 原子性CAS 原理 ,atomic包 详解

AtomicBoolean

AtomicBoolean原理,有一个int 类型的成员变量,该值只可能为1-true,0-false,两个

AtomicBoolean有一点值得注意的,就是AtomicBoolean的方法compareAndSet(),只会执行一次,

//AtomicBoolean 方法
   	public final boolean compareAndSet(boolean expect, boolean update) {
        int e = expect ? 1 : 0;
        int u = update ? 1 : 0;
        return unsafe.compareAndSwapInt(this, valueOffset, e, u);
    }
           

AtomicInteger 原子操作一个Integer类型的变量

以getAndAdd 方法为例,其他方法原理一样:

//AtomicInteger 的方法
   	public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }
    
    /**
    * Unsafe 类的方法
    * 三个参数,比如 int a = 2+1
    * 第一个参数var1代表变量名a
    * 第二个参数var2代表变量a期待的值为2
    * 第三个参数var4代表如果var2的值为2,则进行加法的值
    */
	public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
        	//var5 表示调用底层方法得到的变量var1在主内存中的值
            var5 = this.getIntVolatile(var1, var2);
            	// var1 表示当前变量,var2表示期待的值,var5表示该变量在底层的值,
            	// 如果var2==var5 ,则进行var5 + var4操作
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }
           

上面代码,是个循环,只有设置值成功才会退出,如果预期的值和底层的值不一样,则还会进入循环,重新获取底层变量的值var5 ,预期值var2也重新从对象var1中获取,然后对比var2,var5,直到两个值相等,则进行运算操作,退出循环。这里可以看出,如果修改这个变量的线程非常多并发量非常大时,var2,var5一直不相等,cpu 就会一直空转,浪费cpu资源,所以atomic包适合并发量不大的场景

AtomicIntegerArray原子操作Integer类型的数组的类

AtomicIntegerArray 和AtomicInteger区别不大,就是一个是单个Integer变量,一个是Integer数组,操作数组中的每个数也都是原子操作方法

以getAndAdd 方法为例,其他方法原理一样:

//AtomicIntegerArray 方法,第一个参数数数组坐标,第二个参数是要加的值
	//返回值为做加法前的数值
	public final int getAndAdd(int i, int delta) {
		//checkedByteOffset方法,获取数组对应i坐标的变量值
        return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
    }

 	/**
    * Unsafe 类的方法
    * 三个参数,比如 int a = 2+1
    * 第一个参数var1代表变量名a
    * 第二个参数var2代表变量a期待的值为2
    * 第三个参数var4代表如果var2的值为2,则进行加法的值
    */
	public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
        	//var5 表示调用底层方法得到的变量var1在主内存中的值
            var5 = this.getIntVolatile(var1, var2);
            	// var1 表示当前变量,var2表示期待的值,var5表示该变量在底层的值,
            	// 如果var2==var5 ,则进行var5 + var4操作
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
		//返回值是操作前的值
        return var5;
    }
           

AtomicIntegerFieldUpdater

这个类可以cas原子更新一个类的一个成员变量的值,并且要求成员变量非static并且volatile修饰,例如:

@Slf4j
public class Test {

    //泛型是类:Test,
    //newUpdater方法第一个参数是类的class,第二个参数数该类的成员变量名
    private static AtomicIntegerFieldUpdater<Test> updater = AtomicIntegerFieldUpdater.newUpdater(Test.class,"count");

    //注意该变量必须由volatile关键字修饰,
    //并且注意不能是包装类,也不能是static修饰,否则会报错
    private volatile int count=100;

    public static void main(String[] args) throws Exception {
         Test test = new Test();
        if(updater.compareAndSet(test ,100,120)){
            log.info("【1】如果类对象test 的成员变量count的值是100,则更新为120,----更新成功");
        }else{
            log.info("【2】如果类对象test 的成员变量count的值是100,则更新为120,----更新失败");
        }
        if(updater.compareAndSet(test ,100,120)){
            log.info("【3】如果类对象test 的成员变量count的值是100,则更新为120,----更新成功");
        }else{
            log.info("【4】如果类对象test 的成员变量count的值是100,则更新为120,----更新失败");
        }
    }

    public Integer getCount() {
        return count;
    }
}

           

运行结果:

14:29:54.953 [main] INFO test.Test - 【1】如果类对象test 的成员变量count的值是100,则更新为120,----更新成功
14:29:54.958 [main] INFO test.Test - 【4】如果类对象test 的成员变量count的值是100,则更新为120,----更新失败
           

AtomicLong

同AtomicInteger

AtomicLongArray

同AtomicIntegerArray

AtomicLongFieldUpdater

同AtomicIntegerFieldUpdater

AtomicMarkableReference

这个跟AtomicStampedReference类似,也有个标识,只是AtomicStampedReference的版本号可以随意加,可以为很多版本,但是AtomicMarkableReferenced的标识只能为boolean值,即true,false,例子:

public class Test {
    /**
     * 构造函数,第一个参数是初始值,第二个参数是初始版本号
     */
    static AtomicMarkableReference<Integer> atomicStampedReference = new AtomicMarkableReference(0, false);

    public static void main(String[] args) throws InterruptedException {
        final int reference = atomicStampedReference.getReference();
        System.out.println("初始值:" + reference );

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("更新前值:" + reference + " --》  如果值为0,标志为false,则更新值为10,标志为true,是否更新成功:"
                        + atomicStampedReference.compareAndSet(reference, reference + 10, false, true));
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                Integer reference = atomicStampedReference.getReference();
                System.out.println("更新前值:" + reference + " --》 如果值为0,标志为false,则更新值为10,标志为true,是否更新成功:"
                        + atomicStampedReference.compareAndSet(reference, reference + 10, false, true));
            }
        });
        t1.start();
        t1.join();
        t2.start();
        t2.join();

        System.out.println("更新后值:" + atomicStampedReference.getReference());
    }


}

           

执行结果:

初始值:0
更新前值:0 --》  如果值为0,标志为false,则更新值为10,标志为true,是否更新成功:true
更新前值:10 --》 如果值为0,标志为false,则更新值为10,标志为true,是否更新成功:false
更新后值:10
           

AtomicReference

泛型传入一个类,然后cas原子操作这个变量

public static void main(String[] args) throws Exception {
      AtomicReference<Character> characterAtomicReference = new AtomicReference<>('a');
      //如果characterAtomicReference 的值为a,则更新为c
      characterAtomicReference.compareAndSet('a','c'); //c
      characterAtomicReference.compareAndSet('b','d');//no
      characterAtomicReference.compareAndSet('c','e');//e
      characterAtomicReference.compareAndSet('d','g');//no
      characterAtomicReference.compareAndSet('e','j');//j
      characterAtomicReference.compareAndSet('j','w');//w
      System.out.println(characterAtomicReference.get());//w

  }
           

AtomicReferenceArray

同AtomicIntegerArray

AtomicReferenceFieldUpdater

同 AtomicIntegerFieldUpdater

AtomicStampedReference

该类,可以解决CAS的ABA问题,

cas 的ABA问题是指,变量a从1 更新为2,又从2更新为1,其他线程查询是1,以为这个变量没有更新,实际这个变量更新过。解决办法,就是添加版本号,比如 变量a从1 更新为2版本为v1,又从2更新为1版本为v2,

//AtomicStampedReference 核心类
 public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&  //这里多了版本比较,
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }
           

LongAdder简介

JDK1.8时,java.util.concurrent.atomic包中提供了一个新的原子类:LongAdder。

根据Oracle官方文档的介绍,LongAdder在高并发的场景下会比它的前辈————AtomicLong 具有更好的性能,代价是消耗更多的内存空间:

那么,问题来了:

为什么要引入LongAdder? AtomicLong在高并发的场景下有什么问题吗? 如果低并发环境下,
LongAdder和AtomicLong性能差不多,那LongAdder是否就可以替代AtomicLong了?
           

为什么要引入LongAdder?

我们知道,AtomicLong是利用了底层的CAS操作来提供并发性的,比如addAndGet方法:

/**
     * 以原子方式将给定值添加到当前值。
	 * 参数:增量-要添加的值
	 * 返回值:更新值
     */
 	public final long addAndGet(long delta) {
    	//unsafe.getAndAddInt 方法进行cas 更新操作,返回值是操作前的值
    	//addAndGet方法返回的是操作后的值,所以是,返回前值+delta
        return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
    }

	/**
    * Unsafe 类的方法
    * 三个参数,比如 int a = 2+1
    * 第一个参数var1代表变量名a
    * 第二个参数var2代表变量a期待的值为2
    * 第三个参数var4代表如果var2的值为2,则进行加法的值
    */
	public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
        	//var5 表示调用底层方法得到的变量var1在主内存中的值
            var5 = this.getIntVolatile(var1, var2);
            	// var1 表示当前变量,var2表示期待的值,var5表示该变量在底层的值,
            	// 如果var2==var5 ,则进行var5 + var4操作
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
		//返回值是操作前的值
        return var5;
    }
           

上述方法调用了Unsafe类的getAndAddLong方法,该方法内部是个native方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。

在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。

这就是LongAdder引入的初衷——解决高并发环境下AtomicLong的自旋瓶颈问题。

LongAdder快在哪里?

既然说到LongAdder可以显著提升高并发环境下的性能,那么它是如何做到的?这里先简单的说下LongAdder的思路,第二部分会详述LongAdder的原理。

我们知道,AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。

LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

这种做法有没有似曾相识的感觉?没错,ConcurrentHashMap中的“分段锁”其实就是类似的思路。

LongAdder能否替代AtomicLong?

回答这个问题之前,我们先来看下LongAdder提供的API:

Java 原子性CAS 原理 ,atomic包 详解

可以看到,LongAdder提供的API和AtomicLong比较接近,两者都能以原子的方式对long型变量进行增减。

  • AtomicLong提供的功能其实更丰富,尤其是addAndGet、decrementAndGet、compareAndSet这些方法。addAndGet、decrementAndGet除了单纯的做自增自减外,还可以立即获取增减后的值,
  • LongAdder则需要做同步控制才能精确获取增减后的值。如果业务需求需要精确的控制计数,做计数比较,AtomicLong也更合适。

另外,从空间方面考虑,LongAdder其实是一种“空间换时间”的思想,从这一点来讲AtomicLong更适合。当然,如果你一定要跟我杠现代主机的内存对于这点消耗根本不算什么,那我也办法。

总之,低并发、一般的业务场景下AtomicLong是足够了。如果并发量很多,存在大量写多读少的情况,那LongAdder可能更合适。适合的才是最好的,如果真出现了需要考虑到底用AtomicLong好还是LongAdder的业务场景,那么这样的讨论是没有意义的,因为这种情况下要么进行性能测试,以准确评估在当前业务场景下两者的性能,要么换个思路寻求其它解决方案。

最后,给出国外一位博主对LongAdder和AtomicLong的性能评测,以供参考:

http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/

Java 原子性CAS 原理 ,atomic包 详解

LongAdder原理

之前说了,AtomicLong是多个线程针对单个热点值value进行原子操作。而LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作。

比如有三个ThreadA、ThreadB、ThreadC,每个线程对value增加10。

对于AtomicLong,最终结果的计算始终是下面这个形式:

value = 10 + 10 + 10 = 30 value=10+10+10=30

但是对于LongAdder来说,内部有一个base变量,一个Cell[]数组。

base变量:非竞态条件下,直接累加到该变量上

Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中的值

最终结果的计算是下面这个形式:

Java 原子性CAS 原理 ,atomic包 详解

等价于:value=base + Cell[0] + Cell[1] + Cell[2] +…+Cell[n-1]; (n为Cell数值的长度)

LongAdder的内部结构

LongAdder只有一个空构造器,其本身也没有什么特殊的地方,所有复杂的逻辑都在它的父类Striped64中。

public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    public LongAdder() {
    }
}
           

来看下Striped64的内部结构,这个类实现一些核心操作,处理64位数据。

Striped64只有一个空构造器,初始化时,通过Unsafe获取到类字段的偏移量,以便后续CAS操作:

Striped64

abstract class Striped64 extends Number {
    Striped64() {
    }
    
   // Unsafe 类
    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}
           

上面有个比较特殊的字段是threadLocalRandomProbe,可以把它看成是线程的hash值。这个后面我们会讲到。

定义了一个内部Cell类,这就是我们之前所说的槽,每个Cell对象存有一个value值,可以通过Unsafe来CAS操作它的值:

@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe 类
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
           

其它的字段:

可以看到Cell[]就是之前提到的槽数组,base就是非并发条件下的基数累计值。

/** CPUS的数目,以限制表大 */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * cells数组。 如果为非null,则数组大小为2的幂。
     */
    transient volatile Cell[] cells;

    /**
     * 基本值,主要在没有争用时使用,也用作数组初始化过程中的后备。 通过CAS更新
     */
    transient volatile long base;

    /**
     * 扩容或者创建数组时使用的自旋锁(通过CAS锁定),将标志设置为1(加锁状态),
     * 初始化完毕或者扩容完毕时将此标志设置为0
     */
    transient volatile int cellsBusy;
           

LongAdder的核心方法

还是通过例子来看:

假设现在有一个LongAdder对象la,四个线程A、B、C、D同时对la进行累加操作。

LongAdder la = new LongAdder();
la.add(10);
           

①ThreadA调用add方法(假设此时没有并发)

// LongAdder方法
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    /**
     * Striped64 方法
     */
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
           

初始时Cell[]为null,base为0。所以ThreadA会调用casBase方法(定义在Striped64中),因为没有并发,CAS操作成功将base变为10,可以看到,如果线程A、B、C、D线性执行,那casBase永远不会失败,也就永远不会进入到base方法的if块中,所有的值都会累积到base中。

那么,如果任意线程有并发冲突,导致caseBase失败呢?失败就会进入if方法体:

Java 原子性CAS 原理 ,atomic包 详解

这个方法体会先再次判断Cell[]槽数组有没初始化过,如果初始化过了,以后所有的CAS操作都只针对槽中的Cell;否则,进入longAccumulate方法。整个add方法的逻辑如下图:

Java 原子性CAS 原理 ,atomic包 详解

可以看到,只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。

如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容了。

Striped64的核心方法

我们来看下Striped64的核心方法longAccumulate到底做了什么:

//Striped64方法
final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
  int h;
  	//getProbe()方法给当前线程生成一个非0的hash值
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); 
        h = getProbe();
        wasUncontended = true;
    }
    //如果哈希取模映射得到的cell单元不为null,则为true,此值也可看做扩容意向
    boolean collide = false; 
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        //第一种可能:cells已经初始化了
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        //第二种可能,cells没有加锁也没有初始化,则尝试对cells加锁,并进行初始化
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2]; //初始化大小为2(必须为2的幂次方)
                    rs[h & 1] = new Cell(x); //根据hash值定位,并赋值
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        //第三种可能:cells正在初始化,则尝试直接在base上进行累加操作
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

           

上述代码首先给当前线程分配一个hash值,然后进入一个自旋,这个自旋分为三个分支:

  • 第一种可能:Cell[]数组已经初始化
  • 第二种可能:Cell[]数组未初始化
  • 第三种可能:Cell[]数组正在初始化中

第二种可能:Cell[]数组未初始化

我们之前讨论了,初始时Cell[]数组还没有初始化,所以会进入分支②:

else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2]; //初始化大小为2(必须为2的幂次方)
                    rs[h & 1] = new Cell(x); //根据hash值定位,并赋值
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
           

首先会将cellsBusy置为1:加锁状态

/**
     * 将cellBusy字段从0设为1,以获取锁定。
     */
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
           

然后,初始化Cell[]数组(初始大小为2),根据当前线程的hash值计算映射的索引,并创建对应的Cell对象,Cell单元中的初始值x就是本次要累加的值。

第三种可能:Cell[]数组正在初始化中

如果在初始化过程中,另一个线程ThreadB也进入了longAccumulate方法,就会进入分支③:

//第三种可能:cells正在初始化,则尝试直接在base上进行累加操作
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
           

可以看到,分支③直接操作base基数,将值累加到base上。

第一种可能:Cell[]数组已经初始化

如果初始化完成后,其它线程也进入了longAccumulate方法,就会进入分支①:

//第一种可能:cells已经初始化了
 if ((as = cells) != null && (n = as.length) > 0) {
   		//当前线程的hash值运算后映射到Cell单元为null,说明该Cell没有被使用
       if ((a = as[(n - 1) & h]) == null) {
           if (cellsBusy == 0) {       // cells数字没有正在扩容
               Cell r = new Cell(x);  
               //尝试加锁,成功后cellsBusy=1
               if (cellsBusy == 0 && casCellsBusy()) {
                   boolean created = false;
                   try {              
                       Cell[] rs; int m, j;
                       //在有锁的情况下,再次坚持cell单元是否为null,为空则赋值
                       if ((rs = cells) != null &&
                           (m = rs.length) > 0 &&
                           rs[j = (m - 1) & h] == null) {
                           rs[j] = r;
                           created = true;
                       }
                   } finally {
                       cellsBusy = 0;
                   }
                   if (created)
                       break;
                   continue;           // Slot is now non-empty
               }
           }
           collide = false;
       }
       // wasUncontended表示前一次CAS更新Cell单元是否成功了
       else if (!wasUncontended)     
       		// 没成功,重置为true,后面会重新计算线程的hash值  
           wasUncontended = true;      
       else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
           break;
       else if (n >= NCPU || cells != as)//当cells数组的大小超过cpu核数后,永远都不会再扩容了
       		//
           collide = false;// 扩容标识,为false,表示不会再进行扩容
       else if (!collide)
           collide = true;
       else if (cellsBusy == 0 && casCellsBusy()) {//尝试加锁
           try {
               if (cells == as) {      
                   Cell[] rs = new Cell[n << 1];//扩容为原来的两倍
                   for (int i = 0; i < n; ++i)
                       rs[i] = as[i];
                   cells = rs;
               }
           } finally {
               cellsBusy = 0;
           }
           collide = false;
           continue;                   // Retry with expanded table
       }
       h = advanceProbe(h);//计算当前线程新的hash值
   }
           

LongAdder的sum方法

最后,我们来看下LongAdder的sum方法:

Java 原子性CAS 原理 ,atomic包 详解

sum求和的公式就是我们开头说的:

Java 原子性CAS 原理 ,atomic包 详解

需要注意的是,这个方法只能得到某个时刻的近似值,也就是如果要求一个准确无误的值,应该用AtomicLong接口,这也就是LongAdder并不能完全替代LongAtomic的原因之一。

LongAdder的其它兄弟

JDK1.8时,java.util.concurrent.atomic包中,除了新引入LongAdder外,还有引入了它的三个兄弟类:LongAccumulator、DoubleAdder、DoubleAccumulator

Java 原子性CAS 原理 ,atomic包 详解

LongAccumulator

LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而LongAccumulator提供了自定义的函数操作。其构造函数如下:

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }
           

通过LongBinaryOperator,可以自定义对入参的任意操作,并返回结果(LongBinaryOperator接收2个long作为参数,并返回1个long)

LongAccumulator内部原理和LongAdder几乎完全一样,都是利用了父类Striped64的longAccumulate方法。这里就不再赘述了,可以自己阅读源码。

DoubleAdder和DoubleAccumulator

从名字也可以看出,DoubleAdder和DoubleAccumulator用于操作double原始类型。

与LongAdder的唯一区别就是,其内部会通过一些方法,将原始的double类型,转换为long类型,其余和LongAdder完全一样:

// DoubleAdder方法
 public void add(double x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null ||
            !casBase(b = base,
                     Double.doubleToRawLongBits
                     (Double.longBitsToDouble(b) + x))) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value,
                                      Double.doubleToRawLongBits
                                      (Double.longBitsToDouble(v) + x))))
                doubleAccumulate(x, null, uncontended);
        }
    }
           

参考

https://segmentfault.com/a/1190000015865714