天天看點

JUC—兩萬字的atomic原子類源碼深度解析1 atomic的概述2 原子更新單個變量3 原子更新數組4 原子更新字段屬性5 原子類的加強6 JMH性能測試7 atomic的總結

  基于JDK1.8詳細介紹了JUC下面的atomic子包中的大部分原子類的底層源碼實作,比如AtomicInteger、AtomicIntegerArray、AtomicStampedReference等原子類源碼。最後還介紹了JDK1.8對原子類的增強,比如LongAdder和LongAccumulator的原理!

文章目錄

  • 1 atomic的概述
  • 2 原子更新單個變量
    • 2.1 基本原子類
      • 2.1.1 重要屬性
      • 2.1.2 重要方法
    • 2.2 帶版本号的原子類
      • 2.2.1 重要屬性
      • 2.2.2 重要方法
      • 2.2.3 案例
  • 3 原子更新數組
    • 3.1 重要屬性
    • 3.2 重要方法
  • 4 原子更新字段屬性
  • 5 原子類的加強
    • 5.1 LongAdder
      • 5.1.1 LongAdder的概述
      • 5.1.2 LongAdder的原理
        • 5.1.2.1 内部結構
        • 5.1.2.2 add增加給定值
          • 5.1.2.2.1 longAccumulate統一處理
        • 5.1.2.3 increment自增
        • 5.1.2.4 sum統計
        • 5.1.2.5 reset重置
        • 5.1.2.6 sumThenReset統計并重置
    • 5.2 LongAccumulator
      • 5.2.1 LongAccumulator的概述
      • 5.2.2 LongAccumulator的原理
        • 5.2.2.1 内部結構
        • 5.2.2.2 accumulate更新給定值
      • 5.2.3 其他操作
      • 5.2.4 案例
      • 5.2.5 LongAccumulator的總結
  • 6 JMH性能測試
  • 7 atomic的總結

1 atomic的概述

  JDK1.5之前,為了保證Java中對單個變量的多個獨立操作的原子性和安全性,通常會使用到synchronized鎖,但是synchronized需要底層作業系統mutex資源的支援,這是一種重量級資源,性能比較低!

  JDK1.5的時候,新增了JUC包,增加了許多和同步有關的特性,大大提高了使用Java進行并發程式設計的效率,比如并發集合、并發隊列、新lock鎖等。另外,JUC包下面還提供了一個java.util.concurrent.atomic子包,這個atomic包中的類用于在多線程環境下實作單個變量多個獨立操作(比如讀-寫)的連續原子性,并且都比較高效,因為它們都是由基于偏移量(類似于指針)的非阻塞CAS算法實作,用于替代鎖的使用。

  JDK1.8的atomic包中具有17個原子類,根據支援的更新變量的類型,我們可以對常用原子類分為三種,分别是原子更新單個變量、原子更新數組、原子更新引用屬性(字段)。

  atomic 包下的常用原子類如下:

摘要
AtomicBoolean 用原子方式更新的 boolean 值。
AtomicInteger 用原子方式更新的 int 值。
AtomicLong 用原子方式更新的 long 值。
AtomicReference< V > 用原子方式更新的對象引用。
AtomicMarkableReference< V > 維護帶有boolean标志位的對象引用,可以原子方式對其進行更新。
AtomicStampedReference< V > 維護帶有int整數版本号的對象引用,可用原子方式對其進行更新。
AtomicIntegerArray 用原子方式更新其元素的 int 數組。
AtomicLongArray 用原子方式更新其元素的 long 數組。
AtomicReferenceArray< E > 用原子方式更新其元素的對象引用數組。
AtomicIntegerFieldUpdater< T > 基于反射的實用工具,可以對指定類的指定非私有非靜态的 volatile int 字段進行原子更新。
AtomicLongFieldUpdater< T > 基于反射的實用工具,可以對指定類的指定非私有非靜态的 volatile long 字段進行原子更新。
AtomicReferenceFieldUpdater< T,V > 基于反射的實用工具,可以對指定類的指定非私有非靜态的 volatile 引用字段進行原子更新 。
LongAdder JDK1.8新增加的原子類累加器,使用熱點資料分離的思想對long資料進行加法運算,性能更佳!
LongAccumulator JDK1.8新增加的原子類累加器,使用熱點資料分離的思想對long資料進行指定規則的運算,性能更佳!
DoubleAdder JDK1.8新增加的原子類累加器,使用熱點資料分離的思想對double資料進行加法運算,性能更佳!
DoubleAccumulator JDK1.8新增加的原子類累加器,使用熱點資料分離的思想對double資料進行指定規則的運算,性能更佳!

  實際上Java中atomic包下的原子類的基石就是:volatile字段修飾符+CAS算法(Unsafe提供)。本文沒有對這兩個基本知識點做深入講解,因為前面的文章中已經講了,都是深入到了虛拟機源碼級别,如果想要深入了解原子類的原理,應該要看看以下文章:Java中的volatile實作原理深度解析以及應用和Java中的CAS實作原理深度解析與應用案例。

2 原子更新單個變量

2.1 基本原子類

  通過原子的方式更新單個變量,Atomic包提供了以下4個基礎類:

  1. AtomicBoolean:用原子方式更新的 boolean 值。
  2. AtomicInteger:用原子方式更新的 int 值。
  3. AtomicLong:用原子方式更新的 long 值。
  4. AtomicReference< V >:用原子方式更新的對象引用。

  上面四個原子類的原理幾乎一緻,我們以AtomicInteger來講解。

2.1.1 重要屬性

  AtomicInteger 中儲存了一個核心字段value,它就代表了Atomiclnteger 的目前實際取值,所有的方法都是圍繞該值進行的。

  還有一個屬性valueOffset,它儲存着value 字段在Atomiclnteger 對象中的偏移量。Unsafe中的CAS方法都是通過字段的偏移量來操作字段的。

/**
 * 内部的value屬性,它就代表了Atomiclnteger 的目前實際取值。
 * 所有的方法都是圍繞該值進行的
 */
private volatile int value;

/**
 * 使用給定值初始化value
 *
 * @param initialValue 給定值
 */
public AtomicInteger(int initialValue) {
    value = initialValue;
}

/**
 * 初始化value值為0
 */
public AtomicInteger() {
}

/**
 * 内部實際上依賴于Unsafe類的方法,堆value值進行操作
 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
/**
 * value字段的偏移量
 */
private static final long valueOffset;

static {
    try {
        //初始化value字段的偏移量
        valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) {
        throw new Error(ex);
    }
}
           

2.1.2 重要方法

/**
 * 擷取目前最新值
 *
 * @return 目前最新值
 */
public final int get() {
    return value;
}

/**
 * 設定給定新值
 *
 * @param newValue 新值
 */
public final void set(int newValue) {
    value = newValue;
}

/**
 * 原子性的将目前值設為給定新值,傳回舊值
 *
 * @param newValue 新值
 * @return 舊值
 */
public final int getAndSet(int newValue) {
    return unsafe.getAndSetInt(this, valueOffset, newValue);
}


/**
 * 如果目前值等于預期值,則以原子方式将該值設定為給定的新值
 *
 * @param expect 預期值
 * @param update the new value
 * @return true 更新成功 false 更新失敗
 */
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

/**
 * 原子性的将目前值加1,傳回舊值
 *
 * @return 舊值
 */
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}


/**
 * 原子性的将目前值減1,傳回舊值
 *
 * @return 傳回舊值
 */
public final int getAndDecrement() {
    return unsafe.getAndAddInt(this, valueOffset, -1);
}


/**
 * 原子性的将目前值增加delta,傳回舊值
 *
 * @param delta 增加的值
 * @return 舊值
 */
public final int getAndAdd(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta);
}


/**
 * 原子性的将目前值加1,傳回新值
 *
 * @return 更新後的值
 */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}


/**
 * 原子性的将目前值減1,傳回新值
 *
 * @return 更新後的值
 */
public final int decrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}


/**
 * 原子性的将目前值增加delta,傳回新值
 *
 * @param delta 增加的值
 * @return 更新後的值
 */
public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}


/**
 1. 最終會設定成newValue,使用lazySet設定值後,可能導緻其他線程在之後的一小段時間内還是可以讀到舊的值。
 2. 關于該方法的更多資訊可以參考并發程式設計網翻譯的一篇文章《AtomicLong.lazySet是如何工作的?》,文章位址是“http://ifeve.com/how-does-atomiclong-lazyset-work/”。
 3.  4. @param newValue 新值
 */
public final void lazySet(int newValue) {
    unsafe.putOrderedInt(this, valueOffset, newValue);
}
           

  可以看到,裡面的方法都是調用的Unsafe類方法,進行的CAS操作。

  Atomic包實際上隻提供了3種基本類型的原子更新:int、long、boolean,其中boolean也是轉換為int的0、1進行更新的,實際上并沒有char、float和double等的CAS操作,實際上char、 float、double都可以轉換為int或者long在進行操作,如果DoubleAdder就是采用Double.doubleToRawLongBits将double轉換為long類型的值在進行操作。

/*Unsafe隻提供了3種CAS方法.*/
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

