天天看點

Java8 atomic包 源碼解析

  目錄

一、AtomicLong

二、AtomicLongArray

三、AtomicLongFieldUpdater

四、LongAdder

1、使用

2、定義

3、add

4、sum / reset / sumThenReset

五、LongAccumulator

六、AtomicMarkableReference 和 AtomicStampedReference

      atomic包提供了Boolean,Integer,Long和對象引用類型的值的原子更新的工具類,還提供了Double和Long類型的原子累加器實作,前者的實作是基于Unsafe類提供的方法,後者的實作是基于Striped64類,本篇部落格就以其中的典型類為例來說明其具體的使用方式和實作細節。

一、AtomicLong

      AtomicLong隻有一個執行個體屬性value,用volatile修飾,有兩個靜态屬性Unsafe執行個體和表示value屬性偏移量的valueOffset屬性,後者通過靜态代碼塊指派,如下:

Java8 atomic包 源碼解析

 重點關注以下方法的實作:

//修改該值,修改後對其他CPU立即可見
public final void lazySet(long newValue) {
   unsafe.putOrderedLong(this, valueOffset, newValue);
}

//底層會原子的修改為目标值并傳回原來的值
public final long getAndSet(long newValue) {
        return unsafe.getAndSetLong(this, valueOffset, newValue);
    }
//如果目前值是expect,則更新成update,如果更新成功傳回true
public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

//将目前值原子的加上delta,并傳回加之前的值
public final long getAndAdd(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta);
    }

           

上述Unsafe的方法實作如下:

//實作Unsafe的putOrderedLong方法
UNSAFE_ENTRY(void, Unsafe_SetOrderedLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong x))
  UnsafeWrapper("Unsafe_SetOrderedLong");

#ifdef SUPPORTS_NATIVE_CX8
  SET_FIELD_VOLATILE(obj, offset, jlong, x);
#
UNSAFE_END

