public final long incrementAndGet() {
for (;;) {
// 擷取AtomicLong目前對應的long值
long current = get();
// 将current加1
long next = current + 1;
// 通過CAS函數,更新current的值
if (compareAndSet(current, next))
return next;
}
}
// value是AtomicLong對應的long值
private volatile long value;
// 傳回AtomicLong對應的long值
public final long get() {
return value;
}
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
compareAndSet()的作用是更新AtomicLong對應的long值。它會比較AtomicLong的原始值是否與expect相等,若相等的話,則設定AtomicLong的值為update。
高并發下計數,一般最先想到的應該是AtomicLong/AtomicInt,AtmoicXXX使用硬體級别的指令 CAS 來更新計數器的值,這樣可以避免加鎖,機器直接支援的指令,效率也很高。但是AtomicXXX中的 CAS 操作在出現線程競争時,失敗的線程會白白地循環一次,在并發很大的情況下,因為每次CAS都隻有一個線程能成功,競争失敗的線程會非常多。失敗次數越多,循環次數就越多,很多線程的CAS操作越來越接近 自旋鎖(spin lock)。計數操作本來是一個很簡單的操作,實際需要耗費的cpu時間應該是越少越好,AtomicXXX在高并發計數時,大量的cpu時間都浪費會在 自旋 上了,這很浪費,也降低了實際的計數效率。
// jdk1.8的AtomicLong的實作代碼,這段代碼在sun.misc.Unsafe中
// 當線程競争很激烈時,while判斷條件中的CAS會連續多次傳回false,這樣就會造成無用的循環,循環中讀取volatile變量的開銷本來就是比較高的
// 因為這樣,在高并發時,AtomicXXX并不是那麼理想的計數方式
public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, v + delta));
return v;
}
@SuppressWarnings("serial")
abstract class Striped641 extends Number1 {
static final int NCPU = Runtime.getRuntime().availableProcessors();
transient volatile Cell[] cells;// cell數組,長度一樣要是2^n,可以類比為jdk1.7的ConcurrentHashMap中的segments數組
// 累積器的基本值 ,沒有遇到并發的情況,直接使用base,速度更快;
transient volatile long base;//cas更新
// 自旋辨別,在對cells進行初始化,或者後續擴容時,需要通過CAS操作把此辨別設定為1(busy,忙辨別,相當于加鎖), 取消busy時可以直接使用cellsBusy = 0,相當于釋放鎖
transient volatile int cellsBusy;//旋轉鎖
Striped641() {
}
final boolean casBase(long cmp, long val) {//原子更新base
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
// 使用CAS将cells自旋辨別更新為1,更新為0時可以不用CAS(指派為0肯定是隻有一個線程在指派為0),直接使用cellsBusy就行
final boolean casCellsBusy() {//原子更新cellsBusy從0到1,以擷取鎖。
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
// 下面這兩個方法是ThreadLocalRandom中的方法,不過因為包通路關系,這裡又重新寫一遍
// probe翻譯過來是探測/探測器/探針這些,不好了解,它是ThreadLocalRandom裡面的一個屬性,
// 不過并不影響對Striped64的了解,這裡可以把它了解為線程本身的hash值
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
// 相當于rehash,重新算一遍線程的hash值
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);//CAS設定目前線程的threadLocalRandomProbe
return probe;
}
//x:要增加的數。fn:執行函數。uncontended=false表示更新失敗了,=true表示沒有這個線程的Cell(不可能是更新成功了,更新成功就進不來這裡)。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {//開始分段更新:1.base更新失敗。2.前面分段更新不行或失敗。
//建立,更新,擴容,初始化,更新base。
int h;//線程hash值
// 看下ThreadLocalRandom是否初始化。如果目前線程的threadLocalRandomProbe為0,說明目前線程是第一次進入該方法,
if ((h = getProbe()) == 0) {// 目前線程hash值=0,
ThreadLocalRandom.current(); //初始化目前線程的PROBE值不為0,
h = getProbe();
wasUncontended = true;//下面的cas語句走不走,還是間隔一個for循環在cas uncontended=false代表存在争用,uncontended=true代表不存在争用。
}
boolean collide = false; //下面的擴容語句走不走,還是間隔一個for循環在擴容 。collide=true代表cas有沖突,collide=false代表cas無沖突
for (;;) {
Cell[] as;//局部變量,線程執行時候,局部變量不會變,成員變量會改變(修改屬性位址不變,重新new位址才改變)。as一進來就指派後面沒有更改過。
Cell a;//線程對應的cell,a一進來就指派後面沒有更改過。
//cellsBusy沒有局部變量,直接使用成員變量。是一個鎖。
int n;
long v;
//------------------------------------------重要---多線程時候,在一個線程的周期裡面,前一個指令的判斷(被另一個線程修改)現在不一定成立了-----------------------------------------------------//
/*每次執行真正操作時候,有可能剛才判斷的條件全部不成立了,就要重來,那麼剛才判斷條件有什麼用:不沖突有用。
執行成功時候:在鎖住期間,剛才進來的判斷都成立,近似于單線程操作,或者别的操作了,但是不影響現在要操作的條件。*/
/*多線程同時判斷3個if,有可能一個線程判斷第一個if不成立,但是判斷第二個if時候,第一個if條件又成立了。走到後面的判斷,隻能說剛才前面的判斷不成立,現在前面的判斷不一定不成立了。
是以進入一個if:要看2個判斷,之前判斷是什麼,現在判斷是什麼,才進入這個if條件 */
/*casCellsBusy()用于鎖住這個cells,别的線程不能擴容和初始化和建立cell(但是可以cas更新存在的cell)。但是casCellsBusy()前後的條件不一定再次成立了,是以鎖住之後要再次判斷剛才的條件
,多線程時候上一次的判斷現在不一定成立了。*/
/*局部變量,線程執行時候,局部變量不會變,成員變量會改變(修改屬性位址不變,重新new位址才改變)。as一進來就指派後面沒有更改過。*/
/*在一個線程裡面:1.已經有cells,要麼建立(建立時候看是不是空),要麼更新(要看是不是原值),要麼擴容(要看cells有沒有變化)。2.沒有cells就去初始化(初始話時候再看是不是空)。3.初始化搶不赢就去更新base。*/
//------------------------------------------重要-----多線程時候,在一個線程的判斷周期裡面,前一個指令的判斷(被另一個線程修改)現在不一定成立了---------------------------------------------------//
//as == cells,隻有初始化和擴容(因為重新new)才不相等,修改值還是相等的。
//每次重新來,都會重新擷取as和a,as = cells,a = as[(n - 1) & h],并且更新線程hash。
// 1.已經有分段更新了cells!=null
if ((as = cells) != null && (n = as.length) > 0) {
//1.1 沒有這個線程的Cell,建立
//重新來:1.建立cell時候有人占用cell。2.建立cell時候位置不為空。
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // cellsBusy=1表示有人在修改Cells數組(修改Cell從null到new Cell,擴容,初始化),CAS更新一個已經存在的Cell不用判斷cellsBusy。
Cell r = new Cell(x); //這期間其他線程可以做很多事
if (cellsBusy == 0 && casCellsBusy()) {// cellsBusy是0就進來,然後變成cellsBusy=1,别的進不來。
//不可能多個線程同時進這裡面來。
/*鎖住cells,但是前面判斷a=null,現在不一定a=null了,因為在前面判斷到鎖住cells期間cells有可能改變了,并且cellsBusy從0變到1又變到0。是以鎖住之後在判斷是不是空。*/
/*每次執行真正操作時候,有可能剛才判斷的條件全部不成立了,就要重來,那麼剛才判斷條件有什麼用:不沖突有用。
執行成功時候:在鎖住期間,剛才進來的判斷都成立,近似于單線程操作,或者别的操作了,但是不影響現在要操作的條件。*/
boolean created = false;
try {
Cell[] rs;
int m, j;
// 再次判斷沒有這個cell, 前面if判斷了是空,走到這裡時候有可能别人放進去了并且cellsBusy從0變到1再變到0了。如果不是null了,就不放,下次再來(直接更新)。
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;// 指派
created = true;// 建立完成,退出,不用重新來了。
}
} finally {
cellsBusy = 0;//釋放鎖
}
if (created)// 建立完成,退出
break;
continue; // 這個線程沒有成功建立,肯定重頭再來
}
}
collide = false;// cell不存在,但是有人修改cells,collide = false,
}
//1.2有這個線程的cell
//wasUncontended=false重新來
else if (!wasUncontended) // wasUncontended=false表示更新失敗了,再來,wasUncontended=true下次不進這裡直接去cas更新,否則先不cas先再來一次。
wasUncontended = true;
// 1.3有這個線程的cell
//wasUncontended=true,更新失敗了重新來。
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))//這個線程有Cell,去更新。
break;// 更新成功,退出。
// 1.4有這個線程的cell
//wasUncontended=true,更新失敗,cells初始化擴容了,重新來
else if (n >= NCPU || cells != as) // CPU能夠并行的CAS操作的最大數量是它的核心數 ,cells被改變了(擴容了肯定重新來)。
collide = false;
// 1.5有這個線程的cell
//wasUncontended=true,更新失敗,cells沒有初始化擴容,collide=false,重新來
else if (!collide) //=false走這裡,collide=true,下次不走這裡直接去擴容否則先不去擴容先再來一次。
collide = true;
// 1.6有這個線程的cell
//wasUncontended=true,更新失敗,cells沒有初始化擴容,collide=true,占用cells,擴容完成,重新來
//有這個線程的cell,cas失敗,說明2個線程同時更新這個cell,就擴容。既然你不讓我加,競争這麼厲害,那麼擴容試試看。
else if (cellsBusy == 0 && /* 這期間其他線程可以做很多事 */casCellsBusy()) {
//不可能多個線程同時進這裡面來。
try {
if (cells == as) { //鎖住cells了,最開始as = cells,但是現在as不一定=cells,是以判斷cells沒變擴容,
Cell[] rs = new Cell[n << 1];// 執行2倍擴容
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;// 釋放鎖
}
collide = false;// 擴容意向為false
continue; // 擴容後還沒有設定值(肯定重新來)
}
//1.7 有這個線程的cell
h = advanceProbe(h);// 修改目前線程的hash,降低hash沖突(線程hash改變是無所謂的,關注的是裡面的值,與哪個線程放進去無關),避免下次還映射到這個cell。
}
// 2。某個線程執行時候,前一個if判斷:沒有分段更新,cells==null或者cells.length=0。走到這裡時候cells有可能不為空了,但是要進入這if必須:cellsBusy=0,
//同時cells還是剛才那個as=null(沒有擴容和初始化)并且casCellsBusy()搶成功,就去初始化。
//線程執行到這裡:之前判斷:【沒有cells】,并且現在【沒人擴容或者初始化,并且cells為空】就初始化。
else if (cellsBusy == 0 && cells == as && /*這期間其他線程可以做很多事*/casCellsBusy()) {// cellsBusy=1,别的線程就不能動cells
//不可能多個線程同時進這裡面來。
boolean init = false;
try {
if (cells == as) { //鎖住cells了,但是cells不一定=as=空或者null了, 鎖住之後一定要再檢測一次,如果還是null就初始化
Cell[] rs = new Cell[2];// 初始化時隻建立兩個單元
rs[h & 1] = new Cell(x);// 對其中一個單元進行累積操作,另一個不管,繼續為null
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;// 釋放鎖
}
if (init)// 初始化成功退出,初始化失敗繼續來
break;
}
// 3。走到這裡:前面判斷不成立(不代表現在的前面判斷也不成立),之前判斷:cells=null或者cells.length=0【沒有cells】并且cellsBusy=1或者 cells被改變了【有人正在初始化或擴容】或者casCellsBusy()失敗。
//有了分段更新,還是可以用base,提高效率。準備去擴容的,但是現在有可能别人已經擴容了(cells != as)或者casCellsBusy()失敗(搶着去擴容沒有搶成功)
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break; // 更新base,成功就退出。
}
/*如果Cells表為空,嘗試擷取鎖之後初始化表(初始大小為2);
如果Cells表非空,對應的Cell為空,自旋鎖未被占用,嘗試擷取鎖,添加新的Cell;
如果Cells表非空,找到線程對應的Cell,嘗試通過CAS更新該值;
如果Cells表非空,線程對應的Cell CAS更新失敗,說明存在競争,嘗試擷取自旋鎖之後擴容,将cells數組擴大,降低每個cell的并發量後再試*/
}
// double更long的邏輯基本上是一樣的
final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as;
Cell a;
int n;
long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(Double.doubleToRawLongBits(x));
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs;
int m, j;
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
} else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? Double.doubleToRawLongBits(Double.longBitsToDouble(v) + x)
: Double.doubleToRawLongBits(fn.applyAsDouble(Double.longBitsToDouble(v), x)))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
} else if (casBase(v = base, ((fn == null) ? Double.doubleToRawLongBits(Double.longBitsToDouble(v) + x)
: Double.doubleToRawLongBits(fn.applyAsDouble(Double.longBitsToDouble(v), x)))))
break; // Fall back on using base
}
}
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped641.class;
BASE = UNSAFE.objectFieldOffset(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
// 一個Cell裡面一個value,可以看成是一個簡化的AtomicLong,通過cas操作來更新value的值
// @sun.misc.Contended是一個高端的注解,代表使用緩存行填來避免僞共享
@sun.misc.Contended
static final class Cell {
volatile long value;// cas更新其值
Cell(long x) {
value = x;
}
final boolean cas(long cmp, long val) {// cas更新
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
}
public class LongAdder1 extends Striped641 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
public LongAdder1() {
}
/*看到這裡我想應該有很多人明白為什麼LongAdder會比AtomicLong更高效了,
沒錯,唯一會制約AtomicLong高效的原因是高并發,高并發意味着CAS的失敗幾率更高,
重試次數更多,越多線程重試,CAS失敗幾率又越高,變成惡性循環,AtomicLong效率降低。
那怎麼解決?** LongAdder給了我們一個非常容易想到的解決方案:減少并發,将單一value的更新壓力分擔到多個value中去
(每個線程更新value數組裡面的自己value【多個線程通路的同一個數組(也隻有一個數組)但是cas更新的是隻是其中一個value】
【因為數組有限,是以不同的線程也會出現同時cas更新一個value的情況】【會出現:多個線程同時通路這個數組的同一個cell】,
不再是多個線程更新同一個value導緻cas經常失敗), 降低單個value的 “熱度”,分段更新 */
/*這樣,線程數再多也會分擔到多個value上去更新,隻需要增加value就可以降低 value的 “熱度”
AtomicLong中的 惡性循環不就解決了嗎? cells 就是這個 “段” cell中的value 就是存放更新值的,
這樣,當我需要總數時,把cells 中的value都累加一下不就可以了麼!!*/
/*當然,聰明之處遠遠不僅僅這裡,在看看add方法中的代碼,casBase方法可不可以不要,直接分段更新,上來就計算 索引位置,然後更新value?
答案是不好,不是不行,因為,casBase操作等價于AtomicLong中的CAS操作,要知道,LongAdder這樣的處理方式是有壞處的,
分段操作必然帶來空間上的浪費,可以空間換時間,但是,能不換就不換,看空間時間都節約~!
是以,casBase操作保證了在低并發時,不會立即進入分支做分段更新操作,因為低并發時,
casBase操作基本都會成功,隻有并發高到一定程度了,才會進入分支,
是以,Doug Lea對該類的說明是:** 低并發時LongAdder和AtomicLong性能差不多,高并發時LongAdder更高效!***/
/*因為低并發時候,使用的是base的原子更新,沒有啟用分段更新(cells=null,并且casBase成功),高并發才啟用分段更新。*/
/*如此,longAccumulate中做了什麼事,也基本略知一二了,因為cell中的value都更新失敗(說明該索引到這個cell的線程也很多
,并發也很高時) 或者cells數組為空時才會調用longAccumulate,*/
// +x,并發計數器LongAdder加X。要麼在base+x更新要麼在Cell[]數組裡面找到對應的Cell+x更新。
public void add(long x) {//base和cells隻有一個,并且是LongAdder的屬性。
Cell[] as;
long b, v;
int m;
Cell a;
//cells!=null不用判斷後面進去(表明已經啟用了分段更新),cells=null并且base的cas更新失敗進去(表示沒有啟用分段更新但是高并發了,
//需要啟用分段更新),cells=null并且base的cas更新成功就退出(沒有啟用分段更新,并且不是高并發,此時跟AotomicLong是一樣的)。
//并發時候更新失敗,AtomicLong的處理方式是死循環嘗試更新,直到成功才傳回,而LongAdder則是進入這個分支。
if ((as = cells) != null || !casBase(b = base, b + x)/*cas把base的值從b變成b+x*/) {
//進來:1.已經啟用分段更新了。2.沒有啟用分段更新但是cas失敗了表示高并發了。否則:沒有啟用分段更新并且不是高并發,就不進來。
boolean uncontended = true;
if (as == null //cells=null進去,沒有啟用分段更新(進來了)表示高并發了。
|| (m = as.length - 1) < 0 //cells.length<=0,沒有啟用分段更新(進來了)表示高并發了。
|| (a = as[getProbe() & m]) == null //對as的長度取餘,從as中擷取這個線程對應的a Cell。=null表示還沒有這個線程對應的cell,
|| !(uncontended = a.cas(v = a.value, v + x))) //a這個Cell裡面的value增加x失敗, 更新成功就不會進下面了。
//1.cells=null。2.cells!=null但沒有這個線程的Cell。2.有這個線程的Cell但是更新失敗了。
longAccumulate(x, null, uncontended); //uncontended=false表示更新失敗了,=true表示沒有這個線程的Cell(不可能是更新成功了,更新成功就進不來這裡)。
}
}
public void increment() {
add(1L);
}
public void decrement() {
add(-1L);
}
//将多個cell數組中的值加起來的和就類似于AtomicLong中的value
// 此傳回值可能不是絕對準确的,因為調用這個方法時還有其他線程可能正在進行計數累加,
// 方法的傳回時刻和調用時刻不是同一個點,在有并發的情況下,這個值隻是近似準确的計數值
// 高并發時,除非全局加鎖,否則得不到程式運作中某個時刻絕對準确的值,但是全局加鎖在高并發情況下是下下策
// 在很多的并發場景中,計數操作并不是核心,這種情況下允許計數器的值出現一點偏差,此時可以使用LongAdder
// 在必須依賴準确計數值的場景中,應該自己處理而不是使用通用的類。
public long sum() {
Cell[] as = cells;
Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
public void reset() {
Cell[] as = cells;
Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
public long sumThenReset() {
Cell[] as = cells;
Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
public String toString() {
return Long.toString(sum());
}
public long longValue() {
return sum();
}
public int intValue() {
return (int) sum();
}
public float floatValue() {
return (float) sum();
}
public double doubleValue() {
return (double) sum();
}
private static class SerializationProxy implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
private final long value;//LongAdder1的總和
SerializationProxy(LongAdder1 a) {
value = a.sum();
}
private Object readResolve() {
LongAdder1 a = new LongAdder1();
a.base = value;
return a;
}
}
private Object writeReplace() {
return new SerializationProxy(this);
}
private void readObject(java.io.ObjectInputStream s) throws java.io.InvalidObjectException {
throw new java.io.InvalidObjectException("Proxy required");
}
}
public abstract class Number1 implements java.io.Serializable {
public abstract int intValue();
public abstract long longValue();
public abstract float floatValue();
public abstract double doubleValue();
/*
System.out.println((byte)127);//127
System.out.println((byte)128);//-128
System.out.println((byte)129);//-127
System.out.println((byte)255);//-1
System.out.println((byte)256);//0
System.out.println((byte)257);//1
*/
public byte byteValue() {
return (byte) intValue();
}
public short shortValue() {
return (short) intValue();
}
private static final long serialVersionUID = -8742448824652078965L;
}