/*AtomicBoolean源碼中,它是先把Boolean轉換成int類型,再使用compareAndSwapInt進行CAS操作*/
public final boolean compareAndSet(boolean expect, boolean update) {
    int e = expect ? 1 : 0;
    int u = update ? 1 : 0;
    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
           

2.2 帶版本号的原子類

  通過原子的方式更新單個變量的原子類的更新版,Atomic包提供了以下2個類:

  1. AtomicMarkableReference< V >:維護帶有标記位的對象引用,可以原子方式對其進行更新。
  2. AtomicStampedReference< V >:維護帶有整數标志的對象引用,可用原子方式對其進行更新。

  上面兩個原子類的方法以及原理幾乎一緻,屬于帶有版本号的原子類。我們知道CAS操作的三大問題之一就是“ABA”問題:CAS在操作值的時候,需要檢查預期值有沒有發生變化,如果沒有發生變化則更新。但是,如果一個線程t1首先擷取了預期值A,此時另一個線程t2則将值從A變成了B,随後又變成了A,随後t1再使用CAS進行比較交換的時候,會發現它的預期值“沒有變化”,但實際上是變化過的。這就是ABA問題的由來。

  ABA問題的解決思路就是使用版本号,1A->2B->3A,在Atomic包中,提供了一個現成的AtomicStampedReference類來解決ABA問題,使用的就是添加版本号的方法。還有一個AtomicMarkableReference實作類,它比AtomicStampedReference更加簡單,AtomicStampedReference中每更新一次資料版本号也會更新一次,這樣可以使用版本号統計到底更新了多少次,而AtomicMarkableReference僅僅使用了一個boolean值來表示值是否改變過,是以使用的比較少。

  這裡我們以AtomicStampedReference來講解。

2.2.1 重要屬性

  AtomicStampedReference内部不僅維護了我們的傳遞的對象reference,還維護了一個int類型的版本号stamp,它們都被存放到一個Pair類型的内部類執行個體中。當AtomicStampedReference 對應的資料被修改時,除了更新資料本身外,還必須要更新版本号,這個版本号一般都是自增的。當AtomicStampedReference 設定對象值時,對象值及版本号都必須滿足期望值,才會更新成功。

/**
 * Pair内部類,用于維護reference和stamp
 *
 * @param <T>
 */
private static class Pair<T> {
    /**
     * 真正的資料
     */
    final T reference;
    /**
     * 版本号
     */
    final int stamp;

    private Pair(T reference, int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }

    /**
     * 傳回Pair執行個體
     */
    static <T> Pair<T> of(T reference, int stamp) {
        return new Pair<T>(reference, stamp);
    }
}

/**
 * 由于要維護兩個屬性,是以幹脆使用一個内部類對象來維護這兩個屬性
 */
private volatile Pair<V> pair;

/**
 * 建立具有給定初始值的新 AtomicStampedReference。
 *
 * @param initialRef   初始值
 * @param initialStamp 初始版本号
 */
public AtomicStampedReference(V initialRef, int initialStamp) {
    //初始化一個Pair對象,并初始化屬性值
    pair = Pair.of(initialRef, initialStamp);
}
           

2.2.2 重要方法

  最重要的就是compareAndSet方法,它需要傳遞:期望值、新值、期望版本号、新版本号,當期望值和期望版本号都與此時内部的真實值和真實版本号相等的時候,就會調用compareAndSwapObject使用一個新的Pair對象替換舊的Pair對象,同時完成reference和stamp的更新。

/**
 * 如果目前引用 == 預期引用,并且目前版本号等于預期版本号,則以原子方式将該引用和該标志的值設定為給定的更新值。
 *
 * @param expectedReference 預期引用
 * @param newReference      新引用
 * @param expectedStamp     預期版本号
 * @param newStamp          新版本号
 * @return 如果成功,則傳回 true
 */
public boolean compareAndSet(V expectedReference,
                             V newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    //一系列的判斷,如果兩個預期值都相等,那麼嘗試調用compareAndSwapObject使用新的Pair對象替代舊的Pair對象
    //這樣就同時完成了reference和stamp的更新
    return
            expectedReference == current.reference &&
                    expectedStamp == current.stamp &&
                    ((newReference == current.reference &&
                            newStamp == current.stamp) ||
                            casPair(current, Pair.of(newReference, newStamp)));
}

/**
 * CAS替換内部的Pair對象的方法
 *
 * @param cmp 預期pair對象
 * @param val 新pair對象
 * @return 如果成功,則傳回 true
 */
private boolean casPair(Pair<V> cmp, Pair<V> val) {
    return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}


/**
 * @return 獲得目前儲存的對象引用
 */
public V getReference() {
    return pair.reference;
}

/**
 * @return 獲得目前儲存的版本号
 */
public int getStamp() {
    return pair.stamp;
}


/**
 * 設定新對象引用和版本号
 *
 * @param newReference 新對象引用
 * @param newStamp     新版本号
 */
public void set(V newReference, int newStamp) {
    Pair<V> current = pair;
    //如果新對象引用以及新版本号和之前的都一樣那就不設定
    //否則就是建立一個Pair對象并設定相應的屬性,替代原來的Pair對象
    if (newReference != current.reference || newStamp != current.stamp)
        this.pair = Pair.of(newReference, newStamp);
}
           

2.2.3 案例

  實際上,如果更新的資料是無狀态的資料,那麼使用基本的原子類也可以完成目的,即如果線程A将值從1->2->1,而線程B僅僅是使用了值,這是沒什麼問題的,但是如果和業務相關聯,比較的對象是有狀态的,那麼可能會出現嚴重問題。

  比如還是線程A将值從1->2->1,而線程B的業務邏輯是如果發現資料改變過,那麼就不能操作,這樣的話就不能單純的比較值了,這就需要用到版本号了。

/**
 * @author lx
 */
public class AtomicStampedReferenceDemo {

    public static void main(String args[]) {
        //初始值為0,版本号為0
        AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(0, 0);

        Thread thread = new Thread(() -> {
            //先擷取标志位
            int timestamp = atomicStampedReference.getStamp();
            //擷取原值
            int reference = atomicStampedReference.getReference();
            System.out.println("原值reference: " + reference);
            //阻塞,等待被喚醒
            LockSupport.park();
            if (atomicStampedReference.compareAndSet(reference, reference + 1, timestamp, timestamp + 1)) {
                System.out.println("更新成功,新值reference: " + atomicStampedReference.getReference());
            } else {
                System.out.println("更新失敗,新值reference: " + atomicStampedReference.getReference());
                System.out.println("雖然原值和新值相等,但是是線上程阻塞過程中值發生了變化,變化了" + atomicStampedReference.getStamp() + "次");
            }
        });
        thread.start();


        Thread thread1 = new Thread(() -> {
            //對資料先加一再減一,反複4次,最終reference的值是不變的
            for (int i = 0; i < 4; i++) {
                int timestamp = atomicStampedReference.getStamp();
                int reference = atomicStampedReference.getReference();
                if (i % 2 == 0) {
                    atomicStampedReference.compareAndSet(reference, reference + 1, timestamp, timestamp + 1);
                } else {
                    atomicStampedReference.compareAndSet(reference, reference - 1, timestamp, timestamp + 1);
                }
            }
            //喚醒阻塞的thread線程
            LockSupport.unpark(thread);
        });
        thread1.start();
    }
}
           

  同樣的邏輯,使用普通原子類就能更新成功:

/**
 1. @author lx
 */
public class AtomicRefrenceDemo {

    public static void main(String args[]) {
        //初始值為0
        AtomicReference<Integer> atomicReference = new AtomicReference<Integer>(0);

        Thread thread = new Thread(() -> {
            int reference = atomicReference.get();
            System.out.println("原值reference: " + reference);
            //阻塞,等待被喚醒
            LockSupport.park();
            if (atomicReference.compareAndSet(reference, reference + 1)) {
                System.out.println("更新成功,新值reference: " + atomicReference.get());
            } else {
                System.out.println("更新失敗,新值reference: " + atomicReference.get());
            }
        });
        thread.start();


        Thread thread1 = new Thread(() -> {
            //對資料先加一再減一,反複4次,最終的值是不變的
            for (int i = 0; i < 4; i++) {
                int reference = atomicReference.get();
                if (i % 2 == 0) {
                    atomicReference.compareAndSet(reference, reference + 1);
                } else {
                    atomicReference.compareAndSet(reference, reference - 1);
                }
            }
            //喚醒阻塞的thread線程
            LockSupport.unpark(thread);
        });
        thread1.start();
    }
}
           

3 原子更新數組

  通過原子的方式更新數組裡的某個元素,Atomic包提供了以下3個類:

  1. AtomicIntegerArray:用原子方式更新其元素的 int 數組。
  2. AtomicLongArray:用原子方式更新其元素的 long 數組。
  3. AtomicReferenceArray< E >:用原子方式更新其元素的對象引用數組。

  上面三個原子類的原理幾乎一緻,我們以AtomicIntegerArray來講解。

3.1 重要屬性

  可以看到内部就是一個int的數組,然後調用Unsafe的方法對數組的元素進行操作。

/**
 * 使用Unsafe操作數組
 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
/**
 * 傳回數組類型的第一個元素的偏移位址(基礎偏移位址)。
 * 如果arrayIndexScale方法傳回的比例因子不為0,你可以通過結合基礎偏移位址和比例因子通路數組的所有元素。
 */
private static final int base = unsafe.arrayBaseOffset(int[].class);
/**
 * scale最高位的1的所在位數(從左從0開始),在計算某個索引的偏移量的時候
 * 使用是該值進行位運算而不是scale進行傳統乘法運算,提升效率
 */
private static final int shift;
/**
 * 底層int數組
 */
private final int[] array;

static {
    //傳回數組單個元素的大小,數組中的元素的位址是連續的,64位虛拟機應該是4
    int scale = unsafe.arrayIndexScale(int[].class);
    //大小必須是2的幂次方
    if ((scale & (scale - 1)) != 0)
        throw new Error("data type scale not a power of two");
    //numberOfLeadingZeros用于傳回scale的最高非零位前面的0的個數,包括符号位在内;
    //31減去scale的最高非零位前面的0的個數,就表示scale最高位的1的所在位數,比如scale為2,那麼shift為1,如果scale為4,那麼shift為2
    shift = 31 - Integer.numberOfLeadingZeros(scale);
}

/**
 * 某個數組索引位置的元素的偏移量
 *
 * @param i 數組索引
 * @return 該索引的偏移量
 */
private long checkedByteOffset(int i) {
    if (i < 0 || i >= array.length)
        throw new IndexOutOfBoundsException("index " + i);
    return byteOffset(i);
}

/**
 * @param i 索引位置
 * @return 傳回某個數組索引位置的元素的偏移量
 */
private static long byteOffset(int i) {
    //這裡就能明白shift的作用了,對于2的幂次方的scale:
    //這裡可以使用scale的最高為1的位置shift的位運算i << shift,代替scale*i的傳統運算,效率提高
    //比如scale=4,那麼shift=2,如果i=3,那麼i<<shift = 3 << 2 = 12 就等于 scale*i = 4 * 3  = 12
    //比如scale=8,那麼shift=3,如果i=3,那麼i<<shift = 3 << 3 = 24 就等于 scale*i = 8 * 3  = 24
    return ((long) i << shift) + base;
}


/**
 * 建立給定長度的新 AtomicIntegerArray。
 *
 * @param length 給定長度
 */
public AtomicIntegerArray(int length) {
    array = new int[length];
}

/**
 * 建立與給定數組具有相同長度的新 AtomicIntegerArray,并從給定數組複制其所有元素。
 *
 * @param array 給定數組
 * @throws NullPointerException 如果數組為 null
 */
public AtomicIntegerArray(int[] array) {
    // 克隆數組,元素淺克隆
    this.array = array.clone();
}
           

3.2 重要方法

  其常用方法如下,基于Unsafe的volatile和CAS操作:

/**
 * 擷取i索引位置的目前值
 *
 * @param i 多赢
 * @return 目前值
 */
public final int get(int i) {
    return getRaw(checkedByteOffset(i));
}

private int getRaw(long offset) {
    //volatile的擷取最新值
    return unsafe.getIntVolatile(array, offset);
}

/**
 * 在i索引位置設定為指定新值
 *
 * @param i        索引
 * @param newValue 新值
 */
public final void set(int i, int newValue) {
    //volatile的寫
    unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}


/**
 * 以原子方式将元素設定在i索引位置,并傳回舊值
 *
 * @param i        索引
 * @param newValue 新值
 * @return 舊值
 */
public final int getAndSet(int i, int newValue) {
    return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}

/**
 * 以原子方式将輸入值與數組中索引i的元素相加,并傳回舊值
 *
 * @param i     索引
 * @param delta 相加的資料
 * @return 舊值
 */
public final int getAndAdd(int i, int delta) {
    return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

/**
 * 以原子方式将輸入值與數組中索引i的元素相加,并傳回新值
 *
 * @param i     索引
 * @param delta 相加的資料
 * @return 更新後的值
 */
public final int addAndGet(int i, int delta) {
    return getAndAdd(i, delta) + delta;
}


/**
 1. 如果目前值等于預期值,則以原子方式将數組位置i的元素設定成新值。
 2.  3. @param i      索引
 3. @param expect 預期值
 4. @param update 新值
 5. @return true表示CAS成功 false 表示CAS失敗
 */
public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
           

4 原子更新字段屬性

  通過原子的方式更新對象裡的某個字段,Atomic包提供了以下3個類:

  1. AtomicIntegerFieldUpdater< T >:基于反射的實用工具,可以對指定類的指定非私有的 volatile int

    字段進行原子更新。

  2. AtomicLongFieldUpdater< T >:基于反射的實用工具,可以對指定類的指定非私有的 volatile long

    字段進行原子更新。

  3. AtomicReferenceFieldUpdater< T,V >:基于反射的實用工具,可以對指定類的指定非私有的 volatile

    引用字段進行原子更新。

  以上3個類的原理幾乎一樣,我們以AtomicIntegerFieldUpdater來講解。

  AtomicIntegerFieldUpdater實際上是一個抽象類,它的實作類實際上在它的内部而且是私有的,是以隻能使用靜态方法newUpdater()建立一個更新器,并且需要設定想要更新的類和屬性字元串名。

  另外,這裡對于對象的字段的設定是先采用getDeclaredField方法反射擷取的對應字段的Filed對象,然後在對Filed對象進行操作,并且沒有設定setAccessible權限,是以類的字段屬性不能是私有屬性!

  由于CAS 操作會通過對象執行個體中的偏移量直接進行指派,即Unsafe. objectFieldOffset()方法。是以,它不支援對static屬性的指派。

  對象的字段還應該被設定為volatile類型,這樣就能擷取到最新的值。

/**
 1. @author lx
 */
public class AtomicFieldUpdaterTest {
    public static void main(String[] args) {
        AtomicIntegerFieldUpdater<User> old = AtomicIntegerFieldUpdater.newUpdater(User.class, "old");
        AtomicReferenceFieldUpdater<User, String> name = AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");

        User user = new User("user", 10);

        System.out.println(old.getAndIncrement(user));
        System.out.println(old.get(user));

        System.out.println(name.getAndSet(user, "user2"));
        System.out.println(name.get(user));
    }

    public static class User {
        volatile String name;
        volatile int old;

        User(String name, int old) {
            this.name = name;
            this.old = old;
        }

        public String getName() {
            return name;
        }

        public int getOld() {
            return old;
        }
    }
}
           

5 原子類的加強

  JDK1.8的時候,新增了四個原子類:

  1. LongAdder:long類型的數值累加器,從0開始累加,累加規則為加法運算。
  2. LongAccumulator:long類型的數值累加器,可從指定值開始累加,可指定累加規則。
  3. DoubleAdder:double類型的數值累加器,從0開始累加,累加規則為加法運算。
  4. DoubleAccumulator:double類型的數值累加器,可從指定值開始累加,可指定累加規則。

  自從原子類問世之後,多線程環境下如果用于統計計數操作,一般可以使用AtomicLong來代替鎖作為計數器,AtomicLong 通過CAS 提供了非阻塞的原子性操作,相比使用阻塞算法的同步器來說它的性能己經很好了,那麼,它們有什麼缺點嗎?

  實際上,AtomicLong等其他傳統的atomic原子類對于數值的更改,通常都是在一個無限循環(自旋)中不斷嘗試CAS 的修改操作,一旦CAS失敗則循環重試,這樣來保證最終CAS操作成功。如果競争不激烈,那麼修改成功的機率就很高,但是如果在高并發下大量線程頻繁的競争修改計數器,會造成一次CAS修改失敗的機率就很高。在大量修改失敗時,這些原子操作就會進行多次循環嘗試,白白浪費CPU 資源,是以性能還是會受到影響。

  JDK1.8新增這些類,正是為了解決高并發環境下由于頻繁讀寫AtomicLong等計數器而可能造成某些線程持續的空轉(循環)進而浪費CPU的情況,它們也被稱為“累加器”!

  LongAdder和DoubleAdder,LongAccumulator和DoubleAccumulator的原理差不多。實際上DoubleAdder中對于double的累加也是先通過Double.doubleToRawLongBits将double類型轉換為long類型來進行計算的,并且底層也是存儲的long類型的值,在擷取總和的時候又會通過Double.longBitsToDouble将存儲的long值轉換為double。

  下面我們将對LongAdder和DoubleAdder進行講解!

5.1 LongAdder

5.1.1 LongAdder的概述

public class LongAdder

  extends Number

  implements Serializable

  來自于JDK 1.8的LongAdder,作為一個long類型數值的累加器,被用來克服在高并發下使用AtomicLong 可能由于線程頻繁自旋而浪費CPU的缺點。

  LongAdder的解決方式是采用了“熱點資料分離”的基本思想:

  傳統的原子類的内部通常維護了一個對應類型的value屬性值,多個線程之間的CAS競争實際上就是在争奪對這個value屬性的更新權,但是CAS操作隻會保證同時隻有一個線程能夠更新成功,是以AtomicLong(包括其他傳統原子類)的性能瓶頸就是由于過多線程同時去競争一個變量的更新而産生的,那麼如果把一個變量分解為多個變量,讓同樣多的線程去競争多個資源,最終的結果就是統計被分解出來的多個變量的總和,這樣就能大大緩解多線程競争導緻的性能問題,這就是“熱點資料分離”的基本思想。這種思想在高并發環境下非常有用,類似的還有“減小鎖的粒度”的思想,除了新的原子類之外,在JDK1.8的ConcurrentHashMap中對于結點數量的統計并沒有采用單個變量計數,也是采用的類似于LongAdder的“熱點資料分離”的基本思想。

JUC—兩萬字的atomic原子類源碼深度解析1 atomic的概述2 原子更新單個變量3 原子更新數組4 原子更新字段屬性5 原子類的加強6 JMH性能測試7 atomic的總結

5.1.2 LongAdder的原理

5.1.2.1 内部結構

  下面是Striped64中的常用屬性,LongAdder實際上就是使用的這些屬性, 沒什麼自己的特别的屬性:

//Striped64中的屬性

/**
 * 用來實作CAS鎖的資源,值為0時表示沒有鎖,值為1時表示已上鎖,擴容Cell 數組或者初始化Cell 數組時會使用到該值
 * 使用CAS的同時唯一成功性來保證同一時刻隻有一條線程可以進入擴容Cell 數組或者初始化Cell 數組的代碼
 */
transient volatile int cellsBusy;

/**
 * volatile long 類型的基本屬性,在沒有CAS競争時用來統計計數
 */
transient volatile long base;

/**
 * volatile Cell類型的數組,要麼為null,當發生CAS更新base出現競争的時候初始化
 * 此後就一直使用該數組來統計計數,初始容量為2,數組可擴容,大小為2的幂次方
 */
transient volatile Striped64.Cell[] cells;

/**
 1. ccells數組的元素類型,由于是數組,導緻記憶體連續,是以可以使用緩存填充(注解方式)來避免僞共享。
 */
@sun.misc.Contended
static final class Cell {
    /**
     * 内部就是一個volatile long類型的基本屬性,線程對數組某個索引位置的更新實際上就是更新該值
     */
    volatile long value;

    Cell(long x) {
        value = x;
    }

    /**
     * 更新value指的CAS方法
     *
     * @param cmp 預期值
     * @param val 新值
     * @return true 成功  false 失敗
     */
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    //對于數值的更新都是CAS的操作

    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Striped64.Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
           

  LongAdder類繼承自Striped64類,Striped64被設計用來減少高并發環境的線程沖突,Striped64類是對外不可見的,也是這四個累加器類的公共抽象父類,它們很多的操作也是直接調用Striped64的方法,在Striped64 内部維護着三個主要的變量:

  1. cellsBusy :用來實作簡單的CAS鎖,狀态值隻有0和l,當建立Cell 元素,擴容Cell 數組或者初始化Cell 數組時,使用CAS 操作該變量來保證同時隻有一個線程可以進行其中之一的操作。
  2. base:volatile int類型的一個基本屬性,熱點分離的實作之一,在沒有存在并發CAS操作的時候記錄被用于記錄累加值,也用來記錄初始值。
  3. cells:volatile Cell[ ] 類型的一個對象數組,熱點分離的實作之二,當使用CAS更新base基值失敗(出現CAS競争)的時候,就會初始化該數組,然後嘗試通過更新該數組中的某個位置的值來記錄累加值。

  由此我們可以明确的知道,LongAdder的熱點分離思想的具體實作是将value分散為一個base變量+一個cells數組。

  這裡采用數組的用途很明顯,那就是對于并發下的線程随機配置設定到數組不同索引位置,并對該位置的值進行更新,是以理論上采用一個數組就行了,那麼為什麼不采用單獨一個數組還要加一個變量呢?在沒有競争的情況下,如果還是初始化一個數組然後更新數組某個索引的值就有些得不償失了,因為數組明顯比單個變量占用更多的空間,其更新效率也沒有單獨更新一個變量那麼塊。

  是以,綜合考慮下LongAdder采用一個變量base和一個數組cells一起來計數,它們的使用流程如下:在更新計數的時候如果沒有CAS競争,即并發度較低時就一直使用base變量來統計計數,此時cells數組是null,即沒有初始化或者鎖延遲初始化,就和AtomicLong一樣。一旦出現對base變量的CAS競争,即高并發環境下某些線程CAS更新base失敗,那麼就初始化cells數組,并且此後都使用cells數組來進行統計計數,如果數組某一個索引位置的Cell更新時仍然出現了競争,那麼cells數組可能會擴容或者尋找新的Cell。在統計總和時對base和cells數組中的值進行求和即可,這種方法在熱點分離的基礎上還優化了記憶體的開銷。

  初始化cells數組中的容量為2,擴容時必須保證容量為2的幂次方,數組裡面的資料是Cell 類型,Cell類中僅僅隻有一個value屬性,實際上就是對value值的封裝,封裝成為類的原因主要是友善調用方法對某個位置的value值進行CAS的更新,以及作緩存填充操作。

  因為數組的記憶體空間必須是連續的,而一個cell内部隻有一個int value屬性,非常有可能多個cell對象存在同一個緩存行中,當CAS的更新某一個Cell的值時會将該Cell所屬的緩存行失效,是以會同時造成其他位于同一個緩存行的相鄰Cell緩存也同時失效,這樣後續線程必須重主存擷取相鄰的Cell,這就造成了“僞共享”的問題,兩個Cell的通路應該是互不影響的,但是由于在同一個緩存行,造成了和“共享”的現象,是以稱為“僞共享”。這裡的Cell 類使用了 @sun.misc.Contended注解修飾,這是JDK1.8緩存填充的新方式,這樣一個Cell對象就占據一個緩存行的大小,解決了僞共享的問題,進一步提升了性能。 關于僞共享,可以看這篇文章:Java中的僞共享深度解析以及避免方法。

  LongAdder僅有一個空構造器:

/**
 * 空的構造器,Cell數組延遲初始化
 */
public LongAdder() {
}
           

5.1.2.2 add增加給定值

public void add(long x)

  add方法用于增加給定值。這個方法是LongAdder的核心方法之一。代碼比較多,且比較難以了解。

  大概步驟為:

  1. as變量儲存此時的cells數組。判斷當as為null時,那麼CAS更新base的值,如果更新成功,那麼add方法就結束了,這就是采用base屬性更新的邏輯。
  2. 如果as不為null,或者CAS更新base失敗之後,都會進入if代碼塊,内部就是采用cells數組更新的邏輯:
    1. uncontended變量表示沖突的标記,初始化true。
    2. if代碼塊中又是一個if判斷,通過||連接配接四個表達式:
      1. 如果as為null,說明cells數組沒有初始化。如果條件滿足,繼續那麼進入if代碼塊,如果該條件不滿足,繼續向後判斷;
      2. m等于cells數組長度減一。如果m小于0,說明數組沒初始化完畢。如果條件滿足,繼續那麼進入if代碼塊,如果該條件不滿足,繼續向後判斷。
      3. getProbe()用于擷取目前線程的threadLocalRandomProbe值,是一個随機生成的探測哈希值,不同的線程不一樣,初始值為0。通過getProbe() & m 計算目前線程應該通路數組的某個索引元素并指派給a。另外,threadLocalRandomProbe也被用在ThreadLocalRandom中。如果a為null,說明該索引位置還沒有初始化元素對象。如果條件滿足,繼續那麼進入if代碼塊,如果該條件不滿足,繼續向後判斷。
      4. 最後調用a.cas方法嘗試CAS的将base值從v = a.value更新為v+x,即,使用該位置記錄增加值,CAS的結果指派給uncontended,如果還是CAS更新失敗,即說明這個位置還是有沖突。如果條件滿足,繼續那麼進入if代碼塊,如果該條件不滿足,表示CAS成功,使用數組該位置的CAS記錄更新成功,那麼add方法結束。
      5. 即如果上面的四個表達式有一個傳回true,那麼就是進入if代碼塊,表示cells需要(正在)初始化、或者某個位置的Cell需要初始化,或者cells的競争激烈需要擴容。在if代碼塊中調用longAccumulate方法,該方法是Striped64的方法,用于進一步處理上面的問題并且最終會新增給定值成功,add方法結束。傳遞參數為:x、null、uncontended。實際上即使進入了longAccumulate方法,還是有可能最終會使用base屬性進行更新的,那就是多個線程判斷cells為null并同時進入的情況下,後面會講到。
/**
 * 增加給定值
 *
 * @param x 給定值
 */
public void add(long x) {
    //初始化一些變量
    Striped64.Cell[] as;
    long b, v;
    int m;
    Striped64.Cell a;
    /*
     * as儲存此時的cells數組:
     * 如果不為null,那麼直接進入if代碼塊
     * 如果為null,說明合格數組還沒有初始化,執行後面的casBase操作,b儲存base的值
     * 随後嘗試CAS的将base值從b更新為b + x,即使用base記錄增加值,如果CAS成功那麼不進入if代碼塊,add方法就結束了
     * CAS失敗同樣進入if代碼塊,表示CAS更新bae的值出現了并發競争
     *
     * 即,當cells數組為null并且CAS更新base的值成功之後,add方法就結束了,這就是采用base更新的邏輯
     * 如果cells不為null,或者CAS更新base失敗之後,都會進入if代碼塊,下面就是采用數組更新的邏輯
     */
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        //uncontended用于表示是否沒有進行CAS操作,初始化true,當CAS失敗的時候會變成false
        boolean uncontended = true;
        /*
         * as == null
         *      如果as為null,說明cells數組沒有初始化。如果條件滿足,繼續那麼進入if代碼塊,如果該條件不滿足,繼續向後判斷
         * (m = as.length - 1) < 0
         *      m等于cells數組長度減一,如果m小于0,說明數組沒初始化完畢。如果條件滿足,繼續那麼進入if代碼塊,如果該條件不滿足,繼續向後判斷
         * (a = as[getProbe() & m]) == null
         *      getProbe()用于擷取目前線程的threadLocalRandomProbe值,是一個随機生成的探測哈希值,不同的線程不一樣,初始值為0
         *      通過getProbe() & m 計算目前線程應該通路數組的某個索引元素并指派給a。另外,threadLocalRandomProbe也被用在ThreadLocalRandom中
         *      如果a為null,說明該索引位置還沒有初始化元素對象。如果條件滿足,繼續那麼進入if代碼塊,如果該條件不滿足,繼續向後判斷
         * !(uncontended = a.cas(v = a.value, v + x))
         *      最後調用a.cas方法嘗試CAS的将base值從v = a.value更新為v+x,即使用該位置記錄增加值,CAS的結果指派給uncontended,如果還是CAS更新失敗,即說明這個位置還是有沖突。
         *      如果條件滿足,繼續那麼進入if代碼塊,如果該條件不滿足,表示CAS成功,使用數組該位置的CAS記錄更新成功,那麼add方法結束
         *
         * 即如果上面的四個表達式有一個傳回true,那麼就是進入if代碼塊,表示cells需要/正在初始化、或者某個位置的Cell需要初始化,或者cells的競争激烈需要擴容
         */
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
            //調用longAccumulate方法進一步處理,該方法是Striped64的方法,傳遞x、null、uncontended
            longAccumulate(x, null, );
    }
}

/**
 * 嘗試CAS的将base值從cmp更新為val
 *
 * @param cmp 預期base值
 * @param val 新base值
 * @return 成功傳回true;失敗傳回false
 */
final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

/**
 1. 擷取目前線程的探測哈希值,不同的線程不一樣,初始值為0
 */
static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
           
5.1.2.2.1 longAccumulate統一處理

  在上面的add方法中,如果遇到base競争時,會使用cells記錄更新值,在使用cells的的時候可能會遇到這些情況:

  1. 因為cells數組是在需要它的時候才會初始化的,可能cells數組還未初始化,此時需要初始化cells數組;
  2. 計算出來的cells數組某個索引位置的Cell對象還未初始化,此時需要初始化該位置的Cell對象;
  3. 在CAS的更新某個Cell對象的時候,又發生了沖突,即多個線程定位到了同一個Cell,此時可能需要對cells數組擴容。

  面對這些情況,就需要調用longAccumulate方法進行統一的處理并最終更新給定值成功,該方法是Striped64的方法,LongAccumulator類中也是調用該方法。在LongAdder中調用該方法時,傳遞的參數為x、null、uncontended。

  大概步驟就是:

  1. 調用getProbe擷取此線程最新的probe指派給h,如果為0,說明probe還沒有初始化,即該線程第一次進入這個方法,那麼進行如下操作:
    1. 調用ThreadLocalRandom.current()初始化線程的probe屬性;
    2. 重新擷取此時線程最新的probe指派給h;
    3. 對于線程第一次進入的情況,如果是CAS失敗的原因肯定是因為probe沒有初始化才造成的CAS競争數組0索引結點失敗(0&m=0),此時wasUncontended為false。是以在probe初始化之後,将wasUncontended統一設定為true,表示“不存在CAS競争”,因為覺得線程初始化之後通過這個probe找到的新索引位置是大機率不會CAS失敗的。
  2. 初始化一個是否可能進行擴容的标志collide,如果為true,表示則下一次循環可能會進行擴容,如果為false,表示下一次循環一定不會進行擴容;
  3. 開啟一個死循環,相當于自旋,嘗試處理相應的情況并且并增加給定值。但是這裡的自旋的效率相比于元素針對單個vlaue變量的效率高得多:
    1. as儲存此時的cells數組,如果as不為null,說明cells數組已經初始化,并且n儲存此時的as數組的長度,如果n大于0,說明數組初始化完畢:
      1. 通過(n - 1) & h定位到目前線程對應的某個Cell并使用a變量儲存,這個定位方式就是類似于hash函數。
      2. 如果a為null,說明目前位置還沒有線程使用過,那麼嘗試在該位置新增一個Cell,對于數組某個位置新增Cell的操作需要保證線程安全:
        1. 如果cellsBusy為0,表示目前沒有其他線程在建立或擴容cells并且也沒有建立Cell,即沒有擷取CAS鎖:
          1. 建立一個Cell對象r,内部初始值就是x。
          2. 如果此時cellsBusy還是為0,那麼目前線程調用casCellsBusy方法嘗試将cellsBusy從0改為1,表示擷取CAS鎖。如果CAS成功那麼可以進入下面的if代碼塊,用于進行對上面新建立的Cell進行指派。這樣就可以控制同一時刻隻能有一個線程進入此if代碼塊,這裡相當于借助cellsBusy和CAS操作實作了一個CAS鎖,cellsBusy為0表示無鎖,cellsBusy為1表示有線程加鎖。
            1. created表示建立的Cell是否放入cells數組對應位置成功的标記,初始化為false,表示未成功。
            2. 重新擷取此時最新的數組并計算該線程在數組中的對應的索引位置j,并判斷如果該位置還是為null,那麼将上面建立的Cell對象r,放入數組對應的j索引位置,created改為true,表示放入cells數組對應位置成功。
            3. 無論上面有沒有成功,最終會将cellsBusy置為0,表示釋放CAS鎖。為什麼擷取CAS所緻後還需要校驗一次呢?因為可能在擷取上面擷取as之後加鎖成功之前,底層的cells數組被其他線程改變了,比如被擴容,比如該位置被其他線程搶先存入了Cell對象或者擴容後計算出的位置本來就有Cell對象等,是以在加鎖之後再一次檢查是很有必要的。
            4. 最後判斷created如果為true,表示放入cells數組對應位置成功,新增值x就是該位置新增元素的預設值,表示增加給定值成功,那麼break跳出這個死循環,longAccumulate方法結束。
            5. 到這一步,即created為false,表示放入cells數組對應位置失敗,對應的索引位置非null,那麼continue結束本次循環,本次競争到了鎖說明競争不是很激烈,下一次循環中有很大機率CAS成功,也不需要後續調用advanceProbe計算新的probe值了。
          3. 到這裡,表示cellsBusy為1,即有線程正在建立cells,或者在擴容cells,或者在建立Cell;或者競争CAS鎖失敗,collide置為false,下一次循環一定不會擴容,而是繼續嘗試。由于此時并發競争可能會比較嚴重,随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell。
      3. 否則,a不為null,表示cells計算出來的索引位置已經存在Cell。如果wasUncontended為false。表示此線程的probe在前面已經被初始化并且是因為add方法中的a.cas調用失敗才進來該方法的:
        1. wasUncontended重置為true,下一次循環中就可能會比對到後面的條件。随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell。
      4. 否則,表示wasUncontended為true,可能是:上面的條件滿足随後wasUncontended置為true并進行的第二次自旋,或者由于該線程getProbe()) == 0而在初始化probe之後将wasUncontended置為true。此時重新調用a.cas,嘗試CAS的更新位置的Cell的值,如果CAS成功,那麼表示增加給定值成功,break跳出循環,longAccumulate方法結束;
      5. 否則,表示上面的CAS更新失敗,還是發生了CAS沖突。判斷如果as數組長度n大于等于CPU的實際可用線程數量NCPU,表示達到了數組最大容量,不能夠再擴容了,此後隻能循環CAS直到成功;或者如果此時的cells不等于as了,表示數組被擴容了,那麼同樣需要重新嘗試CAS操作新數組。
        1. 上面的條件滿足一個,都會将collide設定為false,随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell。
      6. 否則,表示as數組長度n小于CPU的實際可用線程數量NCPU,并且此時的cells等于as了,表示沒有數組沒有擴容此時可以進行數組的擴容,但這裡僅僅判斷!collide是否為true,即collide是否為false:
        1. 如果collide為false,那麼将collide設定為true。随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell。即,在真正嘗試擴容之前,還需要再至少自旋一次,寄希望能夠CAS成功,盡量避免擴容操作!
      7. 否則,表示collide為true。到這裡,說明并發很嚴重,表示真正的可以擴容了。同樣首先需要通過cellsBusy擷取CAS鎖,如果失敗,随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell,如果成功擷取CAS鎖:
        1. 同樣需要校驗該數組是否是最新的數組,因為可能在加鎖之前此數組被其他線程擴容了。如果是同一個數組,那麼可以擴容:建立Cell數組,容量為原容量的兩倍,循環舊數組,将資料轉移到新數組對應的索引位置,将新數組賦給cells,到這裡,表示擴容成功。
        2. 無論上面有沒有擴容成功,最終會将cellsBusy置為0,表示釋放CAS鎖。
        3. 到這裡,表示擴容成功,或者擴容失敗此時是因為其他線程了擴容了數組,是以無論如何下一次循環都會使用新數組,collide重置為false。
        4. 由于擴容了數組,但是還沒有增加給定值,是以還需要繼續循環,但是由于後面循環使用更大的數組,是以将會有更大的機率CAS成功,直接continue結束本次循環,也不需要後續調用advanceProbe新的probe值了。
      8. 在該輪循環中數組已經初始化的情況下,如果沒有在後續的代碼中break跳出循環或則continue結束本次循環的操作,那麼都将會調用advanceProbe重新計算目前線程的probe值,下一次循環時就有很大機率得到另外一個的Cell位置,可以減少下次通路cells元素時的沖突機率。
    2. 否則,表示數組沒有初始化完畢,那麼這裡嘗試進行數組的初始化,初始化操作同一時刻隻能有一個線程進入,這裡需要擷取CAS鎖。如果此時cellsBusy為0,那麼表示目前沒有其他線程在建立、擴容cells并且也沒有建立Cell,但是有可能是其他線程已經初始化數組完畢了,并且如果此時cells 還是等于 as,說明數組确實沒有被初始化,并且如果調用casCellsBusy方法嘗試将cellsBusy從0改為1成功,那麼表示擷取了CAS鎖,可以進入下面的代碼塊,用于進行數組的初始化:
      1. init表示建立數組成功的标記,初始化為false,表示未成功;
      2. 這裡同樣需要校驗,因為可能在加鎖之前此cells被其他線程初始化了,盡管這樣的機率很低,如果通過,建立Cell數組,初始容量為2,通過 h&1 計算目前線程在該數組的位置,如果h的二進制數的最低位為1那麼計算結果就是1索引,否為計算結果就是0索引,随後建立一個Cell對象初始化值就是給定值x,并放入計算出的數組對應的索引位置,将新數組rs賦給cells,到這裡,表示初始化成功,同時增加給定值成功,init設定為true
      3. 無論上面有沒有成功,最終會将cellsBusy置為0,表示釋放CAS鎖。
      4. 最後判斷init如果為true,表示cells數組初始化同時增加給定值成功,那麼break跳出這個死循環,longAccumulate方法結束。
      5. 到這一步還沒有結束,表示cells數組初始化同時增加給定值失敗,因為數組已被其他線程初始化了,随後會進行下一次循環;
    3. 否則,上面兩個條件都不滿足,表示其他線程正在初始化數組,或者數組已被其他線程初始化完畢,或者CAS競争鎖失敗,那麼還是嘗試使用base變量來增加給定值,如果成功那麼break跳出這個死循環,longAccumulate方法結束。可以看到,即使進入了longAccumulate方法,還有有可能會使用base更新的,那就是多個線程判斷cells為null并同時進入的情況下。
    4. 到這裡,表示casBase失敗,那麼繼續下一次循環。

可以看到,源碼還是比較複雜的,如果實在搞不懂的同學隻需要記住我們前面講的大概流程就行了!

/**
 * CPU中通常一個核心一個線程,後來有了超線程技術,可以把一個實體核心,模拟成兩個邏輯核心,線程量增加一倍
 * 是以這裡擷取的是CPU的實際可用線程數量,比如 i7-8750H 它具有6核心12線程,是以擷取的就是12而不是6
 * 通常CPU的實際可用線程數量越高,運作并發的程式的效率也越高
 * <p>
 * CPU的實際可用線程數量NCPU在Striped64中被用來綁定cells數組的大小
 */
static final int NCPU = Runtime.getRuntime().availableProcessors();


/**
 * 處理cells初始化、調整容量、建立新Cell等情況,并增加給定值。
 *
 * @param x              給定值
 * @param fn             累加規則函數,如果是LongAdder就傳遞null,表示僅僅是加法運算,如果是LongAccumulator就可以傳遞指定累加規則
 * @param wasUncontended 如果在add方法的a.cas調用之前就調用該方法,那麼為true,表示cells未初始化或者某個Cell未初始化;否則為false,表示a.cas競争失敗
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    //擷取此線程最新的probe指派給h,如果為0,說明probe還沒有初始化,即該線程第一次進入這個方法
    if ((h = getProbe()) == 0) {
        //ThreadLocalRandom.current()方法本來是擷取線程自己的随機數生成器,同時會初始化線程自己的probe和threadLocalRandomSeed屬性
        //這裡被借用來初始化probe屬性
        ThreadLocalRandom.current(); // force initialization
        //重新擷取此時線程最新的probe指派給h
        h = getProbe();
        //對于線程第一次進入的情況,如果是CAS失敗的原因肯定是因為probe沒有初始化才造成的CAS競争數組0索引結點失敗(0&m=0),此時wasUncontended為false
        //是以在probe初始化之後,将wasUncontended統一設定為true,表示“不存在CAS競争”,因為覺得線程初始化之後通過這個probe找到的新索引位置是大機率不會CAS失敗的
        wasUncontended = true;
    }
    //是否可能進行擴容的标志,如果為true,表示則下一次循環可能會進行擴容,如果為false,表示下一次循環一定不會進行擴容
    boolean collide = false;                // True if last slot nonempty
    /*
     * 開啟一個死循環,相當于自旋,嘗試處理相應的情況并且并增加給定值
     * 但是這裡的自旋的效率相比于元素針對單個vlaue變量的效率高得多
     */
    for (; ; ) {
        Cell[] as;
        Cell a;
        int n;
        long v;
        /*
         * as儲存此時的cells,如果as不為null,說明cells數組已經初始化
         * 并且 n儲存此時的as數組的長度,如果n大于0,說明數組初始化完畢
         */
        if ((as = cells) != null && (n = as.length) > 0) {
            /*
             * 通過(n - 1) & h定位到目前線程對應的某個Cell并使用a變量儲存,這個定位方式就是類似于hash函數
             * 如果a為null,說明目前位置還沒有線程使用過,那麼嘗試在該位置新增一個Cell,對于數組某個位置新增Cell的操作需要保證線程安全
             */
            if ((a = as[(n - 1) & h]) == null) {
                //如果cellsBusy為0,表示目前沒有其他線程在建立或擴容cells并且也沒有建立Cell
                //即沒有擷取CAS鎖,此時需要建立一個Cell放到該位置
                if (cellsBusy == 0) {       // Try to attach new Cell
                    //建立一個Cell對象r,内部初始值就是x
                    Cell r = new Cell(x);   // Optimistically create
                    /*
                     * 如果此時cellsBusy還是為0,那麼表示目前沒有其他線程在建立或擴容cells并且也沒有建立Cell
                     * 然後目前線程調用casCellsBusy方法嘗試将cellsBusy從0改為1,表示擷取CAS鎖
                     * 如果CAS成功那麼可以進入下面的if代碼塊,用于進行對上面新建立的Cell進行指派
                     * 如果CAS失敗,說明此時有線程正在操作cells數組,那麼不會進if代碼塊
                     * 同一時刻隻能有一個線程進入此if代碼塊,這裡相當于借助cellsBusy和CAS操作實作了一個CAS鎖
                     * cellsBusy為0表示無鎖,cellsBusy為1表示有線程加鎖
                     */
                    if (cellsBusy == 0 && casCellsBusy()) {
                        //建立的Cell是否放入cells數組對應位置成功的标記,初始化為false表示未成功
                        boolean created = false;
                        //放置Cell的操作
                        try {               // Recheck under lock
                            Cell[] rs;
                            int m, j;
                            /*
                             * 這裡相當于再檢查一遍,為什麼要檢查呢,如果可能在上面擷取as之後加鎖成功之前,底層的cells數組被其他線程改變了
                             * 比如被擴容,比如該位置被其他線程搶先存入了Cell對象或者擴容後計算出的位置本來就有Cell對象等,是以在加鎖之後再一次檢查是很有必要的
                             *
                             * rs儲存此時的cells,如果rs不為null,說明cells數組已經初始化
                             * 并且 m儲存此時的rs數組的長度,如果m大于0,說明數組初始化完畢
                             * 并且 j儲存前線程定位到的Cell索引,該索引的元素為null,說明目前位置還沒有線程使用過
                             *
                             * 這三個條件都滿足,那麼進入if代碼塊
                             * 如果3個條件有一個不滿足,那麼不進入if代碼塊,将會進行下一次循環
                             */
                            if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                //将上面建立的Cell對象r,放入數組對應的j索引位置
                                rs[j] = r;
                                //created改為true,表示放入cells數組對應位置成功
                                created = true;
                            }
                        } finally {
                            //無論上面有沒有成功,最終會将cellsBusy置為0,表示釋放CAS鎖
                            cellsBusy = 0;
                        }
                        //最後判斷created如果為true,表示放入cells數組對應位置成功,
                        //新增值x就是該位置新增元素的預設值,表示增加給定值成功,那麼break跳出這個死循環,longAccumulate方法結束
                        if (created)
                            break;
                        //到這一步,即created為false,表示放入cells數組對應位置失敗,對應的索引位置非null,那麼continue結束本次循環,
                        //本次競争到了鎖說明競争不是很激烈,下一次循環中有很大機率CAS成功,也不需要後續調用advanceProbe計算新的probe值了
                        continue;           // Slot is now non-empty
                    }
                }
                //到這裡,表示cellsBusy為1,即有線程正在建立cells,或者在擴容cells,或者在建立Cell;或者競争CAS鎖失敗
                //collide置為false,下一次循環一定不會擴容,而是繼續嘗試
                collide = false;
                //由于此時并發競争可能會比較嚴重,随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell
            }
            /*
             * 否則,a不為null,表示cells計算出來的索引位置已經存在Cell,如果wasUncontended為false
             * 表示此線程的probe在前面已經被初始化并且是因為add方法中的a.cas調用失敗才進來該方法的
             */
            else if (!wasUncontended)       // CAS already known to fail
                //wasUncontended重置為true,下一次循環中就可能會比對到後面的條件
                //随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell
                wasUncontended = true;      // Continue after rehash
                /*
                 * 否則,表示wasUncontended為true,可能是:
                 * 上面的!wasUncontended條件滿足随後wasUncontended置為true并進行的第二次自旋
                 * 或者由于該線程getProbe()) == 0而在初始化probe之後将wasUncontended置為true
                 *
                 * 此時重新調用a.cas,嘗試CAS的更新該位置的Cell的值,如果CAS成功,那麼表示增加給定值成功,break跳出循環,longAccumulate方法結束
                 */
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                    fn.applyAsLong(v, x))))
                break;
                /*
                 * 否則,表示上面的CAS更新失敗,說明還是發生了CAS沖突
                 * 判斷如果as數組長度n大于等于CPU的實際可用線程數量NCPU,表示達到了數組最大容量,不能夠再擴容了,此後隻能循環CAS直到成功
                 * 或者如果此時的cells不等于as了,表示數組被擴容了,那麼同樣需要重新嘗試CAS操作新數組
                 *
                 * 上面的條件滿足一個,都會将collide設定為false,随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell
                 */
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
                /*
                 * 否則,表示as數組長度n小于CPU的實際可用線程數量NCPU,并且此時的cells等于as了,表示沒有數組沒有擴容
                 * 此時可以進行數組的擴容,但這裡僅僅判斷如果!collide為true,即collide為false,那麼将collide設定為true,
                 * 随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell
                 * 即,在真正嘗試擴容之前,還需要再至少自旋一次,寄希望能夠CAS成功,盡量避免擴容操作
                 *
                 *
                 * collide設定為true表示下一次循環可能進行擴容,但也不是一定的,下一次循環過程中,可能遇到:
                 *      數組被擴容了,計算的新位置沒有Cell,此時該線程會存放Cell,并跳出循環
                 *      CAS成功,跳出循環
                 *      遇到n >= NCPU,即數組不能再擴容了,那麼該線程的擴容操作就會被取消,collide = false,此後會一緻在這幾個操作中循環直到CAS成功
                 *      cells != as,即數組又被擴容了,那麼還會進入下一次循環,collide = false,對新數組進行嘗試
                 *
                 *      前面的都不滿足,此時由于上一次循環中将collide設定為了true,是以這裡的!collide不滿足,終于将會進入下面的else if條件,嘗試進行數組的擴容
                 */
            else if (!collide)
                collide = true;
                /*
                 * 否則,表示collide為true。到這裡,說明并發很嚴重,表示真正的可以擴容了
                 * 同樣首先需要通過cellsBusy擷取CAS鎖,如果失敗,随後會調用advanceProbe重新計算probe,并進行下一次循環,期望下一次循環時得到不同位置的Cell
                 * 但是這裡collide沒有被重置為false
                 */
            else if (cellsBusy == 0 && casCellsBusy()) {
                //加鎖成功之後
                try {
                    //同樣需要校驗該數組是否是最新的數組,因為可能在加鎖之前此數組被其他線程擴容了
                    if (cells == as) {      // Expand table unless stale
                        //建立Cell數組,容量為原容量的兩倍
                        Cell[] rs = new Cell[n << 1];
                        //循環舊數組,将資料轉移到新數組對應的索引位置
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        //将新數組賦給cells,到這裡,表示擴容成功
                        cells = rs;
                    }
                } finally {
                    //無論上面有沒有成功,最終會将cellsBusy置為0,表示釋放CAS鎖
                    cellsBusy = 0;
                }
                //到這裡,表示擴容成功,或者擴容失敗此時是因為其他線程了擴容了數組,是以無論如何下一次循環都會使用新數組
                //collide重置為false
                collide = false;
                //由于擴容了數組,但是還沒有增加給定值,是以還需要繼續循環,但是由于後面循環使用更大的數組,是以将會有更大的機率CAS成功
                //直接continue結束本次循環,也不需要後續調用advanceProbe新的probe值了
                continue;                   // Retry with expanded table
            }

            /*
             * 該輪循環中數組已經初始化的情況下,如果沒有在後續的代碼中break跳出循環或則continue結束本次循環的操作,那麼都将會重新計算目前線程的probe值,
             * 下一次循環時就有很大機率得到另外一個的Cell位置,可以減少下次通路cells元素時的沖突機率。
             */
            //使用xorshift算法生成随機數
            h = advanceProbe(h);
        }
        /*
         * 否則,表示數組沒有初始化,那麼這裡嘗試進行數組的初始化,初始化操作同一時刻隻能有一個線程進入,這裡需要擷取CAS鎖
         *
         * 如果 此時cellsBusy為0,那麼表示目前沒有其他線程在建立、擴容cells并且也沒有建立Cell,但是有可能是其他線程已經初始化數組完畢了
         * 并且 如果此時cells 還是等于 as,說明數組确實沒有被初始化
         * 并且 如果調用casCellsBusy方法嘗試将cellsBusy從0改為1成功,那麼可以進入下面的代碼塊,用于進行數組的初始化
         *
         * 如果上面的條件都滿足,那麼可以嘗試進行數組的初始化,否則表示數組已經被初始化了,将會進入最後一個else if條件
         */
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            //建立數組成功的标記,初始化為false表示未成功
            boolean init = false;
            //初始化數組的操作
            try {                           // Initialize table
                //同樣需要校驗,因為可能在加鎖之前此數組被其他線程初始化了,盡管這樣的機率很低
                if (cells == as) {
                    //建立Cell數組,初始容量為2
                    Cell[] rs = new Cell[2];
                    //通過 h&1 計算目前線程在該數組的位置,如果h的二進制數的最低位為1那麼計算結果就是1索引,否為計算結果就是0索引
                    //随後建立一個Cell對象初始化值就是給定值x,并放入計算出的數組對應的索引位置
                    rs[h & 1] = new Cell(x);
                    //将新數組rs賦給cells,到這裡,表示初始化成功,同時增加給定值成功
                    cells = rs;
                    //init設定為true
                    init = true;
                }
            } finally {
                //無論上面有沒有成功,最終會将cellsBusy置為0,表示釋放CAS鎖
                cellsBusy = 0;
            }
            //最後判斷init如果為true,表示cells數組初始化同時增加給定值成功,那麼break跳出這個死循環,longAccumulate方法結束
            if (init)
                break;
            //到這一步還沒有結束,表示cells數組初始化同時增加給定值失敗,因為數組已被其他線程除除四化了,随後會進行下一次循環
        }
        /*
         * 否則,上面兩個條件都不滿足,表示其他線程正在初始化數組,或者數組已被其他線程初始化完畢,或者CAS競争鎖失敗
         * 那麼還是嘗試使用base變量來增加給定值,如果成功那麼break跳出這個死循環,longAccumulate方法結束
         * 可以看到,即使進入了longAccumulate方法,還有有可能會使用base更新的,那就是多個線程判斷cells為null并同時進入的情況下
         */
        else if (casBase(v = base, ((fn == null) ? v + x :
                fn.applyAsLong(v, x))))

            break;                          // Fall back on using base
        //到這裡,表示casBase失敗,那麼繼續下一次循環
    }
}

