java.util.concurrent.atomic包類的原理
atomic包底層原理都是Unsafe.compareAndSwapXXX()方法,也就是通常說的CAS,compareAndSwapXXX方法是native辨別的方法,是Java底層代碼,不是Java代碼實作的,
public final class Unsafe {
//object
public final native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);
//int
public final native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
//long
public final native boolean compareAndSwapLong(Object obj, long offset, long expect, long update);
}
都是有4個參數:
第一個參數 obj:要更新的變量名
第二個參數 offset:主記憶體中該變量的值
第三個參數 expect:期待的該變量的值
第四個參數 update:如果offset==expect,則更新obj為update值
java.util.concurrent.atomic包結構
AtomicBoolean
AtomicBoolean原理,有一個int 類型的成員變量,該值隻可能為1-true,0-false,兩個
AtomicBoolean有一點值得注意的,就是AtomicBoolean的方法compareAndSet(),隻會執行一次,
//AtomicBoolean 方法
public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
AtomicInteger 原子操作一個Integer類型的變量
以getAndAdd 方法為例,其他方法原理一樣:
//AtomicInteger 的方法
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
/**
* Unsafe 類的方法
* 三個參數,比如 int a = 2+1
* 第一個參數var1代表變量名a
* 第二個參數var2代表變量a期待的值為2
* 第三個參數var4代表如果var2的值為2,則進行加法的值
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//var5 表示調用底層方法得到的變量var1在主記憶體中的值
var5 = this.getIntVolatile(var1, var2);
// var1 表示目前變量,var2表示期待的值,var5表示該變量在底層的值,
// 如果var2==var5 ,則進行var5 + var4操作
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
上面代碼,是個循環,隻有設定值成功才會退出,如果預期的值和底層的值不一樣,則還會進入循環,重新擷取底層變量的值var5 ,預期值var2也重新從對象var1中擷取,然後對比var2,var5,直到兩個值相等,則進行運算操作,退出循環。這裡可以看出,如果修改這個變量的線程非常多并發量非常大時,var2,var5一直不相等,cpu 就會一直空轉,浪費cpu資源,是以atomic包适合并發量不大的場景
AtomicIntegerArray原子操作Integer類型的數組的類
AtomicIntegerArray 和AtomicInteger差別不大,就是一個是單個Integer變量,一個是Integer數組,操作數組中的每個數也都是原子操作方法
以getAndAdd 方法為例,其他方法原理一樣:
//AtomicIntegerArray 方法,第一個參數數數組坐标,第二個參數是要加的值
//傳回值為做加法前的數值
public final int getAndAdd(int i, int delta) {
//checkedByteOffset方法,擷取數組對應i坐标的變量值
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}
/**
* Unsafe 類的方法
* 三個參數,比如 int a = 2+1
* 第一個參數var1代表變量名a
* 第二個參數var2代表變量a期待的值為2
* 第三個參數var4代表如果var2的值為2,則進行加法的值
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//var5 表示調用底層方法得到的變量var1在主記憶體中的值
var5 = this.getIntVolatile(var1, var2);
// var1 表示目前變量,var2表示期待的值,var5表示該變量在底層的值,
// 如果var2==var5 ,則進行var5 + var4操作
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
//傳回值是操作前的值
return var5;
}
AtomicIntegerFieldUpdater
這個類可以cas原子更新一個類的一個成員變量的值,并且要求成員變量非static并且volatile修飾,例如:
@Slf4j
public class Test {
//泛型是類:Test,
//newUpdater方法第一個參數是類的class,第二個參數數該類的成員變量名
private static AtomicIntegerFieldUpdater<Test> updater = AtomicIntegerFieldUpdater.newUpdater(Test.class,"count");
//注意該變量必須由volatile關鍵字修飾,
//并且注意不能是包裝類,也不能是static修飾,否則會報錯
private volatile int count=100;
public static void main(String[] args) throws Exception {
Test test = new Test();
if(updater.compareAndSet(test ,100,120)){
log.info("【1】如果類對象test 的成員變量count的值是100,則更新為120,----更新成功");
}else{
log.info("【2】如果類對象test 的成員變量count的值是100,則更新為120,----更新失敗");
}
if(updater.compareAndSet(test ,100,120)){
log.info("【3】如果類對象test 的成員變量count的值是100,則更新為120,----更新成功");
}else{
log.info("【4】如果類對象test 的成員變量count的值是100,則更新為120,----更新失敗");
}
}
public Integer getCount() {
return count;
}
}
運作結果:
14:29:54.953 [main] INFO test.Test - 【1】如果類對象test 的成員變量count的值是100,則更新為120,----更新成功
14:29:54.958 [main] INFO test.Test - 【4】如果類對象test 的成員變量count的值是100,則更新為120,----更新失敗
AtomicLong
同AtomicInteger
AtomicLongArray
同AtomicIntegerArray
AtomicLongFieldUpdater
同AtomicIntegerFieldUpdater
AtomicMarkableReference
這個跟AtomicStampedReference類似,也有個辨別,隻是AtomicStampedReference的版本号可以随意加,可以為很多版本,但是AtomicMarkableReferenced的辨別隻能為boolean值,即true,false,例子:
public class Test {
/**
* 構造函數,第一個參數是初始值,第二個參數是初始版本号
*/
static AtomicMarkableReference<Integer> atomicStampedReference = new AtomicMarkableReference(0, false);
public static void main(String[] args) throws InterruptedException {
final int reference = atomicStampedReference.getReference();
System.out.println("初始值:" + reference );
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("更新前值:" + reference + " --》 如果值為0,标志為false,則更新值為10,标志為true,是否更新成功:"
+ atomicStampedReference.compareAndSet(reference, reference + 10, false, true));
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
Integer reference = atomicStampedReference.getReference();
System.out.println("更新前值:" + reference + " --》 如果值為0,标志為false,則更新值為10,标志為true,是否更新成功:"
+ atomicStampedReference.compareAndSet(reference, reference + 10, false, true));
}
});
t1.start();
t1.join();
t2.start();
t2.join();
System.out.println("更新後值:" + atomicStampedReference.getReference());
}
}
執行結果:
初始值:0
更新前值:0 --》 如果值為0,标志為false,則更新值為10,标志為true,是否更新成功:true
更新前值:10 --》 如果值為0,标志為false,則更新值為10,标志為true,是否更新成功:false
更新後值:10
AtomicReference
泛型傳入一個類,然後cas原子操作這個變量
public static void main(String[] args) throws Exception {
AtomicReference<Character> characterAtomicReference = new AtomicReference<>('a');
//如果characterAtomicReference 的值為a,則更新為c
characterAtomicReference.compareAndSet('a','c'); //c
characterAtomicReference.compareAndSet('b','d');//no
characterAtomicReference.compareAndSet('c','e');//e
characterAtomicReference.compareAndSet('d','g');//no
characterAtomicReference.compareAndSet('e','j');//j
characterAtomicReference.compareAndSet('j','w');//w
System.out.println(characterAtomicReference.get());//w
}
AtomicReferenceArray
同AtomicIntegerArray
AtomicReferenceFieldUpdater
同 AtomicIntegerFieldUpdater
AtomicStampedReference
該類,可以解決CAS的ABA問題,
cas 的ABA問題是指,變量a從1 更新為2,又從2更新為1,其他線程查詢是1,以為這個變量沒有更新,實際這個變量更新過。解決辦法,就是添加版本号,比如 變量a從1 更新為2版本為v1,又從2更新為1版本為v2,
//AtomicStampedReference 核心類
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp && //這裡多了版本比較,
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
LongAdder簡介
JDK1.8時,java.util.concurrent.atomic包中提供了一個新的原子類:LongAdder。
根據Oracle官方文檔的介紹,LongAdder在高并發的場景下會比它的前輩————AtomicLong 具有更好的性能,代價是消耗更多的記憶體空間:
那麼,問題來了:
為什麼要引入LongAdder? AtomicLong在高并發的場景下有什麼問題嗎? 如果低并發環境下,
LongAdder和AtomicLong性能差不多,那LongAdder是否就可以替代AtomicLong了?
為什麼要引入LongAdder?
我們知道,AtomicLong是利用了底層的CAS操作來提供并發性的,比如addAndGet方法:
/**
* 以原子方式将給定值添加到目前值。
* 參數:增量-要添加的值
* 傳回值:更新值
*/
public final long addAndGet(long delta) {
//unsafe.getAndAddInt 方法進行cas 更新操作,傳回值是操作前的值
//addAndGet方法傳回的是操作後的值,是以是,傳回前值+delta
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}
/**
* Unsafe 類的方法
* 三個參數,比如 int a = 2+1
* 第一個參數var1代表變量名a
* 第二個參數var2代表變量a期待的值為2
* 第三個參數var4代表如果var2的值為2,則進行加法的值
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//var5 表示調用底層方法得到的變量var1在主記憶體中的值
var5 = this.getIntVolatile(var1, var2);
// var1 表示目前變量,var2表示期待的值,var5表示該變量在底層的值,
// 如果var2==var5 ,則進行var5 + var4操作
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
//傳回值是操作前的值
return var5;
}
上述方法調用了Unsafe類的getAndAddLong方法,該方法内部是個native方法,它的邏輯是采用自旋的方式不斷更新目标值,直到更新成功。
在并發量較低的環境下,線程沖突的機率比較小,自旋的次數不會很多。但是,高并發環境下,N個線程同時進行自旋操作,會出現大量失敗并不斷自旋的情況,此時AtomicLong的自旋會成為瓶頸。
這就是LongAdder引入的初衷——解決高并發環境下AtomicLong的自旋瓶頸問題。
LongAdder快在哪裡?
既然說到LongAdder可以顯著提升高并發環境下的性能,那麼它是如何做到的?這裡先簡單的說下LongAdder的思路,第二部分會詳述LongAdder的原理。
我們知道,AtomicLong中有個内部變量value儲存着實際的long值,所有的操作都是針對該變量進行。也就是說,高并發環境下,value變量其實是一個熱點,也就是N個線程競争一個熱點。
LongAdder的基本思路就是分散熱點,将value值分散到一個數組中,不同線程會命中到數組的不同槽中,各個線程隻對自己槽中的那個值進行CAS操作,這樣熱點就被分散了,沖突的機率就小很多。如果要擷取真正的long值,隻要将各個槽中的變量值累加傳回。
這種做法有沒有似曾相識的感覺?沒錯,ConcurrentHashMap中的“分段鎖”其實就是類似的思路。
LongAdder能否替代AtomicLong?
回答這個問題之前,我們先來看下LongAdder提供的API:
可以看到,LongAdder提供的API和AtomicLong比較接近,兩者都能以原子的方式對long型變量進行增減。
- AtomicLong提供的功能其實更豐富,尤其是addAndGet、decrementAndGet、compareAndSet這些方法。addAndGet、decrementAndGet除了單純的做自增自減外,還可以立即擷取增減後的值,
- LongAdder則需要做同步控制才能精确擷取增減後的值。如果業務需求需要精确的控制計數,做計數比較,AtomicLong也更合适。
另外,從空間方面考慮,LongAdder其實是一種“空間換時間”的思想,從這一點來講AtomicLong更适合。當然,如果你一定要跟我杠現代主機的記憶體對于這點消耗根本不算什麼,那我也辦法。
總之,低并發、一般的業務場景下AtomicLong是足夠了。如果并發量很多,存在大量寫多讀少的情況,那LongAdder可能更合适。适合的才是最好的,如果真出現了需要考慮到底用AtomicLong好還是LongAdder的業務場景,那麼這樣的讨論是沒有意義的,因為這種情況下要麼進行性能測試,以準确評估在目前業務場景下兩者的性能,要麼換個思路尋求其它解決方案。
最後,給出國外一位部落客對LongAdder和AtomicLong的性能評測,以供參考:
http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/
LongAdder原理
之前說了,AtomicLong是多個線程針對單個熱點值value進行原子操作。而LongAdder是每個線程擁有自己的槽,各個線程一般隻對自己槽中的那個值進行CAS操作。
比如有三個ThreadA、ThreadB、ThreadC,每個線程對value增加10。
對于AtomicLong,最終結果的計算始終是下面這個形式:
value = 10 + 10 + 10 = 30 value=10+10+10=30
但是對于LongAdder來說,内部有一個base變量,一個Cell[]數組。
base變量:非競态條件下,直接累加到該變量上
Cell[]數組:競态條件下,累加個各個線程自己的槽Cell[i]中的值
最終結果的計算是下面這個形式:
等價于:value=base + Cell[0] + Cell[1] + Cell[2] +…+Cell[n-1]; (n為Cell數值的長度)
LongAdder的内部結構
LongAdder隻有一個空構造器,其本身也沒有什麼特殊的地方,所有複雜的邏輯都在它的父類Striped64中。
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
public LongAdder() {
}
}
來看下Striped64的内部結構,這個類實作一些核心操作,處理64位資料。
Striped64隻有一個空構造器,初始化時,通過Unsafe擷取到類字段的偏移量,以便後續CAS操作:
Striped64
abstract class Striped64 extends Number {
Striped64() {
}
// Unsafe 類
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面有個比較特殊的字段是threadLocalRandomProbe,可以把它看成是線程的hash值。這個後面我們會講到。
定義了一個内部Cell類,這就是我們之前所說的槽,每個Cell對象存有一個value值,可以通過Unsafe來CAS操作它的值:
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe 類
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
其它的字段:
可以看到Cell[]就是之前提到的槽數組,base就是非并發條件下的基數累計值。
/** CPUS的數目,以限制表大 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* cells數組。 如果為非null,則數組大小為2的幂。
*/
transient volatile Cell[] cells;
/**
* 基本值,主要在沒有争用時使用,也用作數組初始化過程中的後備。 通過CAS更新
*/
transient volatile long base;
/**
* 擴容或者建立數組時使用的自旋鎖(通過CAS鎖定),将标志設定為1(加鎖狀态),
* 初始化完畢或者擴容完畢時将此标志設定為0
*/
transient volatile int cellsBusy;
LongAdder的核心方法
還是通過例子來看:
假設現在有一個LongAdder對象la,四個線程A、B、C、D同時對la進行累加操作。
LongAdder la = new LongAdder();
la.add(10);
①ThreadA調用add方法(假設此時沒有并發)
// LongAdder方法
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
/**
* Striped64 方法
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
初始時Cell[]為null,base為0。是以ThreadA會調用casBase方法(定義在Striped64中),因為沒有并發,CAS操作成功将base變為10,可以看到,如果線程A、B、C、D線性執行,那casBase永遠不會失敗,也就永遠不會進入到base方法的if塊中,所有的值都會累積到base中。
那麼,如果任意線程有并發沖突,導緻caseBase失敗呢?失敗就會進入if方法體:
這個方法體會先再次判斷Cell[]槽數組有沒初始化過,如果初始化過了,以後所有的CAS操作都隻針對槽中的Cell;否則,進入longAccumulate方法。整個add方法的邏輯如下圖:
可以看到,隻有從未出現過并發沖突的時候,base基數才會使用到,一旦出現了并發沖突,之後所有的操作都隻針對Cell[]數組中的單元Cell。
如果Cell[]數組未初始化,會調用父類的longAccumelate去初始化Cell[],如果Cell[]已經初始化但是沖突發生在Cell單元内,則也調用父類的longAccumelate,此時可能就需要對Cell[]擴容了。
Striped64的核心方法
我們來看下Striped64的核心方法longAccumulate到底做了什麼:
//Striped64方法
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//getProbe()方法給目前線程生成一個非0的hash值
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
//如果哈希取模映射得到的cell單元不為null,則為true,此值也可看做擴容意向
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
//第一種可能:cells已經初始化了
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//第二種可能,cells沒有加鎖也沒有初始化,則嘗試對cells加鎖,并進行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2]; //初始化大小為2(必須為2的幂次方)
rs[h & 1] = new Cell(x); //根據hash值定位,并指派
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//第三種可能:cells正在初始化,則嘗試直接在base上進行累加操作
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
上述代碼首先給目前線程配置設定一個hash值,然後進入一個自旋,這個自旋分為三個分支:
- 第一種可能:Cell[]數組已經初始化
- 第二種可能:Cell[]數組未初始化
- 第三種可能:Cell[]數組正在初始化中
第二種可能:Cell[]數組未初始化
我們之前讨論了,初始時Cell[]數組還沒有初始化,是以會進入分支②:
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2]; //初始化大小為2(必須為2的幂次方)
rs[h & 1] = new Cell(x); //根據hash值定位,并指派
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
首先會将cellsBusy置為1:加鎖狀态
/**
* 将cellBusy字段從0設為1,以擷取鎖定。
*/
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
然後,初始化Cell[]數組(初始大小為2),根據目前線程的hash值計算映射的索引,并建立對應的Cell對象,Cell單元中的初始值x就是本次要累加的值。
第三種可能:Cell[]數組正在初始化中
如果在初始化過程中,另一個線程ThreadB也進入了longAccumulate方法,就會進入分支③:
//第三種可能:cells正在初始化,則嘗試直接在base上進行累加操作
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
可以看到,分支③直接操作base基數,将值累加到base上。
第一種可能:Cell[]數組已經初始化
如果初始化完成後,其它線程也進入了longAccumulate方法,就會進入分支①:
//第一種可能:cells已經初始化了
if ((as = cells) != null && (n = as.length) > 0) {
//目前線程的hash值運算後映射到Cell單元為null,說明該Cell沒有被使用
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // cells數字沒有正在擴容
Cell r = new Cell(x);
//嘗試加鎖,成功後cellsBusy=1
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
//在有鎖的情況下,再次堅持cell單元是否為null,為空則指派
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// wasUncontended表示前一次CAS更新Cell單元是否成功了
else if (!wasUncontended)
// 沒成功,重置為true,後面會重新計算線程的hash值
wasUncontended = true;
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)//當cells數組的大小超過cpu核數後,永遠都不會再擴容了
//
collide = false;// 擴容辨別,為false,表示不會再進行擴容
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {//嘗試加鎖
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];//擴容為原來的兩倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);//計算目前線程新的hash值
}
LongAdder的sum方法
最後,我們來看下LongAdder的sum方法:
sum求和的公式就是我們開頭說的:
需要注意的是,這個方法隻能得到某個時刻的近似值,也就是如果要求一個準确無誤的值,應該用AtomicLong接口,這也就是LongAdder并不能完全替代LongAtomic的原因之一。
LongAdder的其它兄弟
JDK1.8時,java.util.concurrent.atomic包中,除了新引入LongAdder外,還有引入了它的三個兄弟類:LongAccumulator、DoubleAdder、DoubleAccumulator
LongAccumulator
LongAccumulator是LongAdder的增強版。LongAdder隻能針對數值的進行加減運算,而LongAccumulator提供了自定義的函數操作。其構造函數如下:
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
通過LongBinaryOperator,可以自定義對入參的任意操作,并傳回結果(LongBinaryOperator接收2個long作為參數,并傳回1個long)
LongAccumulator内部原理和LongAdder幾乎完全一樣,都是利用了父類Striped64的longAccumulate方法。這裡就不再贅述了,可以自己閱讀源碼。
DoubleAdder和DoubleAccumulator
從名字也可以看出,DoubleAdder和DoubleAccumulator用于操作double原始類型。
與LongAdder的唯一差別就是,其内部會通過一些方法,将原始的double類型,轉換為long類型,其餘和LongAdder完全一樣:
// DoubleAdder方法
public void add(double x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null ||
!casBase(b = base,
Double.doubleToRawLongBits
(Double.longBitsToDouble(b) + x))) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value,
Double.doubleToRawLongBits
(Double.longBitsToDouble(v) + x))))
doubleAccumulate(x, null, uncontended);
}
}
參考
https://segmentfault.com/a/1190000015865714