現在notion寫的筆記,然後遷移過來,注釋不對等湊合看,好不好嘛
相對于 atmoic,并發性能更牆,追求最終一緻性,但是單一操作沒有atmoic性能好
它是将所有并發所要儲存的值放在一個base中, ,然後進行增減工作
将并發的線程,設計為一個數組,并将所要累加的值放入base中,base是cas操作,根據數組中元素是否為空,來進行最終的增改,多線程較多情況下,還會進行擴容,依然是二的次方幂,主要采用多重判斷,和手動加鎖機制,來進行
主方法add()
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 數組已經初始化,初始化直接走第二個條件寫入資料
// 寫入資料失敗,發生了競争或者需要擴容
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 預設無競争
boolean uncontended = true;
// 1 : 數組未初始化,需要初始化
// 2 : 目前要放入下标值為空,需要建立
// 3 : 寫入值是發生了競争關系
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
核心方法longAccumulate()
// wasUncontended目前線程競争失敗才會是true
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 目前線程hash值還未配置設定
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
// 預設情況下放在0位置,未初始化,不當做一次競争
wasUncontended = true;
}
// false 表示會擴容, true 可能會擴容
boolean collide = false; // True if last slot nonempty
for (;;) {
// as 目前cell數組,cell目前需要使用的cell,n cell數組的長度,v 期望值
Cell[] as; Cell a; int n; long v;
// 數組已經初始化,而且目前數組下标資料為null,需要将元素防到相應位置
if ((as = cells) != null && (n = as.length) > 0) {
// 1.1 : 下标位置為空,可以進行指派
if ((a = as[(n - 1) & h]) == null) {
// 目前處于未加鎖狀态
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
// 多線程多重判斷,未加鎖狀态,并且擷取鎖
if (cellsBusy == 0 && casCellsBusy()) {
// 建立新元素是否成功
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
//重複判斷多線程情況下,該位置是否被其他線程所指派,放置資料覆寫
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 不論是否放置元素成功,都要釋放鎖
cellsBusy = 0;
}
if (created)
break;
// 未成功,自旋
continue; // Slot is now non-empty
}
}
collide = false;
}
// 1.2 多線程時寫入資料失敗才 為false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 1.3 目前線程rehash過 線程hash值, 并且新命中的cell數組下标已經存在值
// true 則表示設定成功 false 則表示依然存在競争
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 1.4 n大于cpu總值,不再擴容了, cells 已經被其線程擴容過了,rehash重試
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 1.5 -> 1.4 之後近來,将該值設定true,有擴容意向
else if (!collide)
collide = true;
// 1.6 擴容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 沒有被其他線程擴容
if (cells == as) { // Expand table unless stale
// 擴容
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 擴容是否成功都需要釋放鎖
cellsBusy = 0;
}
// 關閉擴容意向
collide = false;
// 自旋重試
continue; // Retry with expanded table
}
// 重置線程hash值
h = advanceProbe(h);
}
// 初始化cell數組
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 其他線程正在初始化cell數組,需要将值累加到base
// 線程被上鎖,搶占失敗,需要将值累加到base
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}