天天看点

Java8 atomic包 源码解析

  目录

一、AtomicLong

二、AtomicLongArray

三、AtomicLongFieldUpdater

四、LongAdder

1、使用

2、定义

3、add

4、sum / reset / sumThenReset

五、LongAccumulator

六、AtomicMarkableReference 和 AtomicStampedReference

      atomic包提供了Boolean,Integer,Long和对象引用类型的值的原子更新的工具类,还提供了Double和Long类型的原子累加器实现,前者的实现是基于Unsafe类提供的方法,后者的实现是基于Striped64类,本篇博客就以其中的典型类为例来说明其具体的使用方式和实现细节。

一、AtomicLong

      AtomicLong只有一个实例属性value,用volatile修饰,有两个静态属性Unsafe实例和表示value属性偏移量的valueOffset属性,后者通过静态代码块赋值,如下:

Java8 atomic包 源码解析

 重点关注以下方法的实现:

//修改该值,修改后对其他CPU立即可见
public final void lazySet(long newValue) {
   unsafe.putOrderedLong(this, valueOffset, newValue);
}

//底层会原子的修改为目标值并返回原来的值
public final long getAndSet(long newValue) {
        return unsafe.getAndSetLong(this, valueOffset, newValue);
    }
//如果当前值是expect,则更新成update,如果更新成功返回true
public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

//将当前值原子的加上delta,并返回加之前的值
public final long getAndAdd(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta);
    }

           

上述Unsafe的方法实现如下:

//实现Unsafe的putOrderedLong方法
UNSAFE_ENTRY(void, Unsafe_SetOrderedLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong x))
  UnsafeWrapper("Unsafe_SetOrderedLong");

#ifdef SUPPORTS_NATIVE_CX8
  SET_FIELD_VOLATILE(obj, offset, jlong, x);
#
UNSAFE_END