#define SET_FIELD_VOLATILE(obj, offset, type_name, x) \
  oop p = JNIHandles::resolve(obj); \
  //将其作為一個volatile變量指針,修改值并通過記憶體屏障指令強制寫回記憶體,讓其他CPU都可以看見
  OrderAccess::release_store_fence((volatile type_name*)index_oop_from_field_offset_long(p, offset), truncate_##type_name(x));

//根據字段偏移量計算該屬性的記憶體位址
inline void* index_oop_from_field_offset_long(oop p, jlong field_offset) {
  jlong byte_offset = field_offset_to_byte_offset(field_offset);

  if (sizeof(char*) == sizeof(jint))    // (this constant folds!)
    return (address)p + (jint) byte_offset;
  else
    return (address)p +        byte_offset;
}

inline jlong field_offset_to_byte_offset(jlong field_offset) {
  return field_offset;
}

#define truncate_jlong(x) (x)

public final long getAndSetLong(Object o, long offset, long newValue) {
        long v;

        do {
            //擷取最新值
            v = getLongVolatile(o, offset);
        } while (!compareAndSwapLong(o, offset, v, newValue)); //如果目前值是v則更新成newValue,更新成功傳回true,否則傳回false,通過while循環繼續修改
        return v;
    }


//GetLongVolatile方法通過宏定義實作的,實作Unsafe的getLongVolatile方法
#ifdef SUPPORTS_NATIVE_CX8
DEFINE_GETSETOOP_VOLATILE(jlong, Long);
#endif

//這個宏同時定了Get和Set方法
#define DEFINE_GETSETOOP_VOLATILE(jboolean, Boolean) \
 \
UNSAFE_ENTRY(jboolean, Unsafe_Get##Boolean##Volatile(JNIEnv *env, jobject unsafe, jobject obj, jlong offset)) \
  UnsafeWrapper("Unsafe_Get"#Boolean); \
  GET_FIELD_VOLATILE(obj, offset, jboolean, v); \
  return v; \
UNSAFE_END \
 \
UNSAFE_ENTRY(void, Unsafe_Set##Boolean##Volatile(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jboolean x)) \
  UnsafeWrapper("Unsafe_Set"#Boolean); \
  SET_FIELD_VOLATILE(obj, offset, jboolean, x); \
UNSAFE_END \
 \

#define GET_FIELD_VOLATILE(obj, offset, type_name, v) \
  //從JNI引用中解析出oop
  oop p = JNIHandles::resolve(obj); \
  if (support_IRIW_for_not_multiple_copy_atomic_cpu) { \
    OrderAccess::fence(); \
  } \
  //基于記憶體屏障指令,讀取該位址最新的值
  volatile type_name v = OrderAccess::load_acquire((volatile type_name*)index_oop_from_field_offset_long(p, offset));

//用于實作Unsafe的compareAndSwapLong方法
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
  UnsafeWrapper("Unsafe_CompareAndSwapLong");
  Handle p (THREAD, JNIHandles::resolve(obj));
  //擷取屬性位址
  jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
  //通過cmpxchg和lock指令字首實作,會比較目前值與期望值是否一緻,如果一緻則更新,否則傳回目前值
  return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

//Unsafe的getAndAddLong方法實作
public final long getAndAddLong(Object o, long offset, long delta) {
        long v;
        do {
            //擷取最新值
            v = getLongVolatile(o, offset);
        } while (!compareAndSwapLong(o, offset, v, v + delta));//原子的修改,修改失敗傳回false,繼續重試,否則傳回true
        return v;
    }
           

二、AtomicLongArray

      AtomicLongArray定義的方法和實作和AtomicLong是一樣的,最大的差別在于後者需要根據修改的數組索引值來計算對應索引的數組元素的記憶體位址,而不是固定的value屬性偏移量了,其計算某個索引的記憶體位址的實作如下,以set方法為例說明:

public final void set(int i, long newValue) {
        //putLongVolatile對應的實作是Unsafe_SetLongVolatile,還是通過SET_FIELD_VOLATILE宏實作
        //即跟putOrderedLong的實作是一樣的
        unsafe.putLongVolatile(array, checkedByteOffset(i), newValue);
    }

//相對于array的偏移量
private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);

        return byteOffset(i);
    }


private static long byteOffset(int i) {
        //i<<shift實際是i*2^shift,用于計算前i個元素的記憶體大小
        return ((long) i << shift) + base;
    }

//base不是基位址,而是數組在記憶體中的表示,有一部位元組是固定,用來記錄數組長度和元素類型的
//base傳回這部分記憶體的位元組數,實際的數組元素存儲在該部分位元組的後面
private static final int base = unsafe.arrayBaseOffset(long[].class);

 static {
        //scale表示一個數組元素的位元組數,long下scale就是8
        int scale = unsafe.arrayIndexScale(long[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        //shift的結果就是3    
        shift = 31 - Integer.numberOfLeadingZeros(scale);
    }
           

三、AtomicLongFieldUpdater

      AtomicLongFieldUpdater用于給一個volatile long類型的屬性做原子更新,實際使用過程中如果直接将某個volatile long類型的屬性聲明成AtomicLong類型,則在讀該變量的時候都會增加額外的代碼,因為volatile關鍵字本身會保證CPU讀取該變量時永遠是最新的,volatile關鍵字不能保證修改是原子的,就可以使用AtomicLongFieldUpdater來完成修改。

其使用如下:

class Holder {
        //注意不能是static屬性,否則擷取屬性偏移量時報錯
        private volatile long test = 1;

        AtomicLongFieldUpdater<Holder> updater=AtomicLongFieldUpdater.newUpdater(Holder.class, "test");

        public long getTest() {
            return test;
        }

        public AtomicLongFieldUpdater<Holder> getUpdater() {
            return updater;
        }
    }

    @Test
    public void test() throws Exception {
        Holder holder=new Holder();
        CountDownLatch countDownLatch=new CountDownLatch(10);
        Runnable run=new Runnable() {
            @Override
            public void run() {
                for(int i=0;i<10000;i++) {
                    holder.getUpdater().addAndGet(holder, 1);
                }
                countDownLatch.countDown();
            }
        };
        for(int i=0;i<10;i++){
            new Thread(run).start();
        }
        countDownLatch.await();
        System.out.println("main end,test->"+holder.getTest());
    }
           

   AtomicLongFieldUpdater是一個抽象類,newUpdater方法傳回該類的一個實作類執行個體,如下:

Java8 atomic包 源碼解析

其中CallerSensitive注解是為了Reflection.getCallerClass()方法能夠準确的找到目前方法的上層調用類的,比如通過反射調用此方法時可以跳過反射相關類找到調用反射方法的類。 VM_SUPPORTS_LONG_CAS是AtomicLong的一個靜态屬性,通過本地方法VMSupportsCS8初始化,64位下該屬性為true。CASUpdater和LockedUpdater都是AtomicLongFieldUpdater的兩個内部類,用來實作AtomicLongFieldUpdater的抽象方法,如下:

Java8 atomic包 源碼解析

CASUpdater的實作是基于Unsafe的,跟AtomicLong的實作是一樣的,LockedUpdater的實作是基于synchronized關鍵字的,也用到了Unsafe的方法,不過這些方法不保證原子更新。重點關注這兩的構造方法實作,兩者是一樣的,以CASUpdater為例說明,如下:

CASUpdater(final Class<T> tclass, final String fieldName,
                   final Class<?> caller) {
            final Field field;
            final int modifiers;
            try {
                //擷取對應的字段
                field = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<Field>() {
                        public Field run() throws NoSuchFieldException {
                            return tclass.getDeclaredField(fieldName);
                        }
                    });
                //擷取字段的修飾符    
                modifiers = field.getModifiers();
                //判斷是否有通路權限
                sun.reflect.misc.ReflectUtil.ensureMemberAccess(
                    caller, tclass, null, modifiers);
                ClassLoader cl = tclass.getClassLoader();
                ClassLoader ccl = caller.getClassLoader();
                if ((ccl != null) && (ccl != cl) &&
                    ((cl == null) || !isAncestor(cl, ccl))) {
                    sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
                }
            } catch (PrivilegedActionException pae) {
                throw new RuntimeException(pae.getException());
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            //字段類型不是long
            if (field.getType() != long.class)
                throw new IllegalArgumentException("Must be long type");
            //不是volatile變量
            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");
            
            //初始化ccclass主要是為了accessCheck中校驗通路權限
            //如果是在子類中構造了這個CASUpdater,則cclass是子類,否則就是tclass
            this.cclass = (Modifier.isProtected(modifiers) && //是protected變量
                           tclass.isAssignableFrom(caller) && //tclass是caller的子類
                           !isSamePackage(tclass, caller)) //不是同一個包
                          ? caller : tclass;
            this.tclass = tclass;
            //擷取字段偏移量,要求該字段必須是執行個體屬性
            this.offset = U.objectFieldOffset(field);
        }
    
    //second是否first的父類加載器
    static boolean isAncestor(ClassLoader first, ClassLoader second) {
        ClassLoader acl = first;
        do {
            acl = acl.getParent();
            if (second == acl) {
                return true;
            }
        } while (acl != null);
        return false;
    }
    
    //如果兩個的類加載器和包名一樣傳回true
    private static boolean isSamePackage(Class<?> class1, Class<?> class2) {
        return class1.getClassLoader() == class2.getClassLoader()
               && Objects.equals(getPackageName(class1), getPackageName(class2));
}

    private static String getPackageName(Class<?> cls) {
        String cn = cls.getName();
        int dot = cn.lastIndexOf('.');
        return (dot != -1) ? cn.substring(0, dot) : "";
    }
   
   //調用具體的修改方法前必須調用accessCheck
   private final void accessCheck(T obj) {
            //要求obj必須是cclass的執行個體
            if (!cclass.isInstance(obj))
                throwAccessCheckException(obj);
        }
           

四、LongAdder

1、使用

       LongAdder是對AtomicLong在高并發下累加計算場景的優化版,AtomicLong底層是通過CAS加上while循環實作原子的累加計算的,高并發下,CAS原子修改失敗的機率會很高,會導緻線程多次重試,LongAdder底層也是CAS加上for循環,不過CAS修改的是Cell,Cell用來存儲累加的結果,每個線程會映射到一個Cell,且Cell的個數可以擴容到跟CPU核數一緻,即避免了多個線程同時CAS修改同一個變量,進而減少CAS原子修改失敗導緻的重試,提升系統性能。

參考如下測試用例:

@Test
    public void test2() throws Exception {
        AtomicLong atomicLong=new AtomicLong(0);
        CountDownLatch countDownLatch=new CountDownLatch(10);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(11);
        Runnable run=new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                for(int i=0;i<10000000;i++) {
                    atomicLong.incrementAndGet();
                }
                countDownLatch.countDown();
            }
        };
        for(int i=0;i<10;i++){
            new Thread(run).start();
        }
        cyclicBarrier.await();
        long start=System.currentTimeMillis();
        countDownLatch.await();
        System.out.println("main end,time->"+(System.currentTimeMillis()-start)+",result->"+atomicLong.get());
    }

    @Test
    public void test3() throws Exception {
        LongAdder longAdder=new LongAdder();
        CountDownLatch countDownLatch=new CountDownLatch(10);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(11);
        Runnable run=new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                for(int i=0;i<10000000;i++) {
                    longAdder.add(1);
                }
                countDownLatch.countDown();
            }
        };
        for(int i=0;i<10;i++){
            new Thread(run).start();
        }
        cyclicBarrier.await();
        long start=System.currentTimeMillis();
        countDownLatch.await();
        System.out.println("main end,time->"+(System.currentTimeMillis()-start)+",result->"+longAdder.longValue());
    }
           

 AtomicLong的耗時在2300ms左右,LongAdder的耗時在300ms左右,性能有大幅的提升。

2、定義

LongAdder提供的API相對于AtomicLong要簡單很多,隻能做加減,其中減是通過add一個負值實作的,而且LongAdder不支援設定初始值。LongAdder繼承自Striped64,其核心方法的實作都在Striped64中。Striped64包含的執行個體屬性如下:

/** 擷取CPU的個數 */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    //Cell數組
    transient volatile Cell[] cells;

    //儲存long值
    transient volatile long base;

    //相當于一個鎖,當建立Cell數組或者擴容時使用,1表示加鎖了,0表示沒有鎖
    transient volatile int cellsBusy;
           

其中Cell的定義如下:

Java8 atomic包 源碼解析

Contended注解是為了避免由高速緩存行導緻的僞共享問題,Cell實際就是對long value的一個包裝,提供cas修改的能力。

 Striped64中包含的靜态屬性如下:

Java8 atomic包 源碼解析

3、add

     LongAdder的核心就是add方法,參數可以是正數或者負數,初始值都是0,其實作如下:

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        //cells為空就會執行casBase,不為空則進入if分支
        //casBase執行失敗會進入if分支
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||  //cells未初始化
                (a = as[getProbe() & m]) == null ||   //目前線程對應索引位的Cell未初始化
                !(uncontended = a.cas(v = a.value, v + x))) //執行cas失敗,即存在多個線程同時修改同一個Cell
                longAccumulate(x, null, uncontended);
        }
    }

 final boolean casBase(long cmp, long val) {
        //原子的修改
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

static final int getProbe() {
        //擷取目前線程的threadLocalRandomProbe屬性,相當于線程的唯一辨別,由ThreadLocalRandom負責初始化
        //這個唯一辨別也是原子遞增的
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

//wasUncontended表示是否cas失敗
 final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            //如果probe未初始化,則通過current方法強制初始化
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            //将屬性強制為true
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                //如果cells已經初始化了
                if ((a = as[(n - 1) & h]) == null) {
                    //如果目前線程對應的索引位的Cell為空
                    if (cellsBusy == 0) {       //此時無鎖
                        //建立一個新的Cell
                        Cell r = new Cell(x);   //注意此時是用x來初始化Cell的,x就是待增加的值
                        if (cellsBusy == 0 && casCellsBusy()) {
                            //casCellsBusy傳回true表示加鎖成功
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 && //再次校驗cells完成初始化了
                                    rs[j = (m - 1) & h] == null)  //再次校驗對應的索引位Cell為空
                                { 
                                    //數組元素指派
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //解鎖
                                cellsBusy = 0;
                            }
                            if (created)
                                break; //已經建立則終止循環
                            continue;           //沒有建立,則繼續下一次for循環,直到建立完成為止
                        }
                        //目前鎖被占用了或者加鎖失敗
                    }
                    //目前鎖被占用了或者加鎖失敗,然後進入到下面的advanceProbe,下一次for循環目前線程就會進入另外一個索引位了
                    collide = false;
                }
                //進入下面的分支都是因為多個線程同時修改同一個Cell導緻cas修改失敗,此時wasUncontended必須為true,如果為false也給置為true
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      //置為true以後下一次for循環直接進入下一個else if分支
                //a就是目前線程對應的Cell,v就是該Cell的值,如果fn為null就直接相加,否則調用fn修改,最後cas原子修改,如果修改成功,則終止for循環 
                //如果此時cas原子修改失敗則繼續下面的else if分支   
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    //如果數組長度大于CPU個數 或者cells發生擴容了,相反的如果數組長度小于CPU個數且沒有擴容則進入下面的分支
                    collide = false;            //collide置為false以後就命中下一個else if分支,不會執行擴容了,跳轉到advanceProbe 
                else if (!collide) 
                    collide = true; //置為true以後,下一次進入這個else if分支就變成false了,進入下一個else if分支,執行擴容了
                else if (cellsBusy == 0 && casCellsBusy()) { //擷取鎖成功,如果失敗則for循環重試
                    try {
                        if (cells == as) {      //再次校驗cells未擴容
                            Cell[] rs = new Cell[n << 1]; //擴容一倍
                            //拷貝原來的數組元素
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            //cells重新指派    
                            cells = rs;
                        }
                    } finally {
                        //釋放鎖
                        cellsBusy = 0;
                    }
                    //置為false
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //增加probe,下一次for循環可能就命中其他的索引位了
                h = advanceProbe(h);
            }
            //cells未初始化,進入此分支 
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) { //嘗試擷取鎖,如果失敗則進入下面的else if分支
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) { //校驗未擴容
                        //初始數組長度就是2
                        Cell[] rs = new Cell[2];
                        //将目前線程對應Cell初始化
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //擷取鎖失敗或者cells已經完成擴容了,則通過casBase嘗試修改
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }//for循環結束
    }

