ConcurrentHashMap源碼分析
- ConcurrentHashMap面試題
- 1.7
-
- 屬性
- 構造器
- put()方法
- 1.8
-
- 屬性
- 内部類TreeBin
- 内部類 ForwardingNode
- 構造器
- putVal()方法
-
- initTable() 初始化數組
- addCount() 添加元素個數
-
- fullAddCount()
- transfer() 轉移資料
- helpTransfer()
- CopyOnWriteArray
- 跳表 ConcurrentSkipListMap
ConcurrentHashMap是線程安全的,他是寫同步讀無鎖。 也就是put()的時候會加鎖,get()的時候不會加鎖
ConcurrentHashMap面試題
JDK7和JDK8ConcurrentHashMap 比較

JDK7ConcurrentHashMap是如何保證并發安全的
JDK7ConcurrentHashMap底層原理
JDK8是如何保證并發安全的
JDK8 put流程
1.7
ConcurrentHashMap底層是由兩層嵌套數組來實作的:
- ConcurrentHashMap對象中有一個屬性segments,類型為Segment[];
- Segment對象中有一個屬性table,類型為HashEntry[];
屬性
- segment segment繼承了ReenTrantLock
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap 【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - DEFAULT_INITIAL_CAPACITY hashEntry數組的長度預設為16
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - DEFAULT_CONCURRENCY_LEVEL 并發級别 segment數組的大小就是通過并發級别算出來的>=并發級别的最大二次幂
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap 【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap -
MIN_SEGMENT_TABLE_CAPACITY
每個segment的容量最小要是2
構造器
-
确定segment數組的長度 ssize
通過并發級别concurrency 來算ssize為大于等于并發級别的最小二次幂
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap -
确定每個segment對應的hashEntry數組的長度 cap
通過initialCapacity/ssize 算出每個segment應該對應的Entry個數,但是要進行向上取整操作才能保證把我們要求的initialCapacity都放下,而且要保證cap要大于MIN_SEGMENT_TABLE_CAPACITY即最小容量。此外,cap也必須是2的幂次方。
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - new Segment對象 Segment<K,V> s0 new出第0個segment位置的放的對象
為什麼需要先建立一個segment對象放在第0個位置
因為我們往Segment放hashEntry的時候用的是segment對象的put方法,那放之前肯定是要有segment對象存在的才能調用其put方法。如果我們每次都new segment對象的時候還要确定其長度各種,是以提前創一個s0,到時候直接用s0的屬性即可。 - new Segment數組
- 用UNSAFE.putOrderedObject把Segment對象放到Segment數組的第0個位置
put()方法
- 計算key的hashcode,通過hashcode得到在Segment的索引j
- 通過UNSAFE.getObject得到Segment第j個位置的值s
- 第j個位置是null,則ensureSegment 生成segment對象
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - 調用segment的put方法
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - 擷取鎖tryLock()
-
擷取到直接傳回true
tryLock() 擷取到就傳回true,擷取不到就傳回false不會像lock()一樣擷取不到就阻塞。因為tryLock()調的是nonfairTryAcquire。
由于tryLock() 不會阻塞 是以可以通過while(tryLock())在等鎖的過程,提前把 HashEntry給new好。
- 擷取不到調用scanAndLockForPut
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap -
entryForHash
根據hash算出hashEntry對應的索引,擷取到HashEntry對象,沒有就傳回null
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - 通過retries的值控制走的邏輯
- 周遊連結清單,看key相等就換value,到最後都沒有就建立node
- 嘗試次數到64就用lock() 阻塞
- 嘗試的過程中發現目前索引的值已被改變,就又走第一個邏輯周遊有沒有相等的key
-
-
- entryAt擷取HashEntry數組第i個位置上的值
- 周遊HashEntry數組第i個位置的node
- 不為null,就對比key的值,相同則替換value
- 為null 就new 一個 HashEntry(頭插法),通過setEntryAt将建立好的對象放到HashEntry數組的第i個位置
-
rehash 大于門檻值就擴容
new 一個原來兩倍大的數組
第一次循環把連結清單尾端連續的新idx相同的一串直接拿過去(類似蜘蛛紙牌)
第二次循環再依次周遊單個的移
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap
- 擷取鎖tryLock()
1.8
屬性
-
nextTable
擴容時用來存資料的臨時中間表
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap -
sizeCtl 用來标志table初始化和擴容的,不同取值代表不同的含義
代碼裡的sc就是這個sizeCtl
- 0:table還沒有初始化
- -1:table正在初始化
- 小于-1:表示table正在擴容
- 大于0:初始化完成後,代表table最大存放元素的個數,即要擴容的門檻值,預設為0.75*n
- Moved為-1時,表示目前table正在擴容
- MIN_TRANSFER_STRIDE:預設16,table擴容時,每個線程最少遷移的槽位個數
- baseCount 統計size
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap -
counterCells
利用counterCells來計數,統計所有添加的元素的個數
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap 【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap
内部類TreeBin
TreeBin是ConcurrentHashMap的内部類,new 一個TreeBin就是一個紅黑樹對象
為什麼1.8 HashMap不需要這個TreeBin對象呢
因為1.8ConcurrentHashMap是要給對象加鎖的,但是紅黑樹的根節點會改變的,如果按1.8HashMap的結構的話,線程A給在firstNode加鎖後,萬一根節點變化了,那加的鎖相當于沒用了,這樣别的線程就也可以改目前紅黑樹了。
是以這裡new 一個TreeBin,TreeBin内部有一個root屬性,這樣TreeBin對象是永遠不會變的,給TreeBin對象加鎖就沒問題了。
内部類 ForwardingNode
ForwardingNode是一個node,他在數組正在被擴容的時候被建立,并放在正在被transfer的位置上,hash值為MOVED即負一,
ForwardingNode的nextTable屬性存的是新數組。
這樣當有線程putVal 擷取到的hash值為MOVED,就知道目前這個數組在擴容,就會helpTransfer幫助擴容
構造器
putVal()方法
- 自旋:
- if 如果數組為空 則初始化數組 initTable()
-
else if 數組不為空則通過hash計算索引,索引位置為空則通過CAS new Node結點
這裡CAS失敗的,就又會自旋
-
else if 如果正在擴容,則幫助擴容helpTransfer()
f.hash==MOVED MOVED是負1 目前索引位置的對象對應的hash值是負1,說明有别的線程正在擴容,然後目前線程就去幫助擴容
-
else 在目前位置上的對象f(連結清單頭結點/紅黑樹根節點)上用synchronized加鎖
在目前索引處的對象還是加鎖的對象時,即頭結點沒有發生變化時繼續:
- 如果是連結清單,周遊連結清單,如果有key相同的則替換其value,否則new Node 通過尾插法插傳入連結表。
- 是紅黑樹,通過putTreeVal()向紅黑樹插入元素
- if 判斷是否需要樹化treeifyBin()
- addCount() 擴容
initTable() 初始化數組
由于可能多個線程同時來初始化,為了保證隻有一個線程能夠初始化,用了Thread.yield() 和cas ,這裡利用SIZECTL這個參數 實作了無鎖 并發,保證隻有一個能進去初始化,其他運作到都是釋放資源(thread.yield)
- 通過自旋+ Thread.yield() 初始化失敗的線程就Thread.yield() 把運作狀态讓出來(暫時放棄cpu的執行)
-
用CAS方式修改SIZECTL為-1,能修改成功的線程進行初始化
SIZECTL為-1表示正在初始化
sc=n-(n>>>2) ??是什麼意思
- 初始化完成後,SIZECTL改為table最大存放元素的個數
addCount() 添加元素個數
baseCount是用來統計存的元素的個數的,如果都去競争baseCount則效率低,是以用一個CounterCell數組,讓線程來競争CounterCell數組某個位置的CounterCell對象,最後對CounterCell數組進行統計
- 添加元素個數
- CounterCell為空去通過CAS去修改baseCount
- CounterCell不為空或上面競争baseCount失敗的線程
- 數組不為空, 線程生成一個随機數,通過與CounterCell數組的長度進行與操作,來得到在CounterCell中的索引位置,通過CAS方式對目前索引上的value屬性+x
- 數組為空或上一步CAS修改失敗 則 fullAddCount(x, uncontended)
-
統計整個ConcurrentHashMap的元素個數s sumCount()
就是把baseCount和CounterCell中的值都求和
-
while 進行擴容(擴容完後可能新數組又需要擴容
當 s>sizeCtrl即擴容門檻值 且數組不為空 且 連結清單長度是小于MAXIMUM_CAPACITY 則擴容
- sc<0 表示有别的線程正在擴容
- sc>=0
- 通過CAS将sc改成一個負數
- 通過transfer() 遷移
一句話總結:addCount()+fullAddCount() 就是往baseCount或CounterCell 加添加進的元素的個數的,最終就是為了保證一定能加成功,加鎖方式用的是CAS,最後統計size就是統計這兩部分的和。
fullAddCount()
cellsBusy :用來表示目前CounterCell是否在被其他線程使用,如果是0表示沒有被其他線程使用。 隻要對CounterCell進行操作,都要是保證cellsBusy為0的時候,用CAS方式将cellsBusy改為1,相當于用CAS的方式加鎖。
ThreadLocalRandom.getProbe() 對線程生成一個随機數,且這個随機數是不變的對這個線程是唯一的。 也就是對這個線程來說每次調getProbe()的結果都是一樣的
ThreadLocalRandom.advanceProbe(h)生成一個新的值
uncontended:用來控制是否用CAS修改目前cell的對象。因為如果進入fullAddCount()函數之前就已經嘗試過CAS修改目前cell的對象值了,失敗uncontended就為false。這樣這裡先判斷uncontended為false就不讓他執行下面的了。
collide:用來控制是否擴容
一句話總結:fullAddCount()就是保證一定把x加進去,具體做的事情其實就是自旋的方式建立CounterCell數組,對應索引沒對象就建立,有就在此基礎上加x。一直沒加成功的條件下,會擴容。 隻是這些操作的前提都是用CAS操作來加鎖的,保證線程安全。
- 計算目前線程的對應随機數
- 自旋:
- CounterCell數組不為空
-
if 對應索引處為null 即沒有對象
則建立cellsBusy對象,在修改cellsBusy成功的情況将創好的對象指派到CounterCell數組
- else if uncontended為 true的情況 有對象 則通過CAS方式對此對象+x
-
else if collide為true的情況進行對CounterCell擴容
collide為true是什麼情況呢?也就是什麼時候或擴容?
兩次hash計算得到的索引對應的位置都有對象,并且通過cas對此對象修改失敗
- 上面都沒成功就rehash 重新為線程生成随機數
-
- CounterCell數組為空 且cellsBusy為0即其他線程沒有使用
- 通過CAS方式将cellsBusy改為1,也就相當于是給CounterCell數組加鎖。
- 加鎖成功才初始化,即new 一個長度為2的CounterCell數組,并線上程算出來的對應索引處new CounterCell(x) 對象
- 将cellsBusy改為0,即釋放鎖
- 以上情況都不滿足,則還是用CAS對baseCount去修改
- CounterCell數組不為空
transfer() 轉移資料
ForwardNode 就是把正在被擴容的數組的node都包裝成ForwardNode,hash值為MOVED即負1,這樣其他線程對這個數組進行put的時候發現對應位置對應的值為-1就會helpTransfer
advance :advance為true 則一直往前找自己擴容的區域 ,找到後會被修改為false
finishing:為true即目前線程的擴容任務已做完
bound到i為目前線程的擴容範圍(i>bound)
transferIndex: 每個線程進來會根據transferIndex來計算自己應該擴容的區域[bound,i],transferIndex相當于[0,transferIndex]的範圍都還沒被遷移,[transferIndex,n]的範圍已經在被别的線程遷移了
- 準備工作
- 計算目前線程擴容的步長stride 預設為16
-
初始化
nextTab即新數組為null就建立一個長度為原數組2倍的數組,transferIndex為n (即原數組會從後往前進行遷移)
-
建立 ForwardingNode<K,V> 對象fwd,hash值為MOVED即-1,其屬性放着nextTab。
老數組的一個位置為null或者被遷移到新數組後,就會把fwd放在目前位置。表示此數組正在擴容。别的線程通路到就會幫助擴容。
- 循環 進行遷移
-
while(advance)得到目前線程需要擴容的區域[bound,i]
通過cas修改transferIndex來得到目前線程需要擴容的區域[bound,i]
- 目前位置為null,cas将fwd放在此位置
- 目前位置為fwd,advance改為true繼續前進
- synchronized 加鎖
-
目前位置是連結清單,和1.7ConcurrentHashMap連結清單遷移邏輯一樣
周遊連結清單像蜘蛛紙牌一樣,把最後的新位置一樣的連續結點一起拿過去;再周遊連結清單,算出新的對應位置一個一個移過去。
-
目前位置是紅黑樹,和1.8HashMap紅黑樹遷移邏輯一樣
通過每個TreeNode的prev和next 看作雙向連結清單,然後通過高低指針得到兩條鍊(每條對應新數組的新位置),分别看兩條鍊小于untreeify門檻值則untreeify,不小于則還是紅黑樹。
-
-
-
結束條件
目前線程已經把所有可以遷移的都遷移完了(i<n)
-
但其他線程還在遷移(通過SIZECTL和初始值不等判斷得到),則直接return
因為在擴容的時候SIZECTL會被設定為一個很小的負值,每個線程進來擴都會将SIZECTL+1,擴完走的時候會把SIZECTL-1。目前線程走的時候SIZECTL和當初的很小的負值不相等說明還有别的線程在擴容。
- 整個數組已被遷移完,即目前線程是最後一個遷移的,則将新數組指派給table
-
- 準備工作
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - 擷取遷移區域 和結束邏輯
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - 遷移的位置為null或本來就是fwd
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - 遷移的位置是連結清單,加鎖遷移
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap - 遷移的位置是紅黑樹,加鎖遷移
【并發】11、ConcurrentHashMap源碼分析ConcurrentHashMap面試題1.71.8CopyOnWriteArray跳表 ConcurrentSkipListMap
helpTransfer()
CopyOnWriteArray
讀寫分離 空間換時間
從資料一緻性角度講,保證最終一緻性
setArray後 目前副本就變成正本,原來的正本就要被抛棄
跳表 ConcurrentSkipListMap
保證在并發場景下 key有序的map,跳表的時間複雜度O(logn)
保證map的key的順序,底層的資料結構基于連結清單O(n),又維護了一個索引,根據索引去找就更快了
Redis的sortset是基于跳表的
TreeMap也是key有序的map,但是不是線程安全的