#define SET_FIELD_VOLATILE(obj, offset, type_name, x) \
  oop p = JNIHandles::resolve(obj); \
  //将其作为一个volatile变量指针,修改值并通过内存屏障指令强制写回内存,让其他CPU都可以看见
  OrderAccess::release_store_fence((volatile type_name*)index_oop_from_field_offset_long(p, offset), truncate_##type_name(x));

//根据字段偏移量计算该属性的内存地址
inline void* index_oop_from_field_offset_long(oop p, jlong field_offset) {
  jlong byte_offset = field_offset_to_byte_offset(field_offset);

  if (sizeof(char*) == sizeof(jint))    // (this constant folds!)
    return (address)p + (jint) byte_offset;
  else
    return (address)p +        byte_offset;
}

inline jlong field_offset_to_byte_offset(jlong field_offset) {
  return field_offset;
}

#define truncate_jlong(x) (x)

public final long getAndSetLong(Object o, long offset, long newValue) {
        long v;

        do {
            //获取最新值
            v = getLongVolatile(o, offset);
        } while (!compareAndSwapLong(o, offset, v, newValue)); //如果当前值是v则更新成newValue,更新成功返回true,否则返回false,通过while循环继续修改
        return v;
    }


//GetLongVolatile方法通过宏定义实现的,实现Unsafe的getLongVolatile方法
#ifdef SUPPORTS_NATIVE_CX8
DEFINE_GETSETOOP_VOLATILE(jlong, Long);
#endif

//这个宏同时定了Get和Set方法
#define DEFINE_GETSETOOP_VOLATILE(jboolean, Boolean) \
 \
UNSAFE_ENTRY(jboolean, Unsafe_Get##Boolean##Volatile(JNIEnv *env, jobject unsafe, jobject obj, jlong offset)) \
  UnsafeWrapper("Unsafe_Get"#Boolean); \
  GET_FIELD_VOLATILE(obj, offset, jboolean, v); \
  return v; \
UNSAFE_END \
 \
UNSAFE_ENTRY(void, Unsafe_Set##Boolean##Volatile(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jboolean x)) \
  UnsafeWrapper("Unsafe_Set"#Boolean); \
  SET_FIELD_VOLATILE(obj, offset, jboolean, x); \
UNSAFE_END \
 \

#define GET_FIELD_VOLATILE(obj, offset, type_name, v) \
  //从JNI引用中解析出oop
  oop p = JNIHandles::resolve(obj); \
  if (support_IRIW_for_not_multiple_copy_atomic_cpu) { \
    OrderAccess::fence(); \
  } \
  //基于内存屏障指令,读取该地址最新的值
  volatile type_name v = OrderAccess::load_acquire((volatile type_name*)index_oop_from_field_offset_long(p, offset));

//用于实现Unsafe的compareAndSwapLong方法
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
  UnsafeWrapper("Unsafe_CompareAndSwapLong");
  Handle p (THREAD, JNIHandles::resolve(obj));
  //获取属性地址
  jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
  //通过cmpxchg和lock指令前缀实现,会比较当前值与期望值是否一致,如果一致则更新,否则返回当前值
  return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

//Unsafe的getAndAddLong方法实现
public final long getAndAddLong(Object o, long offset, long delta) {
        long v;
        do {
            //获取最新值
            v = getLongVolatile(o, offset);
        } while (!compareAndSwapLong(o, offset, v, v + delta));//原子的修改,修改失败返回false,继续重试,否则返回true
        return v;
    }
           

二、AtomicLongArray

      AtomicLongArray定义的方法和实现和AtomicLong是一样的,最大的区别在于后者需要根据修改的数组索引值来计算对应索引的数组元素的内存地址,而不是固定的value属性偏移量了,其计算某个索引的内存地址的实现如下,以set方法为例说明:

public final void set(int i, long newValue) {
        //putLongVolatile对应的实现是Unsafe_SetLongVolatile,还是通过SET_FIELD_VOLATILE宏实现
        //即跟putOrderedLong的实现是一样的
        unsafe.putLongVolatile(array, checkedByteOffset(i), newValue);
    }

//相对于array的偏移量
private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);

        return byteOffset(i);
    }


private static long byteOffset(int i) {
        //i<<shift实际是i*2^shift,用于计算前i个元素的内存大小
        return ((long) i << shift) + base;
    }

//base不是基地址,而是数组在内存中的表示,有一部字节是固定,用来记录数组长度和元素类型的
//base返回这部分内存的字节数,实际的数组元素存储在该部分字节的后面
private static final int base = unsafe.arrayBaseOffset(long[].class);

 static {
        //scale表示一个数组元素的字节数,long下scale就是8
        int scale = unsafe.arrayIndexScale(long[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        //shift的结果就是3    
        shift = 31 - Integer.numberOfLeadingZeros(scale);
    }
           

三、AtomicLongFieldUpdater

      AtomicLongFieldUpdater用于给一个volatile long类型的属性做原子更新,实际使用过程中如果直接将某个volatile long类型的属性声明成AtomicLong类型,则在读该变量的时候都会增加额外的代码,因为volatile关键字本身会保证CPU读取该变量时永远是最新的,volatile关键字不能保证修改是原子的,就可以使用AtomicLongFieldUpdater来完成修改。

其使用如下:

class Holder {
        //注意不能是static属性,否则获取属性偏移量时报错
        private volatile long test = 1;

        AtomicLongFieldUpdater<Holder> updater=AtomicLongFieldUpdater.newUpdater(Holder.class, "test");

        public long getTest() {
            return test;
        }

        public AtomicLongFieldUpdater<Holder> getUpdater() {
            return updater;
        }
    }

    @Test
    public void test() throws Exception {
        Holder holder=new Holder();
        CountDownLatch countDownLatch=new CountDownLatch(10);
        Runnable run=new Runnable() {
            @Override
            public void run() {
                for(int i=0;i<10000;i++) {
                    holder.getUpdater().addAndGet(holder, 1);
                }
                countDownLatch.countDown();
            }
        };
        for(int i=0;i<10;i++){
            new Thread(run).start();
        }
        countDownLatch.await();
        System.out.println("main end,test->"+holder.getTest());
    }
           

   AtomicLongFieldUpdater是一个抽象类,newUpdater方法返回该类的一个实现类实例,如下:

Java8 atomic包 源码解析

其中CallerSensitive注解是为了Reflection.getCallerClass()方法能够准确的找到当前方法的上层调用类的,比如通过反射调用此方法时可以跳过反射相关类找到调用反射方法的类。 VM_SUPPORTS_LONG_CAS是AtomicLong的一个静态属性,通过本地方法VMSupportsCS8初始化,64位下该属性为true。CASUpdater和LockedUpdater都是AtomicLongFieldUpdater的两个内部类,用来实现AtomicLongFieldUpdater的抽象方法,如下:

Java8 atomic包 源码解析

CASUpdater的实现是基于Unsafe的,跟AtomicLong的实现是一样的,LockedUpdater的实现是基于synchronized关键字的,也用到了Unsafe的方法,不过这些方法不保证原子更新。重点关注这两的构造方法实现,两者是一样的,以CASUpdater为例说明,如下:

CASUpdater(final Class<T> tclass, final String fieldName,
                   final Class<?> caller) {
            final Field field;
            final int modifiers;
            try {
                //获取对应的字段
                field = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<Field>() {
                        public Field run() throws NoSuchFieldException {
                            return tclass.getDeclaredField(fieldName);
                        }
                    });
                //获取字段的修饰符    
                modifiers = field.getModifiers();
                //判断是否有访问权限
                sun.reflect.misc.ReflectUtil.ensureMemberAccess(
                    caller, tclass, null, modifiers);
                ClassLoader cl = tclass.getClassLoader();
                ClassLoader ccl = caller.getClassLoader();
                if ((ccl != null) && (ccl != cl) &&
                    ((cl == null) || !isAncestor(cl, ccl))) {
                    sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
                }
            } catch (PrivilegedActionException pae) {
                throw new RuntimeException(pae.getException());
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            //字段类型不是long
            if (field.getType() != long.class)
                throw new IllegalArgumentException("Must be long type");
            //不是volatile变量
            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");
            
            //初始化ccclass主要是为了accessCheck中校验访问权限
            //如果是在子类中构造了这个CASUpdater,则cclass是子类,否则就是tclass
            this.cclass = (Modifier.isProtected(modifiers) && //是protected变量
                           tclass.isAssignableFrom(caller) && //tclass是caller的子类
                           !isSamePackage(tclass, caller)) //不是同一个包
                          ? caller : tclass;
            this.tclass = tclass;
            //获取字段偏移量,要求该字段必须是实例属性
            this.offset = U.objectFieldOffset(field);
        }
    
    //second是否first的父类加载器
    static boolean isAncestor(ClassLoader first, ClassLoader second) {
        ClassLoader acl = first;
        do {
            acl = acl.getParent();
            if (second == acl) {
                return true;
            }
        } while (acl != null);
        return false;
    }
    
    //如果两个的类加载器和包名一样返回true
    private static boolean isSamePackage(Class<?> class1, Class<?> class2) {
        return class1.getClassLoader() == class2.getClassLoader()
               && Objects.equals(getPackageName(class1), getPackageName(class2));
}

    private static String getPackageName(Class<?> cls) {
        String cn = cls.getName();
        int dot = cn.lastIndexOf('.');
        return (dot != -1) ? cn.substring(0, dot) : "";
    }
   
   //调用具体的修改方法前必须调用accessCheck
   private final void accessCheck(T obj) {
            //要求obj必须是cclass的实例
            if (!cclass.isInstance(obj))
                throwAccessCheckException(obj);
        }
           

四、LongAdder

1、使用

       LongAdder是对AtomicLong在高并发下累加计算场景的优化版,AtomicLong底层是通过CAS加上while循环实现原子的累加计算的,高并发下,CAS原子修改失败的概率会很高,会导致线程多次重试,LongAdder底层也是CAS加上for循环,不过CAS修改的是Cell,Cell用来存储累加的结果,每个线程会映射到一个Cell,且Cell的个数可以扩容到跟CPU核数一致,即避免了多个线程同时CAS修改同一个变量,从而减少CAS原子修改失败导致的重试,提升系统性能。

参考如下测试用例:

@Test
    public void test2() throws Exception {
        AtomicLong atomicLong=new AtomicLong(0);
        CountDownLatch countDownLatch=new CountDownLatch(10);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(11);
        Runnable run=new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                for(int i=0;i<10000000;i++) {
                    atomicLong.incrementAndGet();
                }
                countDownLatch.countDown();
            }
        };
        for(int i=0;i<10;i++){
            new Thread(run).start();
        }
        cyclicBarrier.await();
        long start=System.currentTimeMillis();
        countDownLatch.await();
        System.out.println("main end,time->"+(System.currentTimeMillis()-start)+",result->"+atomicLong.get());
    }

    @Test
    public void test3() throws Exception {
        LongAdder longAdder=new LongAdder();
        CountDownLatch countDownLatch=new CountDownLatch(10);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(11);
        Runnable run=new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                for(int i=0;i<10000000;i++) {
                    longAdder.add(1);
                }
                countDownLatch.countDown();
            }
        };
        for(int i=0;i<10;i++){
            new Thread(run).start();
        }
        cyclicBarrier.await();
        long start=System.currentTimeMillis();
        countDownLatch.await();
        System.out.println("main end,time->"+(System.currentTimeMillis()-start)+",result->"+longAdder.longValue());
    }
           

 AtomicLong的耗时在2300ms左右,LongAdder的耗时在300ms左右,性能有大幅的提升。

2、定义

LongAdder提供的API相对于AtomicLong要简单很多,只能做加减,其中减是通过add一个负值实现的,而且LongAdder不支持设置初始值。LongAdder继承自Striped64,其核心方法的实现都在Striped64中。Striped64包含的实例属性如下:

/** 获取CPU的个数 */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    //Cell数组
    transient volatile Cell[] cells;

    //保存long值
    transient volatile long base;

    //相当于一个锁,当创建Cell数组或者扩容时使用,1表示加锁了,0表示没有锁
    transient volatile int cellsBusy;
           

其中Cell的定义如下:

Java8 atomic包 源码解析

Contended注解是为了避免由高速缓存行导致的伪共享问题,Cell实际就是对long value的一个包装,提供cas修改的能力。

 Striped64中包含的静态属性如下:

Java8 atomic包 源码解析

3、add

     LongAdder的核心就是add方法,参数可以是正数或者负数,初始值都是0,其实现如下:

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        //cells为空就会执行casBase,不为空则进入if分支
        //casBase执行失败会进入if分支
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||  //cells未初始化
                (a = as[getProbe() & m]) == null ||   //当前线程对应索引位的Cell未初始化
                !(uncontended = a.cas(v = a.value, v + x))) //执行cas失败,即存在多个线程同时修改同一个Cell
                longAccumulate(x, null, uncontended);
        }
    }

 final boolean casBase(long cmp, long val) {
        //原子的修改
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

static final int getProbe() {
        //获取当前线程的threadLocalRandomProbe属性,相当于线程的唯一标识,由ThreadLocalRandom负责初始化
        //这个唯一标识也是原子递增的
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

//wasUncontended表示是否cas失败
 final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            //如果probe未初始化,则通过current方法强制初始化
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            //将属性强制为true
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                //如果cells已经初始化了
                if ((a = as[(n - 1) & h]) == null) {
                    //如果当前线程对应的索引位的Cell为空
                    if (cellsBusy == 0) {       //此时无锁
                        //创建一个新的Cell
                        Cell r = new Cell(x);   //注意此时是用x来初始化Cell的,x就是待增加的值
                        if (cellsBusy == 0 && casCellsBusy()) {
                            //casCellsBusy返回true表示加锁成功
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 && //再次校验cells完成初始化了
                                    rs[j = (m - 1) & h] == null)  //再次校验对应的索引位Cell为空
                                { 
                                    //数组元素赋值
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //解锁
                                cellsBusy = 0;
                            }
                            if (created)
                                break; //已经创建则终止循环
                            continue;           //没有创建,则继续下一次for循环,直到创建完成为止
                        }
                        //当前锁被占用了或者加锁失败
                    }
                    //当前锁被占用了或者加锁失败,然后进入到下面的advanceProbe,下一次for循环当前线程就会进入另外一个索引位了
                    collide = false;
                }
                //进入下面的分支都是因为多个线程同时修改同一个Cell导致cas修改失败,此时wasUncontended必须为true,如果为false也给置为true
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      //置为true以后下一次for循环直接进入下一个else if分支
                //a就是当前线程对应的Cell,v就是该Cell的值,如果fn为null就直接相加,否则调用fn修改,最后cas原子修改,如果修改成功,则终止for循环 
                //如果此时cas原子修改失败则继续下面的else if分支   
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    //如果数组长度大于CPU个数 或者cells发生扩容了,相反的如果数组长度小于CPU个数且没有扩容则进入下面的分支
                    collide = false;            //collide置为false以后就命中下一个else if分支,不会执行扩容了,跳转到advanceProbe 
                else if (!collide) 
                    collide = true; //置为true以后,下一次进入这个else if分支就变成false了,进入下一个else if分支,执行扩容了
                else if (cellsBusy == 0 && casCellsBusy()) { //获取锁成功,如果失败则for循环重试
                    try {
                        if (cells == as) {      //再次校验cells未扩容
                            Cell[] rs = new Cell[n << 1]; //扩容一倍
                            //拷贝原来的数组元素
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            //cells重新赋值    
                            cells = rs;
                        }
                    } finally {
                        //释放锁
                        cellsBusy = 0;
                    }
                    //置为false
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //增加probe,下一次for循环可能就命中其他的索引位了
                h = advanceProbe(h);
            }
            //cells未初始化,进入此分支 
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) { //尝试获取锁,如果失败则进入下面的else if分支
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) { //校验未扩容
                        //初始数组长度就是2
                        Cell[] rs = new Cell[2];
                        //将当前线程对应Cell初始化
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //获取锁失败或者cells已经完成扩容了,则通过casBase尝试修改
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }//for循环结束
    }

