final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
ConcurrentHashMap——擴容機制
/**
* x,擴容一個
* check,檢查原節點的基數(比如連結清單原來是地盤上個,單節點是1個,紅黑樹是節點個數)
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
/**
* @(1)并發基礎size的處理
* 用于傳回size以及各種擴容的判斷基礎
* BASECOUNT:偏移位址
* baseCount:期望值
* s:最終值等于原值+1
* counterCells as 更新的失敗線程
*/
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
/**
* @多線程修改baseCount時,用LongAdder方法實作數字并發,修改完後直接return整個方法
* 拆分成多個cell然後合并數字增加并發效率
* 最後也是直接return
*/
fullAddCount(x, uncontended);//
return;
}
/**
* @并發失敗一次,如果隻是單節點增加
* 也是直接return,否則就将并發修改的數量累加起來進行擴容判斷
*/
if (check <= 1)
return;
s = sumCount();
}
/**
* @并發擴容的處理
* 沒有baseCount的并發問題就進入擴容判斷流程
*/
if (check >= 0) {
/**
* tab 數組指針
* nt 指向分段數組的指針
* n 數組的長度指針
* sc 擴容閥值
*/
Node<K,V>[] tab, nt; int n, sc;
/**
* 滿足擴容閥值進行擴容操作
*/
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
/**
* @sc<0表示需要進行并發擴容了
* 并發擴容時,nextTable可能還未必初始化或已經被初始化是以帶個nt過來
*/
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
/**
* @利用時間戳判斷此時隻有一個線程正在進行擴容
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
/**
* @n: 原table數組長度
* @nextTab: 是用來辨別是不是第一個進來擴容的線程,是的話就需要初始化一下nextTale,由于volative是實時共享,但是我們隻想知道移動前是不是null是以直接将指針複制進來
* @stride: 利用cpu空閑情況,算出此處線程應該應該拿到多個個任務
*/
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
/**
* @(1)為空就建構一個nextTable為table的兩倍
*/
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;//将transferIndex等于數組長度
}
/**
* @建構一個死循環完成複制功能
*/
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
/**
* @nextn: 擴容的數組長度;
* @fwd: 将剛複制好的talbe包裝一個ForwardingNode節點
* @advance:
* @finishing: 所有的節點都已經完成複制工作的标記
* @i: 每個線程的内置的計數器,用于判斷自己的任務執行到哪裡
* @bound: 每個線程的内置的哈希數量,用于判斷自己的任務是否執行完
* f 擴容線程會根據transferIndex任務标記号,周遊拿到屬于自己多個節點
* fh 每次周遊的節點的哈希
*/
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
/**
* @(1)主要将transferIndex同步給bound,然後不斷i說明任務執行的進度
* 每個擴容線程先拿到任務号transferIndex
* 讓并發的擴容線程擷取到transferIndex拿到遷移任務
*/
while (advance) {
//更新遷移索引i。
int nextIndex, nextBound;
/**
* 執行完了标記需要直接退出循環
*/
if (--i >= bound || finishing)
advance = false;
/**
* transferIndex<=0表示已經沒有需要遷移的hash桶,将i置為-1,線程準備退出
*/
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
/*
* 更新自己的bound嘗試更新transferIndex
*/
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
/**
* @利用算法判斷是否所有哈希捅被遷移完了
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
/**
* @如果已經完成複制
* 就将nextTable指派為null,table指針指向我們的nextTab
* 然後擴容閥值增大
*/
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
/**
第一個擴容的線程,執行transfer方法之前,會設定 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
後續幫其擴容的線程,執行transfer方法之前,會設定 sizeCtl = sizeCtl+1
每一個退出transfer的方法的線程,退出之前,會設定 sizeCtl = sizeCtl-1
那麼最後一個線程退出時:
必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
/**
* 空節點直接插入
*/
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
/**
* 如果周遊到ForwardingNode節點 ,說明這個點已經被處理過,直接跳過
*/
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
/**
* 非空節點擴容需要加鎖,但很明細對象鎖自己的線程有效(每個線程的任務分割不一樣),實作了分段鎖的
*/
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}