/**
 * 嘗試CAS的将cellsBusy值從0更新為1,表示擷取了CAS鎖
 */
final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

/**
 * 使用xorshift算法基于目前probe僞随機的生成下一個probe值
 */
static final int advanceProbe(int probe) {
    probe ^= probe << 13;   // xorshift
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    return probe;
}
           

5.1.2.3 increment自增

public void decrement()

  自增1,内部就是調用的add(1L)方法。

/**
 * 自增1,内部調用add方法,參數為1
 */
public void increment() {
    add(1L);
}
           

5.1.2.4 sum統計

public long sum()

  傳回目前總和。其實是base 的值與Cells 數組裡面所有Cell元素中的value 值的累加。可以看到僅僅對base和cells數組元素value的簡單累加,是以這個sum可能不是最新值,即不準确。

/**
 * 傳回目前總和。其實是base 的值與Cells 數組裡面所有Cell元素中的value 值的累加。
 *
 * @return 總和,非強一緻性的
 */
public long sum() {
    //as儲存此時的cells數組
    Cell[] as = cells;
    Cell a;
    //初始化sum,儲存base
    long sum = base;
    //如果as不為null
    if (as != null) {
        //那麼循環as數組将每一個元素的value相加,由于沒有加CAS鎖,此時的as可能不是最新的cells數組了
        for (int i = 0; i < as.length; ++i) {
            //如果某個索引位置元素不為nuull
            if ((a = as[i]) != null)
                //那麼對sum進行累加
                sum += a.value;
        }
    }
    //傳回sum
    return sum;
}
           

  longValue方法内部也是調用了sum方法:

public long longValue() {
    return sum();
}
           

5.1.2.5 reset重置

public void reset()

  reset為重置操作,将保持總和的變量重置為零。即base值置為0,如果有cells數組,則将每一個元素(如果存在)的value值置為0。

  該方法傳回之後不代表此時總和一定為0,因為可能前面剛剛将某個位置的值置為0,後面馬上被其他線程增加了值,是以這個方法也沒有任何保證。

/**
 * reset為重置操作,将保持總和的變量重置為零。
 * 即base值置為0,如果有cells數組,則将每一個元素(如果存在)的value值置為0 。
 */
public void reset() {
    //as儲存此時的cells數組
    Cell[] as = cells;
    Cell a;
    //base重置為0
    base = 0L;
    //如果as不為null
    if (as != null) {
        //那麼循環as數組将每一個元素的value置為0,由于沒有加CAS鎖,此時的as可能不是最新的cells數組了
        for (int i = 0; i < as.length; ++i) {
            //如果某個索引位置元素不為nuull
            if ((a = as[i]) != null)
                //該位置元素的value置為0
                a.value = 0L;
        }
    }
}
           

5.1.2.6 sumThenReset統計并重置

  相當于sum()後跟reset()。

/**
 * 相當于sum()後跟reset()。
 *
 * @return 總和
 */