//将其原子的置为1,返回true表示加锁成功
 final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }
           

cells是默认是null,只有当cas失败了才会初始化cells,初始化是数组长度是2,后面每次都扩容一倍, 直到达到CPU的个数为止;数组初始化时会初始化根据当前线程probe计算出来的索引位的Cell,扩容时不会初始化扩容的那一部分数组元素的Cell,而是等到某个线程命中了该索引位,发现对应Cell未初始化才会初始化。创建Cell时使用add的参数作为初始值的,创建之后的cas修改都是以此为基础。线程probe属性不是固定不变的,其初始值是0,然后通过ThreadLocalRandom.current()方法完成初始化,在for循环重试的过程中部分条件下会通过advanceProbe方法基于当前的probe值,利用位运算重新计算一个probe值出来,这样下一轮for循环时,当前线程可能就命中了其他的索引位的Cell了。

4、sum / reset / sumThenReset

     sum方法用于获取LongAdder关联的变量的值,会将由LongAdder本身维护的base和由Cell数组中Cell维护的value的值累加起来;reset方法用于将base属性和Cell的value属性置为0,从而可以重复使用;sumThenReset在一次遍历中同时执行上述两个逻辑,其实现如下:

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        //将base同cell数组中的Cell保存的累加值求和
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

//将base和cell数组中的Cell中的value置为0
public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }

//先累加求和,再置为0
public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }
           

五、LongAccumulator

      LongAccumulator是对LongAdder的强化,不再是支持简单的累加,可以自定义函数,底层实现的核心都是longAccumulate方法,其使用如下:

@Test
    public void test4() throws Exception {
        //可以指定初始值和具体的累加逻辑
        LongAccumulator longAccumulator=new LongAccumulator(new LongBinaryOperator() {
            @Override
            //left是原来的值,right是累加的值
            public long applyAsLong(long left, long right) {
                //此时结果是不固定的
                return left+right*3;
                //此时结果是固定的
//                return left+right;
            }
        }, 100);
        CountDownLatch countDownLatch=new CountDownLatch(10);
        Runnable run=new Runnable() {
            @Override
            public void run() {

                for(int i=0;i<1000000;i++) {
                    longAccumulator.accumulate(1);
                }
                countDownLatch.countDown();
            }
        };
        for(int i=0;i<10;i++){
            new Thread(run).start();
        }
        long start=System.currentTimeMillis();
        countDownLatch.await();
        System.out.println("main end,time->"+(System.currentTimeMillis()-start)+",result->"+longAccumulator.longValue());
    }
           

为啥left+right*3时计算的结果是不固定的了?不是因为计算过程中不是原子修改,而是因为最后算总和的时候也是通过LongBinaryOperator完成的,Cell中的值作为right,base值作为left,这样Cell中值实际就算了2遍,Cell本身执行cas修改前会算一遍,算总和时再算一遍。因为并发时落到各个Cell的线程是随机的,即各Cell的值是随机的,最后算出来的结果就是随机的。而left+right就不存在算两遍的问题,所以结果和LongAdder是一样的。参考accumulate和get方法的实现,这两个相当于LongAdder的add和sum方法,如下:

public void accumulate(long x) {
        Cell[] as; long b, v, r; int m; Cell a;
        if ((as = cells) != null ||
            (r = function.applyAsLong(b = base, x)) != b  //需要确保经过applyAsLong计算后结果不等于原来的值
            && !casBase(b, r)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended =
                  (r = function.applyAsLong(v = a.value, x)) == v ||
                  a.cas(v, r)))
                longAccumulate(x, function, uncontended);
        }
    }

public long get() {
        Cell[] as = cells; Cell a;
        long result = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                     //注意此时还是通过构造时传入function来计算总和
                    result = function.applyAsLong(result, a.value);
            }
        }
        return result;
    }
           

六、AtomicMarkableReference 和 AtomicStampedReference

    很多博客说这两个类是为了解决AtomicReference的ABA问题,其实是错误的,所谓的ABA是指,线程1将某个变量从A修改成B,然后又修改成A,线程2判断这个变量还是A,就将其修改成B,即线程2没有感知到这个变量在线程1中的改变。这个ABA其实不是一个问题,对线程2而言,本身也是无需关心这个变量是怎么变化的,只要它的当前值是A就行。另外ABA“问题”,不只是AtomicReference特有的,所有类似的CAS动作都会存在的。

    这两个类与AtomicReference的区别在于,前者是CAS修改一个Pair,后者是CAS修改一个对象引用,底层都是基于Unsafe,方法的用途也是一致的。AtomicMarkableReference 的Pair定义如下:

Java8 atomic包 源码解析

其中mark属性通常用于表示该引用已经被逻辑删除了。AtomicStampedReference的Pair定义如下:

Java8 atomic包 源码解析

这里的stamp就相当于版本号。实际CAS比较时,除了会比较reference属性还会比较stamp或者mark属性,只有这两个都满足才会执行更新。其使用示例如下:

@Test
    public void test5() throws Exception {
        Object obj=new Object();
        AtomicMarkableReference<Object> markableReference=new AtomicMarkableReference<>(obj, true);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(3);
        Thread a=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                boolean result=markableReference.compareAndSet(obj,new Object(), true, false);
                System.out.println(Thread.currentThread().getName()+",cas result->"+result);
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        a.start();
        Thread b=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                boolean result=markableReference.compareAndSet(obj,new Object(), true, true);
                System.out.println(Thread.currentThread().getName()+",cas result->"+result);
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        b.start();
        cyclicBarrier.await();
        cyclicBarrier.await();
        System.out.println("main end");
    }

    @Test
    public void test6() throws Exception {
        Object obj=new Object();
        AtomicStampedReference<Object> stampedReference=new AtomicStampedReference<Object>(obj, 1);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(3);
        Thread a=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                boolean result=stampedReference.compareAndSet(obj,new Object(), 1, 2);
                System.out.println(Thread.currentThread().getName()+",cas result->"+result);
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        a.start();
        Thread b=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                boolean result=stampedReference.compareAndSet(obj,new Object(), 1, 3);
                System.out.println(Thread.currentThread().getName()+",cas result->"+result);
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        b.start();
        cyclicBarrier.await();
        cyclicBarrier.await();
        System.out.println("main end");
    }
           

继续阅读