//将其原子的置為1,傳回true表示加鎖成功
 final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }
           

cells是預設是null,隻有當cas失敗了才會初始化cells,初始化是數組長度是2,後面每次都擴容一倍, 直到達到CPU的個數為止;數組初始化時會初始化根據目前線程probe計算出來的索引位的Cell,擴容時不會初始化擴容的那一部分數組元素的Cell,而是等到某個線程命中了該索引位,發現對應Cell未初始化才會初始化。建立Cell時使用add的參數作為初始值的,建立之後的cas修改都是以此為基礎。線程probe屬性不是固定不變的,其初始值是0,然後通過ThreadLocalRandom.current()方法完成初始化,在for循環重試的過程中部分條件下會通過advanceProbe方法基于目前的probe值,利用位運算重新計算一個probe值出來,這樣下一輪for循環時,目前線程可能就命中了其他的索引位的Cell了。

4、sum / reset / sumThenReset

     sum方法用于擷取LongAdder關聯的變量的值,會将由LongAdder本身維護的base和由Cell數組中Cell維護的value的值累加起來;reset方法用于将base屬性和Cell的value屬性置為0,進而可以重複使用;sumThenReset在一次周遊中同時執行上述兩個邏輯,其實作如下:

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        //将base同cell數組中的Cell儲存的累加值求和
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

