上一篇部落格中,了解了put方法,結尾的時候,有addCount()方法,今天繼續學習。
在map中,容易出現線程安全的,除了put()方法的操作,另一個就是map的擴容機制。 接下來就深入了解concurrentHashMap中的的擴容機制。
在調用addCount()方法的時候,會涉及到transfer()方法的調用,而該方法就是我們今天要深入學習的擴容方法。
transfer()
- 表示 擴容之前table的數組的長度; stride 表示配置設定給線程任務的步長;
- 當nextTab == null 成立:表示目前線程為觸發本次擴容,需要做一些擴容準備工作;不成立:表示目前線程需要擴容
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
- nextn 表示 新數組的長度;
- FWD節點:當某個桶位資料處理完畢後,将此桶位設定為fwd節點,其他寫線程或讀線程看到後,會有不同的邏輯
5.推進标記 / 表示完成标記
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
6.接下來在for循環中,i 表示配置設定給目前線程的任務,執行到的桶位; bound: 表示配置設定給目前線程任務的下界限制
在循環裡面,有幾處節點,第一是while循環;第二個是if (i < 0 || i >= n || i + n >= nextn) 及其他;
先說while循環這個方法,該方法有三個作用
1)給目前線程發配任務區間
2)維護目前線程任務進度(i表示目前線程處理的桶位)
3)維護目前map對象全局範圍内的進度
7. nextIndex 表示配置設定任務開始的下标; nextBound 表示配置設定任務結束的下标
8.該while 裡面的第一個if 成立的話,表示目前線程任務的尚未完成,還有相應的區間的桶位需要處理 --i 就讓目前線程處理下一個桶位
if (--i >= bound || finishing)
advance = false;
- 第二個if 前置條件: 目前線程任務已經完成,或者 未配置設定;條件成立表示對象全局範圍内的桶位都已經配置設定完畢了,沒有區間配置設定了,則設定目前線程的i變量為 -1,跳出循環後,執行退出偏移任務相關的程式.
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
10.第三個if 前置條件 1,目前線程需要配置設定任務區間,2.全局範圍内,還有桶位尚未遷移; 如果條件為true 說明給目前線程配置設定任務成功
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
- 接下來主要是描述裡面的另一個方法,if(i < 0 || i >= n || i + n >= nextn ); 在該方法中 , sizeCtl 的值為 擴容數組大小的0.75的門檻值
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
12.接下來的方法,與上面兩個方法不同的是,接下來的步驟主要是進行桶位資料的偏移工作處理,處理完畢後,advance 為true,表示繼續推薦,然後就會回到for裡面; 能進入到下面的步驟中的前置條件為:目前線程任務尚未完成,正在進行中
- CASE 2: 為true時, 說明目前桶位未存放資料,隻需要将此設定為fwd節點即可
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
14.CASE 3: 為true時,說明目前桶位已經遷移過了,目前線程不用處理,直接再次更新目前線程任務索引 再次處理下一個桶位或者其他操作
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
-
CASE 4 目前桶位有資料,且node節點不是fwd節點,說明這些資料需要遷移;
該方法中會分别處理連結清單桶位;和紅黑樹的代理結點
進入到該方法中,先進行synchtonized進行鎖住頭結點,再進行處理業務時,又進行了一次判斷;判斷目前頭對象,是否是加鎖對象,這樣做的原因是,防止在加鎖頭對象之前,目前桶位頭對象被其他線程修改過,導緻目前加鎖對象錯誤。
16.連結清單桶位進行偏移處理
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
- 紅黑樹 代理結點 TreeBin
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
整個擴容的機制流程如圖所示
具體的描述如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// n 表示 擴容之前table的數組的長度
// stride 表示配置設定給線程任務的步長
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 條件成立:表示目前線程為觸發本次擴容,需要做一些擴容準備工作
// 不成立: 表示目前線程是寫出擴容的線程
if (nextTab == null) { // initiating
try {
// 春暖國家一個比擴容之前大一倍的table
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 指派給對象屬性 nextTable,翻倍協助擴容線程,拿到新表
nextTable = nextTab;
// 記錄遷移資料整體位置的一個标記,index 技術是從1開始計算的
transferIndex = n;
}
// 表示新表數組的長度
int nextn = nextTab.length;
// FWD節點:當某個桶位資料處理完畢後,将此桶位設定為fwd節點,其他寫線程或讀線程看到後,會有不同的邏輯
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 推進标記
boolean advance = true;
// 表示完成标記
boolean finishing = false; // to ensure sweep before committing nextTab
// i 表示配置設定給目前線程的任務,執行到的桶位
// bound: 表示配置設定給目前線程任務的下界限制
for (int i = 0, bound = 0;;) {
// f 表示桶位頭結點
// fh 表示頭結點的hash
Node<K,V> f; int fh;
// 1.給目前線程發配任務區間
// 2.維護目前線程任務進度(i表示目前線程處理的桶位)
// 3. 維護目前map對象全局範圍内的進度
while (advance) {
// 配置設定任務區間的變量
// nextIndex 配置設定任務開始的下标
// nextBound 配置設定任務結束的下标
int nextIndex, nextBound;
//CASE 1:
// 條件一: --i >= bound 成立:表示目前線程任務的尚未完成,還有相應的區間的桶位需要處理 --i 就讓目前線程處理下一個桶位
// 不成立: 目前線程任務已經完成,或者 未配置設定
if (--i >= bound || finishing)
advance = false;
// CASE 2: 前置條件: 目前線程任務已經完成,或者 未配置設定
// 條件成立,表示對象全局範圍内的桶位都已經配置設定完畢了,沒有區間配置設定了
// 則設定目前線程的i變量為 -1,跳出循環後,執行退出偏移任務相關的程式
// 條件不成立: 說明對象全局範圍内還有未配置設定的桶位,
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// CASE 3: 前置條件 1,目前線程需要配置設定任務區間,2.全局範圍内,還有桶位尚未遷移
// true: 說明給目前線程配置設定任務成功
// false: 說明配置設定給目前線程失敗,應該是其他線程發生了競争
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 處理線程任務後,線程退出transfer方法的邏輯
// 條件一成立: 表示目前線程未配置設定到任務
if (i < 0 || i >= n || i + n >= nextn) {
// 儲存sizeCtl 的變量
int sc;
//
if (finishing) {
nextTable = null;
table = nextTab;
// sizeCtrl = 0.75 * 2n 擴容後的數組的大小的 0.75的門檻值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// true:說明設定sizeCtl 低16位 -1 成功,目前線程可以正常退出
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// (sc - 2) != resizeStamp(n) 為true時: 說明目前線程不是最後一個結束的線程
// false時: 是退出transfer任務的線程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 正常退出
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 下方表示: 線程處理一個桶位資料的偏移工作,處理完畢以後,
// advance 為true,表示繼續推薦,然後就會回到for裡面
// 前置條件: 目前線程任務尚未完成,正在進行中
//CASE 2:
// true: 說明目前桶位未存放資料,隻需要将此設定為fwd節點即可
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// CASE 3:
// true: 說明目前桶位已經遷移過了,目前線程不用處理,直接再次更新目前線程任務索引 再次處理下一個桶位或者其他操作
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 前置條件: 目前桶位有資料,且node節點不是fwd節點,說明這些資料需要遷移
// CASE 4
else {
// 加鎖目前桶位的頭結點
synchronized (f) {
// 判斷目前頭對象,是否是加鎖的頭對象,
// 防止在加鎖頭對象之前,目前桶位頭對象被其他線程修改過,導緻目前加鎖對象錯誤
if (tabAt(tab, i) == f) {
// ln 表示低位連結清單引用
// hn 表示高位連結清單引用
Node<K,V> ln, hn;
// 條件成立: 表示目前桶位是連結清單桶位
if (fh >= 0) {
// lastRun 機制:
// 可以擷取出目前連結清單末尾連續高位不變的node
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 條件成立: 說明lastRun的連結清單為 低位連結清單,那麼就是讓 ln 指向低位連結清單
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 否則,說明lastRun引用連結清單為 高位連結清單, 就讓hn指向高位連結清單
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 條件成立: 表示目前桶位是紅黑樹 代理結點 TreeBin
else if (f instanceof TreeBin) {
// 轉換頭結點為 TreeBin 引用 t
TreeBin<K,V> t = (TreeBin<K,V>)f;
// lo : 低位雙向連結清單,指向低位連結清單的頭
// loTail : 指向低位連結清單的尾巴
// hi: 高位雙向連結清單,指向高位連結清單的頭
// hiTail: 指向高位連結清單的尾巴
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
// lc 表示低位連結清單元素數量
// hc 表示高位連結清單元素數量
int lc = 0, hc = 0;
// 疊代TreeBin中的雙向連結清單,從頭結點 至 尾結點
for (Node<K,V> e = t.first; e != null; e = e.next) {
// h 表示目前循環節點的 hash值
int h = e.hash;
// p 使用目前節點建構出來的新的 TreeNode
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 條件成立, 表示目前循環節點,屬于低位連結清單節點
if ((h & n) == 0) {
// 條件成立:說明低位連結清單 沒有資料
if ((p.prev = loTail) == null)
lo = p;
// 低位連結清單 已經有資料了,此時目前元素 追加到低位連結清單的末尾
else
loTail.next = p;
// 将低位連結清單 尾指針 指向 p 節點
loTail = p;
++lc;
}
// 否則,目前節點屬于高位連結清單節點
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}