public long sumThenReset() {
    //as儲存此時的cells數組
    Cell[] as = cells;
    Cell a;
    //初始化sum,儲存base
    long sum = base;
    //base重置為0
    base = 0L;
    //如果as不為null
    if (as != null) {
        //那麼循環as數組将每一個元素的value值相加,随後置為0,由于沒有加CAS鎖,此時的as可能不是最新的cells數組了
        for (int i = 0; i < as.length; ++i) {
            //如果某個索引位置元素不為null
            if ((a = as[i]) != null) {
                //那麼對sum進行累加
                sum += a.value;
                //該位置元素的value置為0
                a.value = 0L;
            }
        }
    }
    //傳回sum
    return sum;
}
           

5.2 LongAccumulator

5.2.1 LongAccumulator的概述

public class LongAccumulator

  extends Number

  implements Serializable

  LongAccumulator同樣是來自于JDK1.8的atomic包,和LongAdder都繼承了Striped64,但是LongAccumulator 比LongAdder 的功能更強大。

  LongAccumulator 相比于LongAdder,可以為累加器提供非0 的初始值,後者隻能提供預設的0 值。另外,前者還可以指定累加規則,比如不進行累加而進行相乘,隻需要在構造LongAccumulator 時傳入自定義的雙目運算器即可,後者則隻能是累加規則。

