天天看點

Java 原子性CAS 原理 ,atomic包 詳解

java.util.concurrent.atomic包類的原理

atomic包底層原理都是Unsafe.compareAndSwapXXX()方法,也就是通常說的CAS,compareAndSwapXXX方法是native辨別的方法,是Java底層代碼,不是Java代碼實作的,

public final class Unsafe {
	//object
    public final native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);
	//int
    public final native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
	//long
    public final native boolean compareAndSwapLong(Object obj, long offset, long expect, long update);

}
           

都是有4個參數:

第一個參數 obj:要更新的變量名

第二個參數 offset:主記憶體中該變量的值

第三個參數 expect:期待的該變量的值

第四個參數 update:如果offset==expect,則更新obj為update值

java.util.concurrent.atomic包結構

Java 原子性CAS 原理 ,atomic包 詳解

AtomicBoolean

AtomicBoolean原理,有一個int 類型的成員變量,該值隻可能為1-true,0-false,兩個

AtomicBoolean有一點值得注意的,就是AtomicBoolean的方法compareAndSet(),隻會執行一次,

//AtomicBoolean 方法
   	public final boolean compareAndSet(boolean expect, boolean update) {
        int e = expect ? 1 : 0;
        int u = update ? 1 : 0;
        return unsafe.compareAndSwapInt(this, valueOffset, e, u);
    }
           

AtomicInteger 原子操作一個Integer類型的變量

以getAndAdd 方法為例,其他方法原理一樣:

//AtomicInteger 的方法
   	public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }
    
    /**
    * Unsafe 類的方法
    * 三個參數,比如 int a = 2+1
    * 第一個參數var1代表變量名a
    * 第二個參數var2代表變量a期待的值為2
    * 第三個參數var4代表如果var2的值為2,則進行加法的值
    */
	public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
        	//var5 表示調用底層方法得到的變量var1在主記憶體中的值
            var5 = this.getIntVolatile(var1, var2);
            	// var1 表示目前變量,var2表示期待的值,var5表示該變量在底層的值,
            	// 如果var2==var5 ,則進行var5 + var4操作
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }
           

上面代碼,是個循環,隻有設定值成功才會退出,如果預期的值和底層的值不一樣,則還會進入循環,重新擷取底層變量的值var5 ,預期值var2也重新從對象var1中擷取,然後對比var2,var5,直到兩個值相等,則進行運算操作,退出循環。這裡可以看出,如果修改這個變量的線程非常多并發量非常大時,var2,var5一直不相等,cpu 就會一直空轉,浪費cpu資源,是以atomic包适合并發量不大的場景

AtomicIntegerArray原子操作Integer類型的數組的類

AtomicIntegerArray 和AtomicInteger差別不大,就是一個是單個Integer變量,一個是Integer數組,操作數組中的每個數也都是原子操作方法

以getAndAdd 方法為例,其他方法原理一樣:

//AtomicIntegerArray 方法,第一個參數數數組坐标,第二個參數是要加的值
	//傳回值為做加法前的數值
	public final int getAndAdd(int i, int delta) {
		//checkedByteOffset方法,擷取數組對應i坐标的變量值
        return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
    }

 	/**
    * Unsafe 類的方法
    * 三個參數,比如 int a = 2+1
    * 第一個參數var1代表變量名a
    * 第二個參數var2代表變量a期待的值為2
    * 第三個參數var4代表如果var2的值為2,則進行加法的值
    */
	public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
        	//var5 表示調用底層方法得到的變量var1在主記憶體中的值
            var5 = this.getIntVolatile(var1, var2);
            	// var1 表示目前變量,var2表示期待的值,var5表示該變量在底層的值,
            	// 如果var2==var5 ,則進行var5 + var4操作
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
		//傳回值是操作前的值
        return var5;
    }
           

AtomicIntegerFieldUpdater

這個類可以cas原子更新一個類的一個成員變量的值,并且要求成員變量非static并且volatile修飾,例如:

@Slf4j
public class Test {

    //泛型是類:Test,
    //newUpdater方法第一個參數是類的class,第二個參數數該類的成員變量名
    private static AtomicIntegerFieldUpdater<Test> updater = AtomicIntegerFieldUpdater.newUpdater(Test.class,"count");

    //注意該變量必須由volatile關鍵字修飾,
    //并且注意不能是包裝類,也不能是static修飾,否則會報錯
    private volatile int count=100;

