前言
ConcurrentHashMap從名稱是可以看出,它是一個HashMap而且是線程安全的。在多線程程式設計中使用非常廣泛。ConcurrentHashMap的實作方式,在jdk6,7,8中都不一樣。本文隻針對jdk8中的實作作一些說明。
ConcurrentHashMap實作原理
先來看看ConcurrentHashMap底層是發何實作的。總的來說,它是采用Node<K,V>類型(繼承了Map.Entry)的數組table+單向連結清單+紅黑樹的結構。table數組的大小預設為16,數組中的每一項稱為桶(bucket),桶中存放的是連結清單或者是紅黑樹結構,取決于連結清單的長度是否達到了閥值8(大于等于8)(預設),如果是,接着再判斷數組的長度是否小于64,如果小于則優先擴容table容量來解決單個桶中元素增多的問題,如果不是則轉換成紅黑樹結構存放。
再次,我們看到ConcurrentHashMap類中,Unsafe類。說明線程安全的實作是基于CAS算法的無鎖化修改值的操作,它可以大大降低鎖帶來的性能消耗。其基本思想是不停的去比較目前記憶體中的變量值與給定的值是否相同(值相等且引用也相等),如果相同則修改成指定的值,否則什麼也不做。這與樂觀鎖的思想類似。缺點就是消耗CPU性能。
private static final sun.misc.Unsafe U;
U = sun.misc.Unsafe.getUnsafe();
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
源碼分析
先來看看ConcurrentHashMap擴容是如何發生的,主要是在put一個KV時,如果達到某些閥值則會重新new一個nextTable其長度是原table的2倍。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
//onlyIfAbsent的意思是在put一個KV時,如果K已經存在什麼也不做則傳回null
//如果不存在則put操作後傳回V值
final V putVal(K key, V value, boolean onlyIfAbsent) {
//ConcurrentHashMap中是不能有空K或空V的
if (key == null || value == null) throw new NullPointerException();
//hash算法得到hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table是空的,就去初始化,下一個循環就不是空的了
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果沒有取到值,即取i位的元素是空的,為什麼i取值是(n-1)&hash??
//這是hash的精華所在,在這裡可以先思考一下
//此時直接到KV包裝成Node節點放在i位置即可
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//MOVED,定義為-1。标記原table正在執行擴容任務,可以去幫忙(支援多線程擴容)
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//這種情況是,在i的位置找到了一個元素,說明此元素的K與之間的某個K的hash結果是一樣的
//
V oldVal = null;
synchronized (f) {//同步鎖住第一個元素
if (tabAt(tab, i) == f) {//為了安全起見,再一次判斷
if (fh >= 0) {//節點的hash值大于0,說明是一個連結清單結構
binCount = 1;//記錄連結清單的元素個數
for (Node<K,V> e = f;; ++binCount) {
K ek;
//判斷給定的key是否與取出的key相同,如果是則替換元素
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;//直接跳出,這是一種思想。在程式設計時可以減少一些if else判斷
}
//否則就是不相等,那就把此元素放在連結清單的最後一個元素
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果不是連結清單,而是紅黑樹
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//把元素放入樹中的對應位置
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//連結清單的元素大于等于8時,就把連結清單轉換為紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//新添加一個元素,size加1,可能會觸發擴容
addCount(1L, binCount);
return null;
}
上面是對put操作的整個流程的分析,可以看出需要關注的幾個點
- hash算法及table下标i的計算方法
- 首次放元素時,initTable方法做了哪些事情
- 目前為正在擴容時help做了哪些操作?
- table中的元素有可能是連結清單結構,也有可能是紅黑樹結構
- 什麼條件下會去執行連結清單轉換成紅黑樹?
下面,我們先來看看連結清單轉成紅黑樹的方法操作
/**
* Replaces all linked nodes in bin at given index unless table is
* too small, in which case resizes instead.
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//先判斷table的長度是否小于64,如果小于,則優先使用擴容來解決問題
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
//擴容為原來的一位,調整某一個桶中元素過多的問題(超出了8個))
//會觸發某些桶中的元素重新配置設定,避免在一個桶中有太多的元素影響通路效率
tryPresize(n << 1);
//桶中存在結點,并且此結點的hash值大于0,調整紅黑樹的結構
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {//鎖住節點,把元素添加到樹中
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
還有一個就是addCount方法,這個方法在執行時,有可能會觸發擴容操作
private final void addCount(long x, int check) {
............省略無關代碼.....
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);//可見是通過原子修改sizectl的值來判斷是否需要擴容操作
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
在多線的環境下,用volatile的方式讀取sizectrl屬性的值,來判斷map所處的狀态,通過cas修改操作來告訴其它線程Map的狀态類型。不同的數值類型,代表着不同的狀态:
- 未初始化
- 等于0,表示未指定初始化容量,則使用預設容量
- 大于0,為指定的初始化容量
- 初始化中
- 等于-1,表示正在初始化,并且通過cas告訴其它線程
- 正常狀态
- 等于原table長度n*0.75,擴容閥值
- 擴容中
- 小于0,表示有其他線程正在執行擴容操作
- 等于(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2表示此時隻有一個線程在執行擴容
接下來我們來看看擴容方法
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//取CPU的數量,确定每次遷移的Node的數量,確定不會少于MIN_TRANSFER_STRIDE=16個
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//擴容一倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//擴容索引,表示已經配置設定給擴容線程的table數組索引位置。
//主要用來協調多個線程,安全地擷取遷移"桶"。
transferIndex = n;
}
int nextn = nextTab.length;
//标記目前節點已經遷移完成,它的hash值是MOVED=-1
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//1 逆序遷移已經擷取到的hash桶集合,如果遷移完畢,則更新transferIndex,擷取下一批待遷移的hash桶
//2 如果transferIndex=0,表示是以hash桶均被配置設定,将i置為-1,準備退出transfer方法
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
/**
第一個擴容的線程,執行transfer方法之前,會設定 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
後續幫其擴容的線程,執行transfer方法之前,會設定 sizeCtl = sizeCtl+1
每一個退出transfer的方法的線程,退出之前,會設定 sizeCtl = sizeCtl-1
那麼最後一個線程退出時:
必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
*/
//不相等,說明不到最後一個線程,直接退出transfer方法
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {//開始遷移
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//遷移連結清單,将node連結清單分成兩個新的連結清單
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;//取桶中每個節點的hash值
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将node連結清單放在新的table對應的位置
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//遷移紅黑樹
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
關于上面遷移連結清單的操作,比較有意思,我們來分析一下。還記得,在putVal方法有有一段代碼
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)
用于計算tab中元素的下标的,n就是tab的長度,隻會是2的x次幂,先來熟悉一下&運算,它是指對應的二進制位上,如果都是1則結果為1,否則為0。假設現在,tab的長度為16,換成二進制就是10000,減1就是01111,取hash的值,這個值有點特别,就是從右起第x位(log以2為底的16)=4(從0開始數)。如果是10000&此數,則結果一定是0,例如:
0000000000010000 0000000000001111
0101001000001001 結果為0 0101001000001001 結果是9,即i下标是9
如果此時tab擴容到32,也就是100000,再來看看(n-1)&hash的結果
0000000000011111
0101001000001001 結果也是9,即i下标是9
說明,如果右起第x位為0的話,runbit==0成立,此時擴容到原來的2倍的話在新數組中的下标是不變的,所在可以看到把ln連結清單直接放到nextTable的i位了。
再來看看,右起第x位為1的情況
0000000000010000 0000000000001111
0101001000011001 結果為16不等于0 0101001000011001 結果是9,即i下标是9
如果此時擴容到了32,也就是100000時,再來看看(n-1)&hash的結果
0000000000011111
0101001000011001 結果是16+8+1=25
即擴容後新下标變成了25,也就是原來的下标9再加擴容的量16,就是i+n的結果,是以對于hn來說在新table中的位置就變成了i+n了。
總結
通過代碼我們可以看出,這裡面的思想還是值得學習借鑒的。下标取(n-1)&hash并不是随便設計出來的,而是經過精心設計的。擴容後,桶的數量發生了變化,但無論是目前時刻使用的是新table還是擴容後的table通路的位置相對table長度來說都沒有發生變化,為通路get提供便利。擴容時也不用重新計算hash值,同時結合多線程操作擴容提升操作效率。
參考:
https://blog.csdn.net/ls1firesoar/article/details/78980002
https://www.cnblogs.com/stateis0/p/9062089.html