5.2.2 LongAccumulator的原理

5.2.2.1 内部結構

  LongAccumulator内部具有兩個自己的屬性,一個LongBinaryOperator類型的雙目運算器執行個體,用于儲存累加規則。一個identity用于儲存指定的初始值,也是base的初始值,在reset等重置方法調用的時候也會指派為base的值。

  隻有一個構造器,可以傳遞指定的累加規則和指定初始值。

/**
 * 内部的LongBinaryOperator類型的屬性,用于儲存傳入的雙目運算器
 * LongBinaryOperator是一個函數式接口,就是對累加規則的封裝
 */
private final LongBinaryOperator function;

/**
 * identity用于儲存的初始值,也是base的初始值,在reset等重置方法調用的時候也會指派為base的值
 */
private final long identity;

/**
 * 使用給定的累加器和初始值建立新執行個體。
 *
 * @param accumulatorFunction 一個雙目運算器執行個體,可以對兩個long類型的值進行計算
 *                            如果為null那麼在計算時将會抛出NullPointerException
 * @param identity            初始值
 */
public LongAccumulator(LongBinaryOperator accumulatorFunction,
                       long identity) {
    //為function指派
    this.function = accumulatorFunction;
    //為base和identity指派
    base = this.identity = identity;
}
           

5.2.2.2 accumulate更新給定值