    public static void main(String[] args) throws Exception {
         Test test = new Test();
        if(updater.compareAndSet(test ,100,120)){
            log.info("【1】如果類對象test 的成員變量count的值是100,則更新為120,----更新成功");
        }else{
            log.info("【2】如果類對象test 的成員變量count的值是100,則更新為120,----更新失敗");
        }
        if(updater.compareAndSet(test ,100,120)){
            log.info("【3】如果類對象test 的成員變量count的值是100,則更新為120,----更新成功");
        }else{
            log.info("【4】如果類對象test 的成員變量count的值是100,則更新為120,----更新失敗");
        }
    }

    public Integer getCount() {
        return count;
    }
}

           

運作結果:

14:29:54.953 [main] INFO test.Test - 【1】如果類對象test 的成員變量count的值是100,則更新為120,----更新成功
14:29:54.958 [main] INFO test.Test - 【4】如果類對象test 的成員變量count的值是100,則更新為120,----更新失敗
           

AtomicLong

同AtomicInteger

AtomicLongArray

同AtomicIntegerArray

AtomicLongFieldUpdater

同AtomicIntegerFieldUpdater

AtomicMarkableReference

這個跟AtomicStampedReference類似,也有個辨別,隻是AtomicStampedReference的版本号可以随意加,可以為很多版本,但是AtomicMarkableReferenced的辨別隻能為boolean值,即true,false,例子:

public class Test {
    /**
     * 構造函數,第一個參數是初始值,第二個參數是初始版本号
     */
    static AtomicMarkableReference<Integer> atomicStampedReference = new AtomicMarkableReference(0, false);

    public static void main(String[] args) throws InterruptedException {
        final int reference = atomicStampedReference.getReference();
        System.out.println("初始值:" + reference );

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("更新前值:" + reference + " --》  如果值為0,标志為false,則更新值為10,标志為true,是否更新成功:"
                        + atomicStampedReference.compareAndSet(reference, reference + 10, false, true));
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                Integer reference = atomicStampedReference.getReference();
                System.out.println("更新前值:" + reference + " --》 如果值為0,标志為false,則更新值為10,标志為true,是否更新成功:"
                        + atomicStampedReference.compareAndSet(reference, reference + 10, false, true));
            }
        });
        t1.start();
        t1.join();
        t2.start();
        t2.join();

        System.out.println("更新後值:" + atomicStampedReference.getReference());
    }


}

           

執行結果:

初始值:0
更新前值:0 --》  如果值為0,标志為false,則更新值為10,标志為true,是否更新成功:true
更新前值:10 --》 如果值為0,标志為false,則更新值為10,标志為true,是否更新成功:false
更新後值:10
           

AtomicReference

泛型傳入一個類,然後cas原子操作這個變量

public static void main(String[] args) throws Exception {
      AtomicReference<Character> characterAtomicReference = new AtomicReference<>('a');
      //如果characterAtomicReference 的值為a,則更新為c
      characterAtomicReference.compareAndSet('a','c'); //c
      characterAtomicReference.compareAndSet('b','d');//no
      characterAtomicReference.compareAndSet('c','e');//e
      characterAtomicReference.compareAndSet('d','g');//no
      characterAtomicReference.compareAndSet('e','j');//j
      characterAtomicReference.compareAndSet('j','w');//w
      System.out.println(characterAtomicReference.get());//w

  }
           

AtomicReferenceArray

同AtomicIntegerArray

AtomicReferenceFieldUpdater

同 AtomicIntegerFieldUpdater

AtomicStampedReference

該類,可以解決CAS的ABA問題,

cas 的ABA問題是指,變量a從1 更新為2,又從2更新為1,其他線程查詢是1,以為這個變量沒有更新,實際這個變量更新過。解決辦法,就是添加版本号,比如 變量a從1 更新為2版本為v1,又從2更新為1版本為v2,

//AtomicStampedReference 核心類
 public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&  //這裡多了版本比較,
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }
           

LongAdder簡介

JDK1.8時,java.util.concurrent.atomic包中提供了一個新的原子類:LongAdder。

根據Oracle官方文檔的介紹,LongAdder在高并發的場景下會比它的前輩————AtomicLong 具有更好的性能,代價是消耗更多的記憶體空間:

那麼,問題來了:

為什麼要引入LongAdder? AtomicLong在高并發的場景下有什麼問題嗎? 如果低并發環境下,
LongAdder和AtomicLong性能差不多,那LongAdder是否就可以替代AtomicLong了?
           

為什麼要引入LongAdder?

我們知道,AtomicLong是利用了底層的CAS操作來提供并發性的,比如addAndGet方法:

/**
     * 以原子方式将給定值添加到目前值。
	 * 參數:增量-要添加的值
	 * 傳回值:更新值
     */
 	public final long addAndGet(long delta) {
    	//unsafe.getAndAddInt 方法進行cas 更新操作,傳回值是操作前的值
    	//addAndGet方法傳回的是操作後的值,是以是,傳回前值+delta
        return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
    }

	/**
    * Unsafe 類的方法
    * 三個參數,比如 int a = 2+1
    * 第一個參數var1代表變量名a
    * 第二個參數var2代表變量a期待的值為2
    * 第三個參數var4代表如果var2的值為2,則進行加法的值
    */
	public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
        	//var5 表示調用底層方法得到的變量var1在主記憶體中的值
            var5 = this.getIntVolatile(var1, var2);
            	// var1 表示目前變量,var2表示期待的值,var5表示該變量在底層的值,
            	// 如果var2==var5 ,則進行var5 + var4操作
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
		//傳回值是操作前的值
        return var5;
    }
           

上述方法調用了Unsafe類的getAndAddLong方法,該方法内部是個native方法,它的邏輯是采用自旋的方式不斷更新目标值,直到更新成功。

在并發量較低的環境下,線程沖突的機率比較小,自旋的次數不會很多。但是,高并發環境下,N個線程同時進行自旋操作,會出現大量失敗并不斷自旋的情況,此時AtomicLong的自旋會成為瓶頸。

這就是LongAdder引入的初衷——解決高并發環境下AtomicLong的自旋瓶頸問題。

LongAdder快在哪裡?

既然說到LongAdder可以顯著提升高并發環境下的性能,那麼它是如何做到的?這裡先簡單的說下LongAdder的思路,第二部分會詳述LongAdder的原理。

我們知道,AtomicLong中有個内部變量value儲存着實際的long值,所有的操作都是針對該變量進行。也就是說,高并發環境下,value變量其實是一個熱點,也就是N個線程競争一個熱點。

LongAdder的基本思路就是分散熱點,将value值分散到一個數組中,不同線程會命中到數組的不同槽中,各個線程隻對自己槽中的那個值進行CAS操作,這樣熱點就被分散了,沖突的機率就小很多。如果要擷取真正的long值,隻要将各個槽中的變量值累加傳回。

這種做法有沒有似曾相識的感覺?沒錯,ConcurrentHashMap中的“分段鎖”其實就是類似的思路。

LongAdder能否替代AtomicLong?

回答這個問題之前,我們先來看下LongAdder提供的API:

Java 原子性CAS 原理 ,atomic包 詳解

可以看到,LongAdder提供的API和AtomicLong比較接近,兩者都能以原子的方式對long型變量進行增減。

  • AtomicLong提供的功能其實更豐富,尤其是addAndGet、decrementAndGet、compareAndSet這些方法。addAndGet、decrementAndGet除了單純的做自增自減外,還可以立即擷取增減後的值,
  • LongAdder則需要做同步控制才能精确擷取增減後的值。如果業務需求需要精确的控制計數,做計數比較,AtomicLong也更合适。

另外,從空間方面考慮,LongAdder其實是一種“空間換時間”的思想,從這一點來講AtomicLong更适合。當然,如果你一定要跟我杠現代主機的記憶體對于這點消耗根本不算什麼,那我也辦法。

總之,低并發、一般的業務場景下AtomicLong是足夠了。如果并發量很多,存在大量寫多讀少的情況,那LongAdder可能更合适。适合的才是最好的,如果真出現了需要考慮到底用AtomicLong好還是LongAdder的業務場景,那麼這樣的讨論是沒有意義的,因為這種情況下要麼進行性能測試,以準确評估在目前業務場景下兩者的性能,要麼換個思路尋求其它解決方案。

最後,給出國外一位部落客對LongAdder和AtomicLong的性能評測,以供參考:

http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/

Java 原子性CAS 原理 ,atomic包 詳解

LongAdder原理

之前說了,AtomicLong是多個線程針對單個熱點值value進行原子操作。而LongAdder是每個線程擁有自己的槽,各個線程一般隻對自己槽中的那個值進行CAS操作。

比如有三個ThreadA、ThreadB、ThreadC,每個線程對value增加10。

對于AtomicLong,最終結果的計算始終是下面這個形式:

value = 10 + 10 + 10 = 30 value=10+10+10=30

但是對于LongAdder來說,内部有一個base變量,一個Cell[]數組。

base變量:非競态條件下,直接累加到該變量上

Cell[]數組:競态條件下,累加個各個線程自己的槽Cell[i]中的值

最終結果的計算是下面這個形式:

Java 原子性CAS 原理 ,atomic包 詳解

等價于:value=base + Cell[0] + Cell[1] + Cell[2] +…+Cell[n-1]; (n為Cell數值的長度)