//将base和cell數組中的Cell中的value置為0
public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }

//先累加求和,再置為0
public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }
           

五、LongAccumulator

      LongAccumulator是對LongAdder的強化,不再是支援簡單的累加,可以自定義函數,底層實作的核心都是longAccumulate方法,其使用如下:

@Test
    public void test4() throws Exception {
        //可以指定初始值和具體的累加邏輯
        LongAccumulator longAccumulator=new LongAccumulator(new LongBinaryOperator() {
            @Override
            //left是原來的值,right是累加的值
            public long applyAsLong(long left, long right) {
                //此時結果是不固定的
                return left+right*3;
                //此時結果是固定的
//                return left+right;
            }
        }, 100);
        CountDownLatch countDownLatch=new CountDownLatch(10);
        Runnable run=new Runnable() {
            @Override
            public void run() {

                for(int i=0;i<1000000;i++) {
                    longAccumulator.accumulate(1);
                }
                countDownLatch.countDown();
            }
        };
        for(int i=0;i<10;i++){
            new Thread(run).start();
        }
        long start=System.currentTimeMillis();
        countDownLatch.await();
        System.out.println("main end,time->"+(System.currentTimeMillis()-start)+",result->"+longAccumulator.longValue());
    }
           

為啥left+right*3時計算的結果是不固定的了?不是因為計算過程中不是原子修改,而是因為最後算總和的時候也是通過LongBinaryOperator完成的,Cell中的值作為right,base值作為left,這樣Cell中值實際就算了2遍,Cell本身執行cas修改前會算一遍,算總和時再算一遍。因為并發時落到各個Cell的線程是随機的,即各Cell的值是随機的,最後算出來的結果就是随機的。而left+right就不存在算兩遍的問題,是以結果和LongAdder是一樣的。參考accumulate和get方法的實作,這兩個相當于LongAdder的add和sum方法,如下:

public void accumulate(long x) {
        Cell[] as; long b, v, r; int m; Cell a;
        if ((as = cells) != null ||
            (r = function.applyAsLong(b = base, x)) != b  //需要確定經過applyAsLong計算後結果不等于原來的值
            && !casBase(b, r)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended =
                  (r = function.applyAsLong(v = a.value, x)) == v ||
                  a.cas(v, r)))
                longAccumulate(x, function, uncontended);
        }
    }

public long get() {
        Cell[] as = cells; Cell a;
        long result = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                     //注意此時還是通過構造時傳入function來計算總和
                    result = function.applyAsLong(result, a.value);
            }
        }
        return result;
    }
           

六、AtomicMarkableReference 和 AtomicStampedReference

    很多部落格說這兩個類是為了解決AtomicReference的ABA問題,其實是錯誤的,所謂的ABA是指,線程1将某個變量從A修改成B,然後又修改成A,線程2判斷這個變量還是A,就将其修改成B,即線程2沒有感覺到這個變量線上程1中的改變。這個ABA其實不是一個問題,對線程2而言,本身也是無需關心這個變量是怎麼變化的,隻要它的目前值是A就行。另外ABA“問題”,不隻是AtomicReference特有的,所有類似的CAS動作都會存在的。

    這兩個類與AtomicReference的差別在于,前者是CAS修改一個Pair,後者是CAS修改一個對象引用,底層都是基于Unsafe,方法的用途也是一緻的。AtomicMarkableReference 的Pair定義如下:

Java8 atomic包 源碼解析

其中mark屬性通常用于表示該引用已經被邏輯删除了。AtomicStampedReference的Pair定義如下:

Java8 atomic包 源碼解析

這裡的stamp就相當于版本号。實際CAS比較時,除了會比較reference屬性還會比較stamp或者mark屬性,隻有這兩個都滿足才會執行更新。其使用示例如下:

@Test
    public void test5() throws Exception {
        Object obj=new Object();
        AtomicMarkableReference<Object> markableReference=new AtomicMarkableReference<>(obj, true);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(3);
        Thread a=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                boolean result=markableReference.compareAndSet(obj,new Object(), true, false);
                System.out.println(Thread.currentThread().getName()+",cas result->"+result);
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        a.start();
        Thread b=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                boolean result=markableReference.compareAndSet(obj,new Object(), true, true);
                System.out.println(Thread.currentThread().getName()+",cas result->"+result);
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        b.start();
        cyclicBarrier.await();
        cyclicBarrier.await();
        System.out.println("main end");
    }

    @Test
    public void test6() throws Exception {
        Object obj=new Object();
        AtomicStampedReference<Object> stampedReference=new AtomicStampedReference<Object>(obj, 1);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(3);
        Thread a=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                boolean result=stampedReference.compareAndSet(obj,new Object(), 1, 2);
                System.out.println(Thread.currentThread().getName()+",cas result->"+result);
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        a.start();
        Thread b=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                boolean result=stampedReference.compareAndSet(obj,new Object(), 1, 3);
                System.out.println(Thread.currentThread().getName()+",cas result->"+result);
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        b.start();
        cyclicBarrier.await();
        cyclicBarrier.await();
        System.out.println("main end");
    }
           

繼續閱讀