public void accumulate(long x)

  具有給定值的更新。作為LongAccumulator的核心方法!

  accumulate方法類似于LongAdder的add方法,差別在于可以使用構造器中指定的累加器中的累加規則更新資料。

  這裡的LongBinaryOperator是一個JDK1.8添加的函數式接口,主要是為了lambda的調用,這個接口封裝了對兩個long類型參數的操作規則,通過調用applyAsLong方法對傳入的參數進行操作并傳回操作的結果。

/**
 * 更新方法,與LongAdder的add方法差别在于:
 * 1.調用casBase時LongAdder傳遞的是b+x,LongAccumulator則使用了r = function .applyAsLong(b = b ase, x) 自定義的規則來計算。
 * 2.調用longAccumulate 時第二個參數LongAdder傳遞的是null,LongAccumulator傳遞了function累加器
 */
public void accumulate(long x) {
    Striped64.Cell[] as;
    long b, v, r;
    int m;
    Striped64.Cell a;

    if ((as = cells) != null ||
            //差别1:base要被更新為通過運算器對base和x計算出來的結果,這裡也能知道傳遞的function不能為null
            (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended =
                        (r = function.applyAsLong(v = a.value, x)) == v ||
                                a.cas(v, r)))
            //差别2:第二個參數傳遞的function累加器,而不是null
            //在longAccumulate方法中,在CAS更新value或者base的時候,會判斷function是否為null,即:
            //a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))
            //casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))
            //如果不為null,那麼使用制定累加器規則更新
            longAccumulate(x, function, uncontended);
    }
}
           