LongAdder的内部結構

LongAdder隻有一個空構造器,其本身也沒有什麼特殊的地方,所有複雜的邏輯都在它的父類Striped64中。

public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    public LongAdder() {
    }
}
           

來看下Striped64的内部結構,這個類實作一些核心操作,處理64位資料。

Striped64隻有一個空構造器,初始化時,通過Unsafe擷取到類字段的偏移量,以便後續CAS操作:

Striped64

abstract class Striped64 extends Number {
    Striped64() {
    }
    
   // Unsafe 類
    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}
           

上面有個比較特殊的字段是threadLocalRandomProbe,可以把它看成是線程的hash值。這個後面我們會講到。

定義了一個内部Cell類,這就是我們之前所說的槽,每個Cell對象存有一個value值,可以通過Unsafe來CAS操作它的值:

@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe 類
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
           

其它的字段:

可以看到Cell[]就是之前提到的槽數組,base就是非并發條件下的基數累計值。

/** CPUS的數目,以限制表大 */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * cells數組。 如果為非null,則數組大小為2的幂。
     */
    transient volatile Cell[] cells;

    /**
     * 基本值,主要在沒有争用時使用,也用作數組初始化過程中的後備。 通過CAS更新
     */
    transient volatile long base;

    /**
     * 擴容或者建立數組時使用的自旋鎖(通過CAS鎖定),将标志設定為1(加鎖狀态),
     * 初始化完畢或者擴容完畢時将此标志設定為0
     */
    transient volatile int cellsBusy;
           

LongAdder的核心方法

還是通過例子來看:

假設現在有一個LongAdder對象la,四個線程A、B、C、D同時對la進行累加操作。

LongAdder la = new LongAdder();
la.add(10);
           

①ThreadA調用add方法(假設此時沒有并發)

// LongAdder方法
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    /**
     * Striped64 方法
     */
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
           

初始時Cell[]為null,base為0。是以ThreadA會調用casBase方法(定義在Striped64中),因為沒有并發,CAS操作成功将base變為10,可以看到,如果線程A、B、C、D線性執行,那casBase永遠不會失敗,也就永遠不會進入到base方法的if塊中,所有的值都會累積到base中。

那麼,如果任意線程有并發沖突,導緻caseBase失敗呢?失敗就會進入if方法體:

Java 原子性CAS 原理 ,atomic包 詳解

這個方法體會先再次判斷Cell[]槽數組有沒初始化過,如果初始化過了,以後所有的CAS操作都隻針對槽中的Cell;否則,進入longAccumulate方法。整個add方法的邏輯如下圖:

Java 原子性CAS 原理 ,atomic包 詳解

可以看到,隻有從未出現過并發沖突的時候,base基數才會使用到,一旦出現了并發沖突,之後所有的操作都隻針對Cell[]數組中的單元Cell。

如果Cell[]數組未初始化,會調用父類的longAccumelate去初始化Cell[],如果Cell[]已經初始化但是沖突發生在Cell單元内,則也調用父類的longAccumelate,此時可能就需要對Cell[]擴容了。

Striped64的核心方法

我們來看下Striped64的核心方法longAccumulate到底做了什麼:

//Striped64方法
final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
  int h;
  	//getProbe()方法給目前線程生成一個非0的hash值
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); 
        h = getProbe();
        wasUncontended = true;
    }
    //如果哈希取模映射得到的cell單元不為null,則為true,此值也可看做擴容意向
    boolean collide = false; 
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        //第一種可能:cells已經初始化了
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        //第二種可能,cells沒有加鎖也沒有初始化,則嘗試對cells加鎖,并進行初始化
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2]; //初始化大小為2(必須為2的幂次方)
                    rs[h & 1] = new Cell(x); //根據hash值定位,并指派
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        //第三種可能:cells正在初始化,則嘗試直接在base上進行累加操作
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

           

上述代碼首先給目前線程配置設定一個hash值,然後進入一個自旋,這個自旋分為三個分支:

  • 第一種可能:Cell[]數組已經初始化
  • 第二種可能:Cell[]數組未初始化
  • 第三種可能:Cell[]數組正在初始化中

第二種可能:Cell[]數組未初始化

我們之前讨論了,初始時Cell[]數組還沒有初始化,是以會進入分支②:

else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2]; //初始化大小為2(必須為2的幂次方)
                    rs[h & 1] = new Cell(x); //根據hash值定位,并指派
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
           

首先會将cellsBusy置為1:加鎖狀态

/**
     * 将cellBusy字段從0設為1,以擷取鎖定。
     */
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
           

