文章目錄
- 1、LongAdder原理介紹
- 2、LongAdder源碼介紹
- 3、LongAdder并發時應用
- 4、DoubleAdder分析
- 5、LongAccumulator分析
- 6、DoubleAccumulator分析
1、LongAdder原理介紹
LongAdder是JDK8中新增加的一個并發類,LongAdder主要用于對long型變量進行CAS(CAS可以參考前面系列文章)操作,但AtomicLong也是對long型變量進行CAS操作,兩者什麼差別呢?
AtomicLong内部維護了一個volatile long類型的變量,多個線程并發CAS操作該變量,而LongAdder把一個long變量拆分出多個long變量,采用分段鎖的方式對多個變量進行并發。
在LongAdder内部,一個long型變量被拆分成了一個base變量和一組Cell,每個Cell内部維護了一個long型變量,多個線程并發時,如果并發低,直接累加到base變量,并發高沖突大時,平攤到一組Cell中,最後取值時,再把base和全部的Cell中的value值進行 累加求和。
上述原理内容在Striped64元子類中的longAccumulate方法實作的,LongAdder繼承了Striped64,LongAdder隻是一個元子類并發的架構子,其分段鎖核心思想均在父類Striped64中longAccumulate方法實作。Striped64内部維護了一個base和一組Cell,當線程執行時會首先 更新base變量,如果更新失敗,認為目前線程有競争,就會對Cell進行初始化或擴容,然後根據目前線程的探針計算hash值映射到一組Cell的下标中,如果多個線程競争同一個Cell導緻更新Cell失敗,會擴容Cell的數組并重新hash數組的下标,不同的并發線程分散到不同的Cell中執行。
這種分段鎖的思想,在并發低的時候,不如AtomicLong的效率高,因為并發低時,LongAdder還要把base和各個Cell累加起來;當并發非常高時,LongAdder的效率就顯現出來了,是以LongAdder适合大規模并發統計的場景。
2、LongAdder源碼介紹
首先分析LongAdder的源碼
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
public LongAdder() {
}
/*
* LongAdder繼承了Striped64,是以同時繼承了Striped64中的屬性cells、base、cellsBusy
* */
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
/*
**首次執行的時候,都是試着把x加到base上面,首次執行時cell都是null,如果casBase失敗,則産生了競争,進入longAccumulate處理
**非首次執行,cells數組已經初始化過了,目前線程hash對應下标的cell為空,則進入longAccumulate建立cell,并加入cells;如果不為空,則把x值加到對應的cell商
**如果x加到cell上失敗,說明多線程産生了競争,進入longAccumulate重試或者擴容
*
**在取cells下标時,是根據目前線程的hash,即getProbe()&m,getProbe為目前線程的探針值
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
/**
* 加1
*/
public void increment() {
add(1L);
}
/**
* 減1
*/
public void decrement() {
add(-1L);
}
/**
* 把cells中所有的value值累加到base上傳回結果
* 該方法局限性:不能有競争線程存在
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
/**
* 該方法重置base以及cells中所有value的值為0,相當于建立了一個新的LongAdder
* 但是該方法時有局限性的,在重置時不能有并發存在,否則會導緻base或cells中values未能有效置0
*/
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
/**
* 先把cells中所有values累加到base中,然後重置base和cells中所有value值為0
* 該方法局限性:不能有競争線程存在
*/
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
public String toString() {
return Long.toString(sum());
}
/**
* 求出base以及cells中所有value總和
*/
public long longValue() {
return sum();
}
/**
* 把sum總和向下轉為int類型,如果超出Int的最大範圍,則高位截斷
*/
public int intValue() {
return (int)sum();
}
/**
* sum總和轉化為float類型
*/
public float floatValue() {
return (float)sum();
}
/**
* sum總和轉化為double類型
*/
public double doubleValue() {
return (double)sum();
}
/*
* 序列化代理内部類
* 在對LongAdder序列化時,不再直接對LongAdder進行序列化,會暴露LongAdder中各種屬性,通過SerializationProxy類,隻把LongAdder中的sum總和進行序列化,隐藏了LongAdder的結構
* */
private static class SerializationProxy implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
/**
* The current value returned by sum().
* @serial
*/
private final long value;
SerializationProxy(LongAdder a) {
value = a.sum();
}
/*
* 反序列化時,讀取該方法進行 反序列化
* */
private Object readResolve() {
LongAdder a = new LongAdder();
a.base = value;
return a;
}
}
/*
* 調用該方法,對LongAdder的序列化代理内部類SerializationProxy進行序列化
* */
private Object writeReplace() {
return new SerializationProxy(this);
}
/*
* 如果反序列結果與序列化結果不一緻,調用該方法抛出無效對象異常
* */
private void readObject(java.io.ObjectInputStream s)
throws java.io.InvalidObjectException {
throw new java.io.InvalidObjectException("Proxy required");
}
}
可以看出LongAdder内部的核心方法是add,在并發比較高時,add方法調用了父類Strped64中的方法longAccumulate。下面分析下Strped64源碼
@SuppressWarnings("serial")
abstract class Striped64 extends Number {
/*
* Cell為Striped64的内部類,Cell中維護了一個value變量值,當多線程并發時,把要競争的資源攤分到不同的Cell中進行執行,減少并發
* *後續對value操作都是對value在Cell中記憶體偏移量進行操作的
* */
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/*擷取所在機器的CPU數量*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* cells數組中容量是2的倍數
*/
transient volatile Cell[] cells;
/**
* *從最開始沒有競争時,對base進行累加或累減操作
*/
transient volatile long base;
/**
* *相當于一個鎖标志,當對cells進行擴容或建立時,需要先擷取cellBusy鎖,如果擷取不到,說明有其他線程已經擷取了
*/
transient volatile int cellsBusy;
Striped64() {
}
/**
* * 對base進行CAS操作
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
/**
* *CELLSBUSY是cellsBusy的記憶體偏移量
* *對CELLSBUSY做CAS操作,修改成功,cellsBusy的值為1,表示拿到鎖
*/
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
/**
* *傳回目前線程的探針
*/
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
/**
**利用xorshift算法生成僞随機數,生成目前線程的探針值
*/
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
/*
* *該函數用于處理涉及cells初始化、擴容、更新情況,對于更新cells進行從試或擴容
* */
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
/*如果目前線程還未hash,初始化線程的hash探針*/
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; //collide=false表示cells不會擴容,collide=true表示cells會擴容
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
/*
* 當cells不為null時,對目前線程探針作hash作為cells的下标,确定下标後,x的值就要作用到該指定位置的Cell上
* 但是當cells指定位置的Cell為null時,就要在該坐标位置建立Cell
* 在建立Cell時一定要先擷取cellsBusy鎖
* */
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
/*
* 在LongAdder的add方法中,在對cells做CAS時失敗,wasUncontended=false傳入到該longAccumulate方法,說明發生了線程競争
* 現在置wasUncontended=true,重新對cells執行
* */
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
/*
* 對線程探針hash後的值對應cells的坐标有值,則對該坐标處的Cell執行CAS操作
* 如果fn==null,則直接把x累加到a中value上;如果fn!=null,則對v和x執行功能函數fn,執行後的值更新到a中的value值上
* */
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
/*
* 如果a做CAS失敗,說明線程發生了競争,多個線程在競争a資源
* 此時判斷是否對cells擴容:如果n>=CPU的個數,不擴容;或者其他線程已經修改了cells,也不擴容
* */
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
/*
* 如果判斷不對cells進行擴容,則把擴容标志collide設定true,從新進行自旋,也即執行for循環
* */
else if (!collide)
collide = true;
/*
* 如果在執行前面(a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))操作失敗後,即更新Cell失敗,
* 又滿足擴容條件,即n<NCPU,則對cells進行擴容一倍,并把原來cells中内容複制到新擴容後的cells中
* */
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
/*
* a.cas執行失敗,又不滿足擴容條件時,重新hash目前線程探針值,并從新執行自旋操作,即for循環
* */
h = advanceProbe(h);
}
/*
* cells為空時,在LongAdder中執行add方法時,首先加到base上,加到base上失敗,即有競争時,走該分支
* *初始化cells,cells初始值容量是2,後續擴容時乘2,
*
* *在初始化cells時,首先拿到cellsBusy鎖,即cellsBuys為0時,證明沒有其它線程競争,後續才可以初始化cells,因為其它線程此時也可能在初始化cells,
* *隻能由一個線程初始化cells,是以要先判斷是否獲得鎖
* */
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
/*
* *如果前一步初始化cells失敗,即沒有拿到cellsBusy鎖,說明有其它線程已經拿到了cellsBusy鎖,正在初始化cells
* *在此種情況下,會把x的值加到base上
* */
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
/**
* doubleAccumulate與longAccumulate操作一緻
* 隻不過longAccumulate時處理long型分段鎖的,doubleAccumulate是執行double類型分段鎖的,
* 但是doubleAccumulate中double類型也是轉換成long類型,然後用分段鎖的,最後再把執行結果轉換成double類型
*/
final void doubleAccumulate(double x, DoubleBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(Double.doubleToRawLongBits(x));
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value,
((fn == null) ?
Double.doubleToRawLongBits
(Double.longBitsToDouble(v) + x) :
Double.doubleToRawLongBits
(fn.applyAsDouble
(Double.longBitsToDouble(v), x)))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base,
((fn == null) ?
Double.doubleToRawLongBits
(Double.longBitsToDouble(v) + x) :
Double.doubleToRawLongBits
(fn.applyAsDouble
(Double.longBitsToDouble(v), x)))))
break; // Fall back on using base
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
}
}
3、LongAdder并發時應用
比如對銀行中熱點賬戶問題,一個火爆的理财賬戶對外進行搶售賣(假設不設額度上限),如果隻有一個線程對該賬戶執行,多個客戶同時進行購買理财時就會導緻其他幾個客戶購買失敗,造成非常壞的體驗,利用本節的LongAdder,把一個熱點賬戶拆分成幾個賬戶同時對外售賣,就可以大大減少并發失敗率。
package com.lzj.atomic.striped64;
import java.util.concurrent.atomic.LongAdder;
public class LongAdderDemo {
private static LongAdder adder = new LongAdder();
public static void main(String[] args) throws InterruptedException {
Runnable run1 = new Runnable() {
@Override
public void run() {
for(int i=0; i<100; i++) {
adder.add(100); //假設線程1每次賣理财100,統計100次
}
}
};
Runnable run2 = new Runnable() {
@Override
public void run() {
for(int i=0; i<100; i++) {
adder.add(50); //假設線程2每次賣理财50,統計100次
}
}
};
Thread thread1= new Thread(run1);
Thread thread2 = new Thread(run2);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("理财賬戶總共賣出理财 :" + adder.sum());
}
}
線程執行結束後,輸出如下,可見所有客戶購買理财成功
4、DoubleAdder分析
LongAdder是處理long型變量分段鎖并發的,而DoubleAdder内部是處理double類型分段鎖并發的,兩者原理完全一緻,用法也完全一緻,DoubleAdder内部的核心方法也是add方法,該方法并發高時,調用了父類Striped64中的doubleAccumulate方法。
并且DoubleAdder的代碼與LongAdder的代碼都一緻,兩者都繼承了Striped64基類,DoubleAdder内部就是利用long型分段鎖進行并發的,并發結束後,又把long型變量轉化了double類型,可見DoubleAdder内部也是利用LongAdder進行并發計算的。
5、LongAccumulator分析
LongAccumulator與LongAdder原理類似,都是調用了父類Striped64中longAccumulate方法完成分段鎖的更新,隻是LongAccumulator的功能更強大。
LongAdder隻能進行累加操作,并且初始值為0;LongAccumulator可以進行兩個二維元素的操作,初始值可以自定義。LongAdder在調用Striped64中longAccumulate中方法時,傳入的LongBinaryOperator對象為null,導緻LongAdder隻能進行累加減操作,因為LongAdder在執行CAS的時候執行的casBase(b, b+x),或者a.cas(v = a.value, v + x);而LongAccumulator在調用Striped64中longAccumulate中方法時,自定義了一個對二維元素操作的LongBinaryOperator對象,可以對這個二維元素進行任意操作,因為LongAccumulator在CAS時候執行的casBase(b,r),或者a.cas(v=a.value,r),其中r=LongBinaryOperator.applyAsLong(base,x),或者LongBinaryOperator(a.value,x)。
用法示例如下,假設對一個指定因子,用多個線程對其翻倍處理,示例中因子為1
public class LongAccumulatorDemo {
public static void main(String[] args) throws InterruptedException {
LongBinaryOperator operator = (x, y) -> x*y;
LongAccumulator accumulator = new LongAccumulator(operator, 1);
Runnable runn1 = new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
accumulator.accumulate(2);
}
}
};
Runnable runn2 = new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
accumulator.accumulate(2);
}
}
};
Thread thread1 = new Thread(runn1);
Thread thread2 = new Thread(runn2);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("并發結果為:" + accumulator.get());
}
}
執行結果如下:
6、DoubleAccumulator分析
DoubleAccumulator原理與用法與LongAccumulator完全一緻,隻是DoubleAccumulator用來處理double類型的數值,LongAccumulator用來處理long類型的數值。其實DoubleAccumulator内部與DoubleAdder一樣都是先轉化為Long類型處理,結果再轉化為Double類型。