5.2.3 其他操作

public long get()

  傳回目前值。類似于LongAdder的sum方法,隻不過由加法變成了指定的累加規則。

public long longValue()

  内部調用 get()方法。

public void reset()

  重置維持更新到辨別值的變量。類似于LongAdder的reset方法,隻不過base值由0變成了在構造器中傳遞的identity。

5.2.4 案例

  一個指定(a * b / 2)的累加規則的案例如下:

//傳遞一個累加器LongBinaryOperator執行個體
LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
    @Override
    public long applyAsLong(long left, long right) {
        //指定的累加規則 left * right / 2
        //這裡的left對應LongAccumulator中的base或者某個Cell的value,這的right對應accumulate傳遞的參數x
        return left * right / 2;
    }
    //初始值為2
}, 2);
//更新2
longAccumulator.accumulate(6);
//擷取結果,應該是6
System.out.println(longAccumulator.get());
           

5.2.5 LongAccumulator的總結

  LongAccumulator的功能更加強大,可以指定初始值和累加規則,這樣看起來LongAdder更像是LongAccumulator的特例,即初始值為0,累加規則為加法。

  是以如果想要使用longAccumulator實作LongAdder的功能,那麼我們手動将累加規則指定為加法,并且identity指定為0即可:

LongAccumulator longAccumulator = new LongAccumulator(new 
LongBinaryOperator() {
    @Override
    public long applyAsLong(long left, long right) {
        //傳回left+right,這樣累加規則就和LongAdder一樣了
        return left + right;
    }
    //初始值為0
}, 0);
//更新6
longAccumulator.accumulate(6);
//更新1
longAccumulator.accumulate(1);
//擷取結果,應該是7
System.out.println(longAccumulator.get());
           

  可以看到我們初始化LongAccumulator對象的代碼量還是比較多的,特别是建立匿名對象的代碼。上面我們說過這個LongBinaryOperator是一個函數式接口,是以我們推薦使用lambda表達式的寫法:

LongAccumulator longAccumulator = new LongAccumulator((left, right) -> {
    return left + right;
}, 0);
           

  更進一步,我們可以使用方法引用:

  它們的含義都是一樣的,即采用加法運算,但是代碼量卻簡單了許多,是以lambda還是很有必要學習的!

6 JMH性能測試

  下面我們測試LongAdder和AtomicLong的性能差别,我們使用JMH方法性能測試。Java使用JMH進行方法性能優化測試。

  本次主要測試AtomicLong的getAndIncrement方法、LongAdder的increment方法以及使用synchronized同步的方法在1秒鐘之内能調用多少次,即測試方法的吞吐量。

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.Throughput)
public class AdderJMHTest {

    private static AtomicLong count = new AtomicLong();
    private static LongAdder longAdder = new LongAdder();
    private static long syn = 0L;

    public static void main(String[] args) throws Exception {
        Options options = new OptionsBuilder().include(AdderJMHTest.class.getName()).warmupIterations(1).measurementIterations(2).forks(1).build();
        new Runner(options).run();
    }

    @Benchmark
    @Threads(10)
    public void run0() {
        count.getAndIncrement();
    }

    @Benchmark
    @Threads(10)
    public void run1() {
        longAdder.increment();
    }

    @Benchmark
    @Threads(10)
    public void run2() {
        synchronized (AdderJMHTest.class) {
            ++syn;
        }
    }
}
           

  在開啟JIT優化的情況下,開啟1個線程,結果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2  155.021          ops/us
AdderJMHTest.run1  thrpt    2  124.791          ops/us
AdderJMHTest.run2  thrpt    2   57.716          ops/us
           

  在開啟JIT優化的情況下,開啟2個線程,結果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   56.432          ops/us
AdderJMHTest.run1  thrpt    2  243.411          ops/us
AdderJMHTest.run2  thrpt    2   32.125          ops/us
           

  在開啟JIT優化的情況下,開啟5個線程,結果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   52.174          ops/us
AdderJMHTest.run1  thrpt    2  486.320          ops/us
AdderJMHTest.run2  thrpt    2   36.689          ops/us
           

  在開啟JIT優化的情況下,開啟10個線程,結果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   48.207          ops/us
AdderJMHTest.run1  thrpt    2  756.315          ops/us
AdderJMHTest.run2  thrpt    2   31.929          ops/us
           

  在開啟JIT優化的情況下,開啟20個線程,結果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   50.508          ops/us
AdderJMHTest.run1  thrpt    2  791.501          ops/us
AdderJMHTest.run2  thrpt    2   36.743          ops/us
           

  在開啟JIT優化的情況下,開啟50個線程,結果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   52.193          ops/us
AdderJMHTest.run1  thrpt    2  800.270          ops/us
AdderJMHTest.run2  thrpt    2   31.817          ops/us
           

  在開啟JIT優化的情況下,開啟100個線程,結果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   51.982          ops/us
AdderJMHTest.run1  thrpt    2  842.155          ops/us
AdderJMHTest.run2  thrpt    2   34.325          ops/us
           

  可見,synchronized吞吐量最少。如果沒有線程競争,那麼LongAdder和AtomicLong的吞吐量差不多。如果線程競争較多,那麼AtomicLong吞吐量降低,LongAdder吞吐量繼續升高,是AtomicLong的是十倍以上。

  在使用-Xint參數關閉JIT優化的情況下,開啟10個線程,結果如下:

Benchmark           Mode  Cnt   Score   Error   Units
AdderJMHTest.run0  thrpt    2   4.781          ops/us
AdderJMHTest.run1  thrpt    2  15.816          ops/us
AdderJMHTest.run2  thrpt    2   3.925          ops/us
           

  雖然它們的總體性能都嚴重降低,但是LongAdder的吞吐量仍然最大,這也說明Java的JIT優化的牛逼之處。

7 atomic的總結

  JDK1.5出現的atomic包下面的原子類,在對于單個變量的複合操作(比如讀-寫)中可以代替鎖的來保證操作的原子性和安全性,并且由于沒有使用鎖而有不錯的性能,但是對于多個變量的複合操作以及一批代碼的原子性和安全性卻無能為力,此時隻能使用鎖。

  我們可以看到,實際上volatile關鍵字以及Unsafe類提供的CAS的方法就是構成原子類的基石,原子類的方法實際上就是對于Unsafe中的CAS方法的二次包裝,方法開發人員使用而已。Unsafe中的CAS方法作為native方法,本身并不是Java語言實作的,它們的源碼位于JVM虛拟機的源碼中,HotSpot虛拟機的源碼中就有這些native方法的具體實作,它們都是采用C++的代碼實作的,友善與底層系統互動,在openjdk中可以找到。

  本文沒有對一些基本知識點做深入講解,比如Unsafe、volatile、CAS、僞共享、JMH等,因為前面的文章中已經講了,都是深入到了虛拟機源碼級别,如果想要深入了解原子類的原理,應該要看看以下文章!

相關文章:

  Unsafe:JUC—Unsafe類的原理詳解與使用案例。

  volatile:Java中的volatile實作原理深度解析以及應用。

  CAS:Java中的CAS實作原理深度解析與應用案例。

  僞共享:Java中的僞共享深度解析以及避免方法。

  JMH:Java使用JMH進行方法性能優化測試。

如果有什麼不懂或者需要交流,可以留言。另外希望點贊、收藏、關注,我将不間斷更新各種Java學習部落格!