然後,初始化Cell[]數組(初始大小為2),根據目前線程的hash值計算映射的索引,并建立對應的Cell對象,Cell單元中的初始值x就是本次要累加的值。

第三種可能:Cell[]數組正在初始化中

如果在初始化過程中,另一個線程ThreadB也進入了longAccumulate方法,就會進入分支③:

//第三種可能:cells正在初始化,則嘗試直接在base上進行累加操作
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
           

可以看到,分支③直接操作base基數,将值累加到base上。

第一種可能:Cell[]數組已經初始化

如果初始化完成後,其它線程也進入了longAccumulate方法,就會進入分支①:

//第一種可能:cells已經初始化了
 if ((as = cells) != null && (n = as.length) > 0) {
   		//目前線程的hash值運算後映射到Cell單元為null,說明該Cell沒有被使用
       if ((a = as[(n - 1) & h]) == null) {
           if (cellsBusy == 0) {       // cells數字沒有正在擴容
               Cell r = new Cell(x);  
               //嘗試加鎖,成功後cellsBusy=1
               if (cellsBusy == 0 && casCellsBusy()) {
                   boolean created = false;
                   try {              
                       Cell[] rs; int m, j;
                       //在有鎖的情況下,再次堅持cell單元是否為null,為空則指派
                       if ((rs = cells) != null &&
                           (m = rs.length) > 0 &&
                           rs[j = (m - 1) & h] == null) {
                           rs[j] = r;
                           created = true;
                       }
                   } finally {
                       cellsBusy = 0;
                   }
                   if (created)
                       break;
                   continue;           // Slot is now non-empty
               }
           }
           collide = false;
       }
       // wasUncontended表示前一次CAS更新Cell單元是否成功了
       else if (!wasUncontended)     
       		// 沒成功,重置為true,後面會重新計算線程的hash值  
           wasUncontended = true;      
       else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
           break;
       else if (n >= NCPU || cells != as)//當cells數組的大小超過cpu核數後,永遠都不會再擴容了
       		//
           collide = false;// 擴容辨別,為false,表示不會再進行擴容
       else if (!collide)
           collide = true;
       else if (cellsBusy == 0 && casCellsBusy()) {//嘗試加鎖
           try {
               if (cells == as) {      
                   Cell[] rs = new Cell[n << 1];//擴容為原來的兩倍
                   for (int i = 0; i < n; ++i)
                       rs[i] = as[i];
                   cells = rs;
               }
           } finally {
               cellsBusy = 0;
           }
           collide = false;
           continue;                   // Retry with expanded table
       }
       h = advanceProbe(h);//計算目前線程新的hash值
   }
           

LongAdder的sum方法

最後,我們來看下LongAdder的sum方法:

Java 原子性CAS 原理 ,atomic包 詳解

sum求和的公式就是我們開頭說的:

Java 原子性CAS 原理 ,atomic包 詳解

需要注意的是,這個方法隻能得到某個時刻的近似值,也就是如果要求一個準确無誤的值,應該用AtomicLong接口,這也就是LongAdder并不能完全替代LongAtomic的原因之一。

LongAdder的其它兄弟

JDK1.8時,java.util.concurrent.atomic包中,除了新引入LongAdder外,還有引入了它的三個兄弟類:LongAccumulator、DoubleAdder、DoubleAccumulator

Java 原子性CAS 原理 ,atomic包 詳解

LongAccumulator

LongAccumulator是LongAdder的增強版。LongAdder隻能針對數值的進行加減運算,而LongAccumulator提供了自定義的函數操作。其構造函數如下:

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }
           

通過LongBinaryOperator,可以自定義對入參的任意操作,并傳回結果(LongBinaryOperator接收2個long作為參數,并傳回1個long)

LongAccumulator内部原理和LongAdder幾乎完全一樣,都是利用了父類Striped64的longAccumulate方法。這裡就不再贅述了,可以自己閱讀源碼。

DoubleAdder和DoubleAccumulator

從名字也可以看出,DoubleAdder和DoubleAccumulator用于操作double原始類型。

與LongAdder的唯一差別就是,其内部會通過一些方法,将原始的double類型,轉換為long類型,其餘和LongAdder完全一樣:

// DoubleAdder方法
 public void add(double x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null ||
            !casBase(b = base,
                     Double.doubleToRawLongBits
                     (Double.longBitsToDouble(b) + x))) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value,
                                      Double.doubleToRawLongBits
                                      (Double.longBitsToDouble(v) + x))))
                doubleAccumulate(x, null, uncontended);
        }
    }
           

參考

https://segmentfault.com/a/1190000015865714