源碼角度了解ConcurrentHashMap
ConcurrentHashMap大家都知道,它的資料結構前期是連結清單後期是紅黑樹,我們通過節點類型是Node節點和TreeNode節點可以知道它目前的結構是連結清單還是紅黑樹,ConcurrentHashMap為什麼使用紅黑樹呢?說白了,當元素變多的時候,紅黑樹能有更好的查詢和更新速度,還能解決Hash沖突的問題
ConcurrentHashMap是使用CAS和synchronized來實作線程安全的,從以下的源碼中我們就能了解到。
ConcurrentHashMap的結構如下:
put()方法
- 如果tab為空,調用initTable()方法進行初始化
- 如果tab不為空,就判斷所在的槽是否為空,如果是的話,說明是第一個元素,就調用casTabAt()方法直接建立節點添加到Node數組中就可以了
- 如果正在擴容,就幫助擴容
- 如果沒有擴容也不為空,就把元素插入槽中,先使用synchronized進行加鎖,這個鎖的粒度就是數組的具體的一個元素,fh是目前索引位置的hash值,如果大于等于0,說明是連結清單,否則是紅黑樹。連結清單插入會對binCount加一操作,新元素插入尾部,如果key相同覆寫原來的值
- 判斷binCount是否大于等于TREEIFY_THRESHOLD(值為8) ,這時候調用treeifyBin()方法考慮将連結清單轉換為紅黑樹,真正要轉為紅黑樹還要求數組長度大于64
initTable()方法初始化數組
初始化ConcurrentHashMap,使用sizeCtl記錄大小
sizeCtl為-1表示ConcurrentHashMap正在初始化
sizeCtl小于-1表示正在擴容,-n表示有n-1個線程正在擴容
sizeCtl為正數,如果tab=null表示未初始化之前的初始容量,如果已經初始化,sizeCtl存儲的是下一次擴容的門檻值0.75n
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
- sizeCtl的值指派給sc,判斷sc是否小于0,小于0就讓出CPU,小于0表示ConcurrentHashMap可能正在擴容或初始化,我們不妨讓出CPU。
- 如果不小于0,設定sizeCtl為-1,表示正在初始化
- 如果設定了初始容量就使用初始容量,沒有設定使用預設初始容量16
- 建立長度為n的Node數組,指派給tab,
- sc設定為n的0.75倍
- 最終指派将sc指派給sizeCtl,此時sizeCtl記錄擴容門檻值
- 傳回tab,至此初始化ConcurrentHashMap結束
值得注意的是這段有兩行(tab = table) == null || tab.length == 0校驗數組是否為空,這裡是單例模式雙重檢查鎖DCL(double-checked locking)的展現
treeifyBin()擴容或轉為紅黑樹
在給定索引處替換 bin 中的所有連結節點轉為紅黑樹,如果數組太小,調整數組大小擴容。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
- 判斷數組的長度,如果長度小于MIN_TREEIFY_CAPACITY(值為64),調用tryPresize()進行嘗試擴容,擴容到原來的兩倍,是以擴容後元素的位置是i或者i+n。
- 如果長度達到64,就對數組元素(也就是連結清單的頭結點)加鎖,周遊連結清單,轉換為紅黑樹
tryPresize()方法
- 對擴容數組的長度進行判斷,看是否達到最大容量,如果達到最大容量設定成最大容量,如果沒有就調用tableSizeFor()方法将擴容大小轉換為2的n次幂
- 如果tab為空,進行初始化數組
- 如果擴容長度小于門檻值或者資料長度大于等于最大容量了,表示擴容完成了
-
以上都不滿足就進行擴容,如果sc小于0表示正在擴容,那麼就進行幫助擴容,否則就調用transfer()方法進行擴容
ransfer()方法中
stride表示了一個線程擴容的長度,步長和CPU和核數有關,最小步長為16位,每個線程擴容這一段,
變量transferIndex記錄擴容的進度,初始值是n,當一個線程擴容完後減去stride的值,到0 的時候表示擴容完成,由于transferIndex可能被多個線程操作,所有使用CAS方法compareAndSwapInt()改變transferIndex的值。
在擴容過程中,ForwardingNode連接配接了兩個數組,如果需要通路key對應的value的話,會通路ForwardingNode來擷取資料。
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
總結
❤️ 感謝大家
- 歡迎關注我❤️,點贊👍🏻,評論🤤,轉發🙏
- 有不當之處歡迎批評指正。