一、與JDK1.7的差別
ConcurrentHashMap的實作與1.7版本有很大的差别,放棄了段鎖的概念,借鑒了HashMap的資料結構:數組+連結清單+紅黑樹。ConcurrentHashMap不接受nullkey和nullvalue。
1.1 資料結構
數組+連結清單+紅黑樹
1.2 并發原理
cas樂觀鎖+synchronized鎖
CAS是compare and swap的縮寫,即我們所說的比較交換。cas是一種基于鎖的操作,而且是樂觀鎖。在java中鎖分為樂觀鎖和悲觀鎖。悲觀鎖是将資源鎖住,等一個之前獲得鎖的線程釋放鎖之後,下一個線程才可以通路。而樂觀鎖采取了一種寬泛的态度,通過某種方式不加鎖來處理資源,比如通過給記錄加version來擷取資料,性能較悲觀鎖有很大的提高。
CAS 操作包含三個操作數 —— 記憶體位置(V)、預期原值(A)和新值(B)。如果記憶體位址裡面的值和A的值是一樣的,那麼就将記憶體裡面的值更新成B。CAS是通過無限循環來擷取資料的,若果在第一輪循環中,a線程擷取位址裡面的值被b線程修改了,那麼a線程需要自旋,到下次循環才有可能機會執行。
看完你就明白的鎖系列之自旋鎖
我們常說的 CAS 自旋鎖是什麼
加鎖對象:數組每個位置的頭節點。
1.3 放棄了分段鎖
JDK8中徹底放棄了Segment轉而采用的是Node,其設計思想也不再是JDK1.7中的分段鎖思想。
Node:儲存key,value及key的hash值的資料結構。其中value和next都用volatile修飾,保證并發的可見性。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
...
}
二、ConcurrentHashMap結構
JDK1.8的實作已經摒棄了Segment的概念,而是直接用Node數組+連結清單+紅黑樹的資料結構來實作,并發控制使用Synchronized和CAS來操作,整個看起來就像是優化過且線程安全的HashMap,雖然在JDK1.8中還能看到Segment的資料結構,但是已經簡化了屬性,隻是為了相容舊版本。
ConcurrentHashMap的基本屬性:
// node數組最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30 ;
// 預設初始值,必須是2的幂數
private static final int DEFAULT_CAPACITY = 16 ;
//數組可能最大值,需要與toArray()相關方法關聯
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8 ;
//并發級别,遺留下來的,為相容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 負載因子
private static final float LOAD_FACTOR = 0.75f;
// 連結清單轉紅黑樹閥值,> 8 連結清單轉換為紅黑樹
static final int TREEIFY_THRESHOLD = 8 ;
//樹轉連結清單閥值,小于等于6(tranfer時,lc、hc=0兩個計數器分别++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
//最小轉換成樹,hash桶最少需要64
static final int MIN_TREEIFY_CAPACITY = 64;
//擴容線程每次最少要遷移16個hash桶
private static final int MIN_TRANSFER_STRIDE = 16;
//在sizeCtl中記錄大小戳的位移位
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize的最大線程數
private static final int MAX_RESIZERS = ( 1 << ( 32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中記錄size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32-RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED =-1;
// 樹根節點的hash值
static final int TREEBIN= -2;
// ReservationNode的hash值
static final int RESERVED = -3;
// 可用處理器數量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數組
transient volatile Node<K,V>[] table;
//擴容時,将table中的元素遷移至nextTable;隻有在調整大小時才非空
private transient volatile Node<K,V>[] nextTable;
/*控制辨別符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
*當為負數時:-1代表正在初始化,-N代表有N-1個線程正在 進行擴容
*當為0時:代表當時的table還沒有被初始化
*當為正數時:表示初始化或者下一次進行擴容的大小
*/
private transient volatile int sizeCtl;
//當resizing的時候下一個tab下标索引值(目前值+1)
private transient volatile int transferIndex;
基本屬性定義了ConcurrentHashMap的一些邊界以及操作時的一些控制,下面看一些内部的一些結構組成,這些是整個ConcurrentHashMap整個資料結構的核心。
2.1 Node
Node是ConcurrentHashMap存儲結構的基本單元,繼承于HashMap中的Entry,用于存儲資料,源代碼如下:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
//val和next都會在擴容時發生變化,是以加上volatile來保持可見性和禁止重排序
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
//不允許更新value
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
//用于map中的get()方法,子類重寫
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
Node資料結構很簡單,從上可知,就是一個連結清單,但是隻允許對資料進行查找,不允許進行修改。
2.2 TreeNode
TreeNode繼承與Node,但是資料結構換成了二叉樹結構,它是紅黑樹的資料的存儲結構,用于紅黑樹中存儲資料,當連結清單的節點數大于8時會轉換成紅黑樹的結構,他就是通過TreeNode作為存儲結構代替Node來轉換成黑紅樹源代碼如下:
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
//标志紅黑樹的紅節點
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
//根據key查找 從根節點開始找出相應的TreeNode,
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
2.3 TreeBin
TreeBin從字面含義中可以了解為存儲樹形結構的容器,而樹形結構就是指TreeNode,是以TreeBin就是封裝TreeNode的容器,它提供轉換黑紅樹的一些條件和鎖的控制,部分源碼結構如下:
static final class TreeBin<K,V> extends Node<K,V> {
//指向TreeNode清單和根節點
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 讀寫鎖狀态
static final int WRITER = 1 ; // 擷取寫鎖的狀态
static final int WAITER = 2 ; // 等待寫鎖的狀态
static final int READER = 4 ; // 增加資料時讀鎖的狀态
/**
* 初始化紅黑樹
*/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
......
}
2.4 ForwardingNode
ForwardingNode在轉移的時候放在頭部的節點,是一個空節點
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
2.5 ConcurrentHashMap無參構造方法
下面來看一下ConcurrentHashMap的無參構造方法:
public ConcurrentHashMap() {
}
由上你會發現ConcurrentHashMap的初始化其實是一個空實作,并沒有做任何事,這裡後面會講到,這也是和其他的集合類有差別的地方,初始化操作并不是在構造函數實作的,而是在put操作中實作,當然ConcurrentHashMap還提供了其他的構造函數,有指定容量大小或者指定負載因子,跟HashMap一樣,這裡就不做介紹了。
2.6 ConcurrentHashMap幾個重要方法
/*
* 用來傳回節點數組的指定位置的節點的原子操作
*/
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/*
* cas原子操作,在指定位置設定值
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
/*
* 原子操作,在指定位置設定值
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
三、put方法
/*
* 單純的額調用putVal方法,并且putVal的第三個參數設定為false
* 當設定為false的時候表示這個value一定會設定
* true的時候,隻有當這個key的value為空的時候才會設定
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//分散Hash;擾動函數同HashMap相同,但又增加了一次與操作
int hash = spread(key.hashCode());
//用來計算在這個節點總共有多少個元素,用來控制擴容或者轉移為樹
int binCount = 0;
//這是一個死循環,可能的出口下面有辨別:@1,@2,@3,@4
// 死循環,直到插入成功再跳出,因為如果其他線程正在修改tab,那麼嘗試就會失敗,是以這邊要加一個for循環,不斷的嘗試
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//第一次put的時候table沒有初始化,則初始化table
tab = initTable();
//數組的第一個元素為空,則指派
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//這裡使用了CAS,避免使用鎖。如果CAS失敗,說明該節點已經發生改變,
//可能被其他線程插入了,那麼繼續執行死循環,在鍊尾插入。注意這個時候是沒有加鎖的
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
//@1:可能的出口1
break; // no lock when adding to empty bin
}
/*
* 如果檢測到某個節點的hash值是MOVED,則表示正在進行數組擴張的資料複制階段,
* 則目前線程也會參與去複制,通過允許多線程複制的功能,一次來減少數組的複制所帶來的 性能損失
*/
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
/*
* 如果在這個位置有元素的話,就采用synchronized的方式加鎖,
* 如果是連結清單的話(hash大于0),就對這個連結清單的所有元素進行周遊,
* 如果找到了key和key的hash值都一樣的節點,則把它的值替換到
* 如果沒找到的話,則添加在連結清單的最後面
* 否則,是樹的話,則調用putTreeVal方法添加到樹中去
*
* 在添加完之後,會對該節點上關聯的的數目進行判斷,
* 如果在8個以上的話,則會調用treeifyBin方法,來嘗試轉化為樹,或者是擴容
*/
V oldVal = null;
//synchronized鎖。這裡要注意,不會出現
//桶正在resize的過程中執行插入,因為桶resize的時候
//也請求了synchronized鎖。即如果該桶正在resize,這裡會發生鎖等待
synchronized (f) {
//如果是連結清單的首個節點
if (tabAt(tab, i) == f) {
//取出來的元素的hash值大于0,當轉換為樹之後,hash值為-2
if (fh >= 0) {
binCount = 1;
//周遊這個連結清單
for (Node<K,V> e = f;; ++binCount) {
K ek;
//找到相等的元素更新其value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
//當使用putIfAbsent的時候,隻有在這個key沒有設定值得時候才設定
if (!onlyIfAbsent)
e.val = value;
//@2:可能的出口2
break;
}
Node<K,V> pred = e;
//否則添加到連結清單尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
//@3:可能的出口3
break;
}
}
}
//表示已經轉化成紅黑樹類型了
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//調用putTreeVal方法,将該元素添加到樹中去
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//當在同一個節點的數目達到8個的時候,則擴張數組或将給節點的資料轉為tree
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
//@4:可能的出口4
break;
}
}
}
//計數
addCount(1L, binCount);
return null;
}
putValue過程描述:
- 根據key的hash值定位到桶的位置;
- 判斷if(table==null),先初始化table;
- 死循環。判斷if(table[i]==null),cas添加元素。成功則跳出循環,失敗則進入下一輪for循環。
- 判斷是否有其他線程在擴容table(取出來的節點的hash值是MOVED(-1)的話,則表示目前正在對這個數組進行擴容),有則幫忙擴容,擴容完成再添加元素。進入真正的put步驟
- 真正的put步驟。桶的位置不為空,周遊該桶的連結清單或者紅黑樹,若key已存在,則覆寫;不存在則将key插入到連結清單或紅黑樹的尾部。具體措施參見6
-
如果這個節點,不為空,也不在擴容,則通過synchronized來加鎖,進行添加操作: 判斷目前取出的節點位置存放的是連結清單還是樹: ——如果是連結清單的話,則周遊整個連結清單,直到取出來的節點的key來個要放的key進行比較,如果key相等,并且key的hash值也相等的話,則說明是同一個key,則覆寫掉value,否則的話則添加到連結清單的末尾; ——如果是樹的話,則調用putTreeVal方法把這個元素添加到樹中去
- 最後在添加完成之後,會判斷在該節點處共有多少個節點(注意是添加前的個數),如果達到8個以上了的話,則調用treeifyBin方法來嘗試将處的連結清單轉為樹,或者擴容數組。
并發問題:假如put操作時正好有别的線程正在對table數組(map)擴容怎麼辦?
—— 答:暫停put操作,先幫助其他線程對map擴容。
三、ConcurrentHashMap的擴容詳解(太難啃了,目前還沒有啃得動,有機會再補)
3.1 helpTransfer方法
在上述put方法中,如果檢測到某個節點的hash值是MOVED(-1),會調用helpTransfer擴容方法,源碼如下:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//tab為空時,則說明擴容已經完成
//hash for forwarding nodes,說明這個為了移動節點而準備的常量。
/*如果 table 不是空 且 node 節點是轉移類型,資料檢驗
*且 node 節點的 nextTable(新 table) 不是空,同樣也是資料校驗,
*嘗試幫助擴容
*/
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根據 length 得到一個辨別符号
int rs = resizeStamp(tab.length);
// 如果 nextTab 沒有被并發修改 且 tab 也沒有被并發修改
// 且 sizeCtl < 0 (說明還在擴容)
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 如果 sizeCtl 無符号右移 16 不等于 rs ( sc前 16 位如果不等于辨別符,則辨別符變化了)
// 或者 sizeCtl == rs + 1 (擴容結束了,不再有線程進行擴容)(預設第一個線程設定 sc ==rs 左移 16 位 + 2,當第一個線程結束擴容了,就會将 sc 減一。這個時候,sc 就等于 rs + 1)
// 或者 sizeCtl == rs + 65535 (如果達到最大幫助線程的數量,即 65535)
// 或者轉移下标正在調整 (擴容結束)
// 結束循環,傳回 table
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 如果以上都不是, 将 sizeCtl + 1, (表示增加了一個線程幫助其擴容)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 進行轉移
transfer(tab, nextTab);
// 結束循環
break;
}
}
return nextTab;
}
return table;
}
-
這裡解釋一下為什麼nextTable為空,說明擴容已經完成。
——table是原數組的位址,如果為空,說明成員變量table已經已經被新的數組替代,而老數組會被清空。
- resizeStamp方法下面會有介紹。
- if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0);這就代碼的作用: 判斷擴容是否結束或者并發擴容線程數是否已達最大值,如果是的話直接結束while循環。具體看上面注釋
在 Node 的子類 ForwardingNode 的構造方法中,可以看到這個變量作為 hash 值進行了初始化。
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
疑問:
1.nextTab屬性按我的了解是新hash桶數組,f是目前key經過hash後在原hash桶中的位置,為什麼要将f.nextTable的值賦給nextTab?說不通啊,一個是節點的屬性,一個是存放節點的數組。
3.2 resizeStamp方法
在helpTransfer方法中會調用resizeStamp方法生成rs。resizeStamp方法源碼如下
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
numberOfLeadingZeros(n)傳回的是n的二進制辨別的從高位開始到第一個非0的數字的之間0的個數,比如numberOfLeadingZeros(8)傳回的就是28 ,因為0000 0000 0000 0000 0000 0000 0000 1000在1前面有28個0
RESIZE_STAMP_BITS 的值是16,1 << (RESIZE_STAMP_BITS - 1)就是将1左移位15位,0000 0000 0000 0000 1000 0000 0000 0000
然後将兩個數字再按位或,将相當于 将移位後的 兩個數相加。
舉個例子 :
8的二進制表示是: 0000 0000 0000 0000 0000 0000 0000 1000 = 8
7的二進制表示是: 0000 0000 0000 0000 0000 0000 0000 0111 = 7
按位或的結果就是:0000 0000 0000 0000 0000 0000 0000 1111 =15
相當于 8 + 7 =15
為什麼會出現這種效果呢?因為8是2的整數次幂,也就是說8的二進制表示隻會在某個高位上是1,其餘地位都是0,是以在按位或的時候,低位表示的全是7的位值,是以出現了這種效果。
3.3 transfer方法
//複制元素到nextTab
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//stride是步長,transfer會依據stride,把table分為若幹部分,依次處理,好讓多線程能協助transfer
int n = tab.length, stride;
//NCPU為CPU核心數,每個核心均分複制任務,如果均分小于16個
//那麼以16為步長分給處理器:例如0-15号給處理器1,16-32号分給處理器2。處理器3就不用接任務了。
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
/*
* 如果複制的目标nextTab為null的話,則初始化一個table兩倍長的nextTab
* 此時nextTable被設定值了(在初始情況下是為null的)
* 因為如果有一個線程開始了表的擴張的時候,其他線程也會進來幫忙擴張,
* 而隻是第一個開始擴張的線程需要初始化下目标數組
*/
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
/*
* 建立一個fwd節點,這個是用來控制并發的,當一個節點為空或已經被轉移之後,就設定為fwd節點
* 這是一個空的标志節點
*/
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//是否繼續向前查找的标志位
boolean advance = true;
// to ensure sweep(清掃) before committing nextTab,在完成之前重新在掃描一遍數組,看看有沒完成的沒
boolean finishing = false; // to ensure sweep before committing nextTab
//開始轉移各個槽
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//STEP1 判斷是否可以進入下一個stride 确認i和bound
//通過stride領取一部分的transfer任務,while循環就是确認邊界
while (advance) {
int nextIndex, nextBound;
//認領的部分已經被執行完(一個stride執行完)
if (--i >= bound || finishing)
advance = false;
//transfer任務被認領完
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//認領一個stride的任務
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
/**
* i < 0 說明要轉移的桶 都已經處理過了
*
*
* 以上條件已經說明 transfer已經完成了
*/
//transfer 結束
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//如果完成整個 transfer的過程 清空nextTable 讓table等于擴容後的數組
if (finishing) {
nextTable = null;
table = nextTab;
//0.75f * n 重新計算下次擴容的門檻值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//一個線程完成了transfer
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//如果還有其他線程在transfer ,,就不能設定finish為true,先傳回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//說明這是最後一個在transfer的線程 是以finish标志被置為 true
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果該節點為null,則對該節點的遷移立馬完成,設定成forwardNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else { //開始遷移該節點
synchronized (f) {
if (tabAt(tab, i) == f) { //double-check
//ln是擴容後依舊保留在原index上的node連結清單;hn是移到index + n 上的node連結清單
Node<K,V> ln, hn;
//普通連結清單
if (fh >= 0) {
/*
* 因為n的值為數組的長度,且是power(2,x)的,是以,在&操作的結果隻可能是0或者n
* 根據這個規則
* 0--> 放在新表的相同位置
* n--> 放在新表的(n+原來位置)
*/
int runBit = fh & n;
Node<K,V> lastRun = f;
//這一次周遊的目的是找到最後一個一個節點,其後的節點hash & N 都不發生改變
//例如 有A->B->C->D,其hash & n 為 0,1,1,1 那就是找到B點
//這樣做的目的是之後對連結清單進行拆分時 C和D不需要單獨處理 維持和B的關系 B移動到新的tab[i]或tab[i+cap]上即可
//還有不了解的可以參考另一作者測試代碼:https://github.com/insaneXs/all-mess/blob/master/src/main/java/com/insanexs/mess/collection/TestConHashMapSeq.java
/*
* lastRun 表示的是需要複制的最後一個節點
* 每當新節點的hash&n -> b 發生變化的時候,就把runBit設定為這個結果b
* 這樣for循環之後,runBit的值就是最後不變的hash&n的值
* 而lastRun的值就是最後一次導緻hash&n 發生變化的節點(假設為p節點)
* 為什麼要這麼做呢?因為p節點後面的節點的hash&n 值跟p節點是一樣的,
* 是以在複制到新的table的時候,它肯定還是跟p節點在同一個位置
* 在複制完p節點之後,p節點的next節點還是指向它原來的節點,就不需要進行複制了,自己就被帶過去了
* 這也就導緻了一個問題就是複制後的連結清單的順序并不一定是原來的倒序
*/
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;//n的值為擴張前的數組的長度
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//如果runBit == 0 說明之前找到的節點應該在tab[i]
if (runBit == 0) {
ln = lastRun;
hn = null;
}
//否則說明之前的節點在tab[i+cap]
else {
hn = lastRun;
ln = null;
}
//上面分析了連結清單的拆分隻用周遊到lastRun的前一節點 因為lastRun及之後的節點已經移動好了
/*
* 構造兩個連結清單,順序大部分和原來是反的
* 分别放到原來的位置和新增加的長度的相同位置(i/n+i)
*/
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
//這裡不再繼續使用尾插法而是改用了頭插法 是以連結清單的順序可能會發生颠倒(lastRun及之後的節點不受影響)
if ((ph & n) == 0)
/*
* 假設runBit的值為0,
* 則第一次進入這個設定的時候相當于把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同為0的節點)設定到舊的table的第一個hash計算後為0的節點下一個節點
* 并且把自己傳回,然後在下次進來的時候把它自己設定為後面節點的下一個節點
*/
ln = new Node<K,V>(ph, pk, pv, ln);
/*
* 假設runBit的值不為0,
* 則第一次進入這個設定的時候相當于把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同不為0的節點)設定到舊的table的第一個hash計算後不為0的節點下一個節點
* 并且把自己傳回,然後在下次進來的時候把它自己設定為後面節點的下一個節點
*/
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将新的連結清單移動到nextTab的對應坐标中
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//tab上對應坐标的節點變為ForwardingNode
setTabAt(tab, i, fwd);
advance = true;
}
//紅黑樹節點
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
//同樣拆成兩棵樹
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
/*
* 在複制完樹節點之後,判斷該節點處構成的樹還有幾個節點,
* 如果≤6個的話,就轉回為一個連結清單
*/
//是否需要退化成連結清單
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
到這裡,ConcurrentHashMap的put操作和擴容都介紹的差不多了。
下面的兩點一定要注意:
- 複制之後的新連結清單不是舊連結清單的絕對倒序。
- 在擴容的時候每個線程都有處理的步長,最少為16,在這個步長範圍内的數組節點隻有自己一個線程來處理
擴容過程:
transfer的代碼比較長,我們也一部分一部分的分析各段代碼的作用。
1.首先,最先發起擴容的線程需要對數組進行翻倍,然後将翻倍後得到的新數組通過nextTable變量儲存。并且啟用了transferIndex變量,初始值為舊數組的容量n,這個變量會被用來标記已經被認領的桶的下标。
擴容過程是從後往前的,是以transferIndex的初始值才是n。并且整個擴容過程依據步長stride,被拆分成個部分,線程從後往前依次領取一個部分,是以每次有線程領取任務,transferIndex總是要被減去一個stride。
2.當線程認領的一個步長的任務完成後,繼續去認領下一個步長,直到transferIndex < 0,說明所有資料都被認領完。
3.當參與擴容的線程發現沒有其他任務能被認領,那麼就會更新sizeCtl為 sizeCtl-1 (說明有一條線程退出擴容)。最後一條線程完成了任務,發現sizeCtl == (resizeStamp(n) << RESIZE_STAMP_SHIFT + 2) ,那麼說明所有的線程都完成了擴容任務,此時需要将nextTable替換為table,重置transferIndex,并計算新的sizeCtl表示下一次擴容的門檻值。
上面介紹了線程每次認領一個步長的桶數負責rehash,這裡介紹下針對每個桶的rehash過程。
1.首先,如果桶上沒有元素或是桶上的元素是ForwardingNode,說明不用處理該桶,繼續處理上一個桶。
2.對于桶上存放正常的節點而言,為了線程安全,需要對桶的頭節點進行上鎖,然後以連結清單為例,需要将連結清單拆為兩個部分,這兩部分存放的位置是很有規律的,如果舊數組容量為oldCap,且節點之前在舊數組的下标為i,那麼rehash連結清單中的所有節點将放在nextTable[i]或者nextTable[i+oldCap]的桶上(這一點可以從之前哈希值中比n最高位還靠前的一位來考慮,目前一位為0時,就落在nextTable[i]上,而前一位為1時,就落在nextTable[i+oldCap])。
同理紅黑樹也會被rehash()成兩部分,如果新的紅黑樹不滿足成樹條件,将會被退化成連結清單。
3.當一個桶的元素被transfer完成後,舊數組相關位置上會被放上ForwardingNode的特殊節點表示該桶已經被遷移過。且ForwardingNode會指向nextTable。
3.4 tryPresize方法
/**
* 擴容表為指可以容納指定個數的大小(總是2的N次方)
* 假設原來的數組長度為16,則在調用tryPresize的時候,size參數的值為16<<1(32),此時sizeCtl的值為12
* 計算出來c的值為64,則要擴容到sizeCtl≥為止
* 第一次擴容之後 數組長:32 sizeCtl:24
* 第二次擴容之後 數組長:64 sizeCtl:48
* 第二次擴容之後 數組長:128 sizeCtl:94 --> 這個時候才會退出擴容
*/
private final void tryPresize(int size) {
/*
* MAXIMUM_CAPACITY = 1 << 30
* 如果給定的大小大于等于數組容量的一半,則直接使用最大容量,
* 否則使用tableSizeFor算出來
* 後面table一直要擴容到這個值小于等于sizeCtrl(數組長度的3/4)才退出擴容
*/
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
/*
* 如果數組table還沒有被初始化,則初始化一個大小為sizeCtrl和剛剛算出來的c中較大的一個大小的數組
* 初始化的時候,設定sizeCtrl為-1,初始化完成之後把sizeCtrl設定為數組長度的3/4
* 為什麼要在擴張的地方來初始化數組呢?這是因為如果第一次put的時候不是put單個元素,
* 而是調用putAll方法直接put一個map的話,在putALl方法中沒有調用initTable方法去初始化table,
* 而是直接調用了tryPresize方法,是以這裡需要做一個是不是需要初始化table的判斷
*/
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
//初始化tab的時候,把sizeCtl設為-1
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
/*
* 一直擴容到的c小于等于sizeCtl或者數組長度大于最大長度的時候,則退出
* 是以在一次擴容之後,不是原來長度的兩倍,而是2的n次方倍
*/
else if (c <= sc || n >= MAXIMUM_CAPACITY)
//退出擴張
break;
else if (tab == table) {
int rs = resizeStamp(n);
/*
* 如果正在擴容Table的話,則幫助擴容
* 否則的話,開始新的擴容
* 在transfer操作,将第一個參數的table中的元素,移動到第二個元素的table中去,
* 雖然此時第二個參數設定的是null,但是,在transfer方法中,當第二個參數為null的時候,
* 會建立一個兩倍大小的table
*/
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
/*
* transfer的線程數加一,該線程将進行transfer的幫忙
* 在transfer的時候,sc表示在transfer工作的線程數
*/
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
/*
* 沒有在初始化或擴容,則開始擴容
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
疑問:下圖的 if (table == tab)處判斷是否有意義?tab=table難道不是引用傳遞嗎,即使table被其他線程改變了,tab應該恒==于table的呀。
tryPresize方法的代碼還沒有完全了解,後續在慢慢去了解。。。
四、擴容方法(接上)
什麼情況會導緻擴容?
1.連結清單轉換為紅黑樹時(連結清單節點個數達到8個可能會轉換為紅黑樹)。如果轉換時map長度小于64則直接擴容一倍,不轉化為紅黑樹。如果此時map長度大于64,則不會擴容,直接進行連結清單轉紅黑樹的操作。
2.map中總節點數大于門檻值(即大于map長度的0.75倍)時會進行擴容。
如何擴容?
1.建立一個新的map,是原先map的兩倍。注意此過程是單線程建立的
2.複制舊的map到新的map中。注意此過程是多線程并發完成。(将map按照線程數量平均劃分成多個相等區域,每個線程負責一塊區域的複制任務)
擴容的具體過程:
答:
注:擴容操作是hashmap最複雜難懂的地方,部落客也是看了很久才看懂個大概。一兩句話真的很難說清楚,建議有時間還是看源碼比較好。網上很少有人使用通俗易懂語言來描述擴容的機制。是以這裡我嘗試用自己的語言做一個簡要的概括,描述一下大體的流程,供大家參考,如果覺得不錯,可以點個贊,表示對部落客的支援,謝謝。
整體思路:擴容是并發擴容,也就是多個線程共同協作,把舊table中的連結清單一個個複制到新table中。
1.給多個線程劃分各自負責的區域。配置設定時是從後向前配置設定。假設table原先長度是64,有四個線程,則第一個到達的線程負責48-63這塊内容的複制,第二個線程負責32-47,第三個負責16-31,第四個負責0-15。
2.每個線程負責各自區域,複制時是一個個從後向前複制的。如第一個線程先複制下标為63的桶的複制。63複制完了接下來複制62,一直向前,直到完成自己負責區域的所有複制。
3.完成自己區域的任務之後,還沒有結束,這時還會判斷一下其他線程負責區域有沒有完成所有複制任務,如果沒有完成,則可能還會去幫助其它線程複制。比如線程1先完成了,這時它看到線程2才做了一半,這時它會幫助線程2去做剩下一半任務。
4.那麼複制到底是怎麼完成的呢?線程之間互相幫忙會導緻混亂嗎?
5.首先回答上面第一個問題,我們知道,每個數組的每個桶存放的是一個連結清單(紅黑樹也可能,這裡隻讨論是連結清單情況)。複制的時候,先将連結清單拆分成兩個連結清單。拆分的依據是連結清單中的每個節點的hash值和未擴容前數組長度n進行與運算。運算結果可能為0和1,是以結果為0的組成一個新連結清單,結果為1的組成一個新連結清單。為0的連結清單放在新table的 i 位置,為1的連結清單放在 新table的 i+n處。擴容後新table是原先table的兩倍,即長度是2n。
6.接着回答上面第二個問題,線程之間互相幫忙不會造成混亂。因為線程已完成複制的位置會标記該位置已完成,其他線程看到标記則會直接跳過。而對于正在執行的複制任務的位置,則會直接鎖住該桶,表示這個桶我來負責,其他線程不要插手。這樣,就不會有并發問題了。
7.什麼時候結束呢?每個線程參加複制前會将标記位sizeCtl加1,同樣退出時會将sizeCtl減1,這樣每個線程退出時,隻要檢查一下sizeCtl是否等于進入前的狀态就知道是否全都退出了。最後一個退出的線程,則将就table的位址更新指向新table的位址,這樣後面的操作就是新table的操作了。
總結:上面的一字一句都是自己看完源碼手敲出來的,為了簡單易懂,可能會将一些細節忽略,但是其中最重要的思想都還包含在上面。如果有疑問或者有錯誤的地方,歡迎在評論區留言。
五、get方法
//不用擔心get的過程中發生resize,get可能遇到兩種情況
//1.桶未resize(無論是沒達到門檻值還是resize已經開始但是還未處理該桶),周遊連結清單
//2.在桶的連結清單周遊的過程中resize,上面的resize分析可以看出并未破壞原tab的桶的節點關系,周遊仍可以繼續
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get方法:
根據key的hash值定位,周遊連結清單或者紅黑樹,擷取節點。
具體一點:
- 根據key的hash值定位到桶位置。
- map是否初始化,沒有初始化則傳回null。否則進入3
- 定位到的桶位置是否有頭結點,沒有傳回nul,否則進入4
- 是否有其他線程在擴容,有的話調用find方法查找。是以這裡可以看出,擴容操作和get操作不沖突,擴容map的同時可以get操作。
- 若沒有其他線程在擴容,則周遊桶對應的連結清單或者紅黑樹,使用equals方法進行比較。key相同則傳回value,不存在則傳回null.
并發問題:假如此時正好有别的線程正在對數組擴容怎麼辦?
答:沒關系,擴容的時候不會破壞原來的table,周遊任然可以繼續,不需要加鎖。
源碼:
//不用擔心get的過程中發生resize,get可能遇到兩種情況
//1.桶未resize(無論是沒達到門檻值還是resize已經開始但是還未處理該桶),周遊連結清單
//2.在桶的連結清單周遊的過程中resize,上面的resize分析可以看出并未破壞原tab的桶的節點關系,周遊仍可以繼續
六、addCount()方法
在putVal()方法的最後會調用addCount()方法,開始分析。
addCount()更新元素的容器個數
當ConcurrentHashMap添加了元素之後,需要通過addCount()更新元素的個數。并且如果發現元素的個數達到了擴容門檻值(sizeCtl),那麼将進行resize()操作。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//更新size
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//resize
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//不斷CAS重試
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {//需要resize
//為每個size生成一個獨特的stamp 這個stamp的第16為必為1 後15位針對每個n都是一個特定的值 表示n最高位的1前面有幾個零
int rs = resizeStamp(n);
//sc會在庫容時變成 rs << RESIZE_STAMP_SHIFT + 2;上面說了rs的第16位為1 是以在左移16位後 該位的1會到達符号位 是以在擴容是sc會成為一個負數
//而後16位用來記錄參與擴容的線程數
//此時sc < 0 說明正在擴
if (sc < 0) {
/**
* 分别對五個條件進行說明
* sc >>> RESIZE_STAMP_SHIFT != rs 取sc的高16位 如果!=rs 則說明HashMap底層資料的n已經發生了變化
* sc == rs + 1 此處可能有問題 我先按自己的了解 覺得應該是 sc == rs << RESIZE_STAMP_SHIFT + 1; 因為開始transfer時 sc = rs << RESIZE_STAMP_SHIFT + 2(一條線程在擴容,且之後有新線程參與擴容sc均會加1,而一條線程完成後sc - 1)說明是參與transfer的線程已經完成了transfer
* 同理sc == rs + MAX_RESIZERS 這個應該也改為 sc = rs << RESIZE_STAMP_SHIFT + MAX_RESIZERS 表示參與遷移的線程已經到達最大數量 本線程可以不用參與
* (nt = nextTable) == null 首先nextTable是在擴容中間狀态才使用的數組(這一點和redis的漸進式擴容方式很像) 當nextTable 重新為null時 說明transfer 已經finish
* transferIndex <= 0 也是同理
* 遇上以上這些情況 說明此線程都不需要參與transfer的工作
* PS: 翻了下JDK16的代碼 這部分已經改掉了 rs = resizeStamp(n) << RESIZE_STAMP_SHIFT 證明我們的猜想應該是正确的
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//否則該線程需要一起transfer
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//說明沒有其他線程正在擴容 該線程會将sizeCtl設定為負數 表示正在擴容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
如上文所說,這個方法有兩個作用,一是更新元素個數,二是判斷是否需要resize()。
更新size()
我們可以單獨看addCount中更新size的部分
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
1.首先判斷countCells是否已經被初始化,如果沒有被初始化,那麼将嘗試在size的更新操作放在baseCount上。如果此時沒有沖突,那麼CAS修改baseCount就能成功,size的更新就落在了baseCount上。
2.如果此時已經有countCells了,那麼會根據線程的探針随機落到countCells的某個下标上。對size的更新就是更新對應CountCells的value值。
3.如果還是不行,将會進入
方法中,自旋重試直到更新成功。這裡不對
fullAddCount
fullAddCount
展開介紹,具體操作也類似,size的變化要麼累加在對應的CountCell上,要麼累加在baseCount上。
這裡說一下我個人對ConcurrentHashMap采用這麼複雜的方式進行計數的了解。因為ConcurrenthHashMap是出于吞吐量最大的目的設計的,是以,如果單純的用一個size直接記錄元素的個數,那麼每次增删操作都需要同步size,這會讓ConcurrentHashMap的吞吐量大大降低。
因為,将size分散成多個部分,每次修改隻需要對其中的一部分進行修改,可以有效的減少競争,進而增加吞吐量。
resize()
對于resize()過程,我其實在代碼的注釋中說明的比較詳細了。
1.首先,是一個while()循環,其中的條件是元素的size(由上一步計算而來)已經大于等于sizeCtl(說明到達了擴容條件,需要進行resize),這是用來配合CAS操作的。
2.接着,是根據目前數組的容量計算了resizeStamp(該函數會根據不同的容量得到一個确定的數)。得到的這個數會在之後的擴容過程中被使用。
3.然後是比較sizeCtl,如果sizeCtl小于0,說明此時已經有線程正在擴容,排除了幾種不需要參與擴容的情況(例如,擴容已經完成,或是參與的擴容線程數已經到最大值,具體情況代碼上的注解已經給出了分析),剩下的情況目前線程會幫助其他線程一起擴容,擴容前需要修改CAS修改sizeCtl(因為在擴容時,sizeCtl的後16位表示參與擴容的線程數,每當有一個線程參與擴容,需要對sizeCtl加1,當該線程完成時,對sizeCtl減1,這樣比對sizeCtl就可以知道是否所有線程都完成了擴容)。
另外如果sizeCtl大于0,說明還沒有線程參與擴容,此時需要CAS修改sizeCtl為rs << RESIZE_STAMP_SHIFT + 2(其中rs是有resizeStamp(n)得到的),這是一個負數,上文也說了這個數的後16位表示參與擴容的線程,當所有線程都完成了擴容時,sizeCtl應該為rs << RESIZE_STAMP_SHIFT + 1。這是我們結束擴容的條件,會在後文看到。
七、initTable方法
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//如果table為null或者長度為0, //則一直循環試圖初始化table(如果某一時刻别的線程将table初始化好了,那table不為null,
該//線程就結束while循環)。
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl小于0,
//即有其他線程正在初始化或者擴容,執行Thread.yield()将目前線程挂起,讓出CPU時間,
//該線程從運作态轉成就緒态。
//如果該線程從就緒态轉成運作态了,此時table可能已被别的線程初始化完成,table不為
//null,該線程結束while循環。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//如果此時sizeCtl不小于0,即沒有别的線程在做table初始化和擴容操作,
//那麼該線程就會調用Unsafe的CAS操作compareAndSwapInt嘗試将sizeCtl的值修改成
//-1(sizeCtl=-1表示table正在初始化,别的線程如果也進入了initTable方法則會執行
//Thread.yield()将它的線程挂起 讓出CPU時間),
//如果compareAndSwapInt将sizeCtl=-1設定成功 則進入if裡面,否則繼續while循環。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次确認目前table為null即還未初始化,這個判斷不能少。
if ((tab = table) == null || tab.length == 0) {
//如果sc(sizeCtl)大于0,則n=sc,否則n=預設的容量大
小16,
//這裡的sc=sizeCtl=0,即如果在構造函數沒有指定容量
大小,
//否則使用了有參數的構造函數,sc=sizeCtl=指定的容量大小。
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//建立指定容量的Node數組(table)。
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//計算門檻值,n - (n >>> 2) = 0.75n當ConcurrentHashMap儲存的鍵值對數量
//大于這個門檻值,就會發生擴容。
//這裡的0.75相當于HashMap的預設負載因子,可以發現HashMap、Hashtable如果
//使用傳入了負載因子的構造函數初始化的話,那麼每次擴容,新門檻值都是=新容
//量 * 負載因子,而ConcurrentHashMap不管使用的哪一種構造函數初始化,
//新門檻值都是=新容量 * 0.75。
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
簡單來說就是:
- 多線程使用cas樂觀鎖競争tab數組初始化的權力。
- 線程競争成功,則初始化tab數組。
- 競争失敗的線程則讓出cpu(從運作态到就緒态)。等再次得到cpu時,發現tab!=null,即已經有線程初始化tab數組了,則退出即可。
八、remove方法
public V remove(Object key) {
return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
//計算需要移除的鍵key的哈希位址。
int hash = spread(key.hashCode());
//周遊table。
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//table為空,或者鍵key所在的bucket為空,則跳出循環傳回。
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
//如果目前table正在擴容,則調用helpTransfer方法,去協助擴容。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
//将鍵key所在的bucket加鎖。
synchronized (f) {
if (tabAt(tab, i) == f) {
//bucket頭節點的哈希位址大于等于0,為連結清單。
if (fh >= 0) {
validated = true;
//周遊連結清單。
for (Node<K,V> e = f, pred = null;;) {
K ek;
//找到哈希位址、鍵key相同的節點,進行移除。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
//如果bucket的頭節點小于0,即為紅黑樹。
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
//找到節點,并且移除。
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
//調用addCount方法,将目前ConcurrentHashMap存儲的鍵值對數量-1。
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
九、總結
1.擴容完成後做了什麼?
nextTable=null //新數組的引用置為null
tab=nextTab //舊數組的引用指向新數組
sizeCtl=0.75n //擴容門檻值重新設定,數組元素個數超過這個門檻值就會觸發擴容
2.concurrentHashMap中設定為volatile的變量有哪些?
Node,nextTable,baseCount,sizeCtl
3.單線程初始化,多線程擴容
4.什麼時候觸發擴容?
1.連結清單轉換為紅黑樹時(連結清單節點個數達到8個可能會轉換為紅黑樹),table數組長度小于64。
2.數組中總節點數大于門檻值(數組長度的0.75倍)
5.如何保證初始化nextTable時是單線程的?
所有調用transfer的方法(例如helperTransfer、addCount)幾乎都預先判斷了nextTab!=null,而nextTab隻會在transfer方法中初始化,保證了第一個進來的線程初始化之後其他線程才能進入。
6.get操作時擴容怎麼辦?
7.put操作擴容時怎麼辦?
8.如何hash定位?
答:h^(h>>>16)&0x7fffffff,即先将hashCode的高16位和低16位異或運算,這個做目的是為了讓hash值更加随機。和0x7fffffff相與運算是為了得到正數,因為負數的hash有特殊用途,如-1表示forwardingNode(上面說的表示該位置正在擴容),-2表示是一顆紅黑樹。
9.forwardingNode有什麼内容?
nextTable //擴容時執向新table的引用
hash=moved //moved是常量-1,正在擴容的标記
10.擴容前連結清單和擴容後連結清單順序問題
語言描述很難解釋,直接看圖,hn指向最後同一類的第一個節點,hn->6->7,此時ln->null,接着從頭開始周遊連結清單;
第一個節點:由于1的hash&n==1,是以應該放到hn指向的連結清單,采用頭插法。hn->1->6->7
第二個節點:同樣,hn->2->1->6->7
第三個節點:hash&n==0,是以應該插入到ln連結清單,采用頭插法,ln->3
.....
最後:
ln->5->3 //複制到新table的i位置處
hn->2->1->6->7 //複制到新table的i+n位置處
可以看到ln中所有元素都是後來一個個插入進來的,是以都是逆序
而hn中6->7是初始賦予的是以順序,而其1,2是後來插入的,是以逆序。
總結:有部分順序,有部分逆序。看情況
參考文章:
https://www.cnblogs.com/ylspace/p/12726672.html
https://www.cnblogs.com/zerotomax/p/8687425.html
https://www.cnblogs.com/insaneXs/p/13586928.html