以前寫過介紹HashMap的文章,文中提到過HashMap在put的時候,插入的元素超過了容量(由負載因子決定)的範圍就會觸發擴容操作,就是rehash,這個會重新将原數組的内容重新hash到新的擴容數組中,在多線程的環境下,存在同時其他的元素也在進行put操作,如果hash值相同,可能出現同時在同一數組下用連結清單表示,造成閉環,導緻在get時會出現死循環,是以HashMap是線程不安全的。
JDK1.7的實作
整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,是以很多地方都會将其描述為分段鎖。注意,行文中,我很多地方用了“槽”來代表一個 segment。
簡單了解就是,ConcurrentHashMap 是一個 Segment 數組,Segment 通過繼承 ReentrantLock 來進行加鎖,是以每次需要加鎖的操作鎖住的是一個 segment,這樣隻要保證每個 Segment 是線程安全的,也就實作了全局的線程安全。
concurrencyLevel:并行級别、并發數、Segment 數。預設是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,是以理論上,這個時候,最多可以同時支援 16 個線程并發寫,隻要它們的操作分别分布在不同的 Segment 上。這個值可以在初始化的時候設定為其他值,但是一旦初始化以後,它是不可以擴容的。
再具體到每個 Segment 内部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證線程安全,是以處理起來要麻煩些。
初始化
initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。
loadFactor:負載因子,之前我們說了,Segment 數組不可以擴容,是以這個負載因子是給每個 Segment 内部使用的。
1 public ConcurrentHashMap(int initialCapacity,
2 float loadFactor, int concurrencyLevel) {
3 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
4 throw new IllegalArgumentException();
5 if (concurrencyLevel > MAX_SEGMENTS)
6 concurrencyLevel = MAX_SEGMENTS;
7 // Find power-of-two sizes best matching arguments
8 int sshift = 0;
9 int ssize = 1;
10 // 計算并行級别 ssize,因為要保持并行級别是 2 的 n 次方
11 while (ssize < concurrencyLevel) {
12 ++sshift;
13 ssize <<= 1;
14 }
15 // 我們這裡先不要那麼燒腦,用預設值,concurrencyLevel 為 16,sshift 為 4
16 // 那麼計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值
17 this.segmentShift = 32 - sshift;
18 this.segmentMask = ssize - 1;
19
20 if (initialCapacity > MAXIMUM_CAPACITY)
21 initialCapacity = MAXIMUM_CAPACITY;
22
23 // initialCapacity 是設定整個 map 初始的大小,
24 // 這裡根據 initialCapacity 計算 Segment 數組中每個位置可以分到的大小
25 // 如 initialCapacity 為 64,那麼每個 Segment 或稱之為"槽"可以分到 4 個
26 int c = initialCapacity / ssize;
27 if (c * ssize < initialCapacity)
28 ++c;
29 // 預設 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對于具體的槽上,
30 // 插入一個元素不至于擴容,插入第二個的時候才會擴容
31 int cap = MIN_SEGMENT_TABLE_CAPACITY;
32 while (cap < c)
33 cap <<= 1;
34
35 // 建立 Segment 數組,
36 // 并建立數組的第一個元素 segment[0]
37 Segment<K,V> s0 =
38 new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
39 (HashEntry<K,V>[])new HashEntry[cap]);
40 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
41 // 往數組寫入 segment[0]
42 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
43 this.segments = ss;
44 }
初始化完成,我們得到了一個 Segment 數組。
我們就當是用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麼初始化完成後:
- Segment 數組長度為 16,不可以擴容
- Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始門檻值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
- 這裡初始化了 segment[0],其他位置還是 null,至于為什麼要初始化 segment[0],後面的代碼會介紹
- 目前 segmentShift 的值為 32 - 4 = 28,segmentMask 為 16 - 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到
Segment
1 static class Segment<K,V> extends ReentrantLock implements Serializable {
2
3 transient volatile HashEntry<K,V>[] table;
4
5 transient int count;
6
7 transient int modCount;
8
9 }
從上Segment的繼承體系可以看出,Segment實作了ReentrantLock,也就帶有鎖的功能,table使用volatile修飾,保證了記憶體可見性。
put 過程分析
我們先看 put 的主流程,對于其中的一些關鍵細節操作,後面會進行詳細介紹。
1 public V put(K key, V value) {
2 Segment<K,V> s;
3 if (value == null)
4 throw new NullPointerException();
5 // 1. 計算 key 的 hash 值
6 int hash = hash(key);
7 // 2. 根據 hash 值找到 Segment 數組中的位置 j
8 // hash 是 32 位,無符号右移 segmentShift(28) 位,剩下高 4 位,
9 // 然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的高 4 位,也就是槽的數組下标
10 int j = (hash >>> segmentShift) & segmentMask;
11 // 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,
12 // ensureSegment(j) 對 segment[j] 進行初始化
13 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
14 (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
15 s = ensureSegment(j);
16 // 3. 插入新值到 槽 s 中
17 return s.put(key, hash, value, false);
18 }
初始化槽: ensureSegment
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對于其他槽來說,在插入第一個值的時候進行初始化。
這裡需要考慮并發,因為很可能會有多個線程同時進來初始化同一個槽 segment[k],不過隻要有一個成功了就可以。
1 private Segment<K,V> ensureSegment(int k) {
2 final Segment<K,V>[] ss = this.segments;
3 long u = (k << SSHIFT) + SBASE; // raw offset
4 Segment<K,V> seg;
5 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
6 // 這裡看到為什麼之前要初始化 segment[0] 了,
7 // 使用目前 segment[0] 處的數組長度和負載因子來初始化 segment[k]
8 // 為什麼要用“目前”,因為 segment[0] 可能早就擴容過了
9 Segment<K,V> proto = ss[0];
10 int cap = proto.table.length;
11 float lf = proto.loadFactor;
12 int threshold = (int)(cap * lf);
13
14 // 初始化 segment[k] 内部的數組
15 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
16 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
17 == null) { // 再次檢查一遍該槽是否被其他線程初始化了。
18
19 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
20 // 使用 while 循環,内部用 CAS,目前線程成功設值或其他線程成功設值後,退出,如果其他線程成功設定後,這裡擷取到直接傳回
21 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
22 == null) {
23 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
24 break;
25 }
26 }
27 }
28 return seg;
29 }
總的來說,ensureSegment(int k) 比較簡單,對于并發操作使用 CAS 進行控制。
第一層很簡單,根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 内部的 put 操作了。
Segment 内部是由 數組+連結清單 組成的。
1 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
2 // 在往該 segment 寫入前,需要先擷取該 segment 的獨占鎖
3 // 先看主流程,後面還會具體介紹這部分内容
4 HashEntry<K,V> node = tryLock() ? null :
5 scanAndLockForPut(key, hash, value);
6 V oldValue;
7 try {
8 // 這個是 segment 内部的數組
9 HashEntry<K,V>[] tab = table;
10 // 再利用 hash 值,求應該放置的數組下标
11 int index = (tab.length - 1) & hash;
12 // first 是數組該位置處的連結清單的表頭
13 HashEntry<K,V> first = entryAt(tab, index);
14
15 // 下面這串 for 循環雖然很長,不過也很好了解,想想該位置沒有任何元素和已經存在一個連結清單這兩種情況
16 for (HashEntry<K,V> e = first;;) {
17 if (e != null) {
18 K k;
19 if ((k = e.key) == key ||
20 (e.hash == hash && key.equals(k))) {
21 oldValue = e.value;
22 if (!onlyIfAbsent) {
23 // 覆寫舊值
24 e.value = value;
25 ++modCount;
26 }
27 break;
28 }
29 // 繼續順着連結清單走
30 e = e.next;
31 }
32 else {
33 // node 到底是不是 null,這個要看擷取鎖的過程,不過和這裡都沒有關系。
34 // 如果不為 null,那就直接将它設定為連結清單表頭;如果是null,初始化并設定為連結清單表頭。
35 if (node != null)
36 node.setNext(first);
37 else
38 node = new HashEntry<K,V>(hash, key, value, first);
39
40 int c = count + 1;
41 // 如果超過了該 segment 的門檻值,這個 segment 需要擴容
42 if (c > threshold && tab.length < MAXIMUM_CAPACITY)
43 rehash(node); // 擴容後面也會具體分析
44 else
45 // 沒有達到門檻值,将 node 放到數組 tab 的 index 位置,
46 // 其實就是将新的節點設定成原連結清單的表頭
47 setEntryAt(tab, index, node);
48 ++modCount;
49 count = c;
50 oldValue = null;
51 break;
52 }
53 }
54 } finally {
55 // 解鎖
56 unlock();
57 }
58 return oldValue;
59 }
整體流程還是比較簡單的,由于有獨占鎖的保護,是以 segment 内部的操作并不複雜。至于這裡面的并發問題,我們稍後再進行介紹。
到這裡 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。
擷取寫入鎖: scanAndLockForPut
前面我們看到,在往某個 segment 中 put 的時候,首先會調用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速擷取該 segment 的獨占鎖,如果失敗,那麼進入到 scanAndLockForPut 這個方法來擷取鎖。
下面我們來具體分析這個方法中是怎麼控制加鎖的。
1 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
2 HashEntry<K,V> first = entryForHash(this, hash);
3 HashEntry<K,V> e = first;
4 HashEntry<K,V> node = null;
5 int retries = -1; // negative while locating node
6
7 // 循環擷取鎖
8 while (!tryLock()) {
9 HashEntry<K,V> f; // to recheck first below
10 if (retries < 0) {
11 if (e == null) {
12 if (node == null) // speculatively create node
13 // 進到這裡說明數組該位置的連結清單是空的,沒有任何元素
14 // 當然,進到這裡的另一個原因是 tryLock() 失敗,是以該槽存在并發,不一定是該位置
15 node = new HashEntry<K,V>(hash, key, value, null);
16 retries = 0;
17 }
18 else if (key.equals(e.key))
19 retries = 0;
20 else
21 // 順着連結清單往下走
22 e = e.next;
23 }
24 // 重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞隊列等待鎖
25 // lock() 是阻塞方法,直到擷取鎖後傳回
26 else if (++retries > MAX_SCAN_RETRIES) {
27 lock();
28 break;
29 }
30 else if ((retries & 1) == 0 &&
31 // 這個時候是有大問題了,那就是有新的元素進到了連結清單,成為了新的表頭
32 // 是以這邊的政策是,相當于重新走一遍這個 scanAndLockForPut 方法
33 (f = entryForHash(this, hash)) != first) {
34 e = first = f; // re-traverse if entry changed
35 retries = -1;
36 }
37 }
38 return node;
39 }
這個方法有兩個出口,一個是 tryLock() 成功了,循環終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨占鎖。
這個方法就是看似複雜,但是其實就是做了一件事,那就是擷取該 segment 的獨占鎖,如果需要的話順便執行個體化了一下 node。
擷取鎖時,并不直接使用lock來擷取,因為該方法擷取鎖失敗時會挂起。事實上,它使用了自旋鎖,如果tryLock擷取鎖失敗,說明鎖被其它線程占用,此時通過循環再次以tryLock的方式申請鎖。如果在循環過程中該Key所對應的連結清單頭被修改,則重置retry次數。如果retry次數超過一定值,則使用lock方法申請鎖。
這裡使用自旋鎖是因為自旋鎖的效率比較高,但是它消耗CPU資源比較多,是以在自旋次數超過門檻值時切換為互斥鎖。
擴容: rehash
重複一下,segment 數組不能擴容,擴容是 segment 數組某個位置内部的數組 HashEntry\<k,v>[] 進行擴容,擴容後,容量為原來的 2 倍。
首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導緻該 segment 的元素個數超過門檻值,那麼先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。
該方法不需要考慮并發,因為到這裡的時候,是持有該 segment 的獨占鎖的。
1 // 方法參數上的 node 是這次擴容後,需要添加到新的數組中的資料。
2 private void rehash(HashEntry<K,V> node) {
3 HashEntry<K,V>[] oldTable = table;
4 int oldCapacity = oldTable.length;
5 // 2 倍
6 int newCapacity = oldCapacity << 1;
7 threshold = (int)(newCapacity * loadFactor);
8 // 建立新數組
9 HashEntry<K,V>[] newTable =
10 (HashEntry<K,V>[]) new HashEntry[newCapacity];
11 // 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 為 31,對應二進制 ‘000...00011111’
12 int sizeMask = newCapacity - 1;
13
14 // 周遊原數組,老套路,将原數組位置 i 處的連結清單拆分到 新數組位置 i 和 i+oldCap 兩個位置
15 for (int i = 0; i < oldCapacity ; i++) {
16 // e 是連結清單的第一個元素
17 HashEntry<K,V> e = oldTable[i];
18 if (e != null) {
19 HashEntry<K,V> next = e.next;
20 // 計算應該放置在新數組中的位置,
21 // 假設原數組長度為 16,e 在 oldTable[3] 處,那麼 idx 隻可能是 3 或者是 3 + 16 = 19
22 int idx = e.hash & sizeMask;
23 if (next == null) // 該位置處隻有一個元素,那比較好辦
24 newTable[idx] = e;
25 else { // Reuse consecutive sequence at same slot
26 // e 是連結清單表頭
27 HashEntry<K,V> lastRun = e;
28 // idx 是目前連結清單的頭結點 e 的新位置
29 int lastIdx = idx;
30
31 // 下面這個 for 循環會找到一個 lastRun 節點,這個節點之後的所有元素是将要放到一起的
32 for (HashEntry<K,V> last = next;
33 last != null;
34 last = last.next) {
35 int k = last.hash & sizeMask;
36 if (k != lastIdx) {
37 lastIdx = k;
38 lastRun = last;
39 }
40 }
41 // 将 lastRun 及其之後的所有節點組成的這個連結清單放到 lastIdx 這個位置
42 newTable[lastIdx] = lastRun;
43 // 下面的操作是處理 lastRun 之前的節點,
44 // 這些節點可能配置設定在另一個連結清單中,也可能配置設定到上面的那個連結清單中
45 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
46 V v = p.value;
47 int h = p.hash;
48 int k = h & sizeMask;
49 HashEntry<K,V> n = newTable[k];
50 newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
51 }
52 }
53 }
54 }
55 // 将新來的 node 放到新數組中剛剛的 兩個連結清單之一 的 頭部
56 int nodeIndex = node.hash & sizeMask; // add the new node
57 node.setNext(newTable[nodeIndex]);
58 newTable[nodeIndex] = node;
59 table = newTable;
60 }
總結一下put的流程:
當執行put操作時,會進行第一次key的hash來定位Segment的位置,如果該Segment還沒有初始化,即通過CAS操作進行指派,然後進行第二次hash操作,找到相應的HashEntry的位置,這裡會利用繼承過來的鎖的特性,在将資料插入指定的HashEntry位置時(連結清單的尾端),會通過繼承ReentrantLock的tryLock()方法嘗試去擷取鎖,如果擷取成功就直接插入相應的位置,如果已經有線程擷取該Segment的鎖,那目前線程會以自旋的方式去繼續的調用tryLock()方法去擷取鎖,超過指定次數就挂起,等待喚醒。
get 過程分析
相對于 put 來說,get 真的不要太簡單。
- 計算 hash 值,找到 segment 數組中的具體位置,或我們前面用的“槽”
- 槽中也是一個數組,根據 hash 找到數組中具體的位置
- 到這裡是連結清單了,順着連結清單進行查找即可
1 public V get(Object key) {
2 Segment<K,V> s; // manually integrate access methods to reduce overhead
3 HashEntry<K,V>[] tab;
4 // 1. hash 值
5 int h = hash(key);
6 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
7 // 2. 根據 hash 找到對應的 segment
8 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
9 (tab = s.table) != null) {
10 // 3. 找到segment 内部數組相應位置的連結清單,周遊
11 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
12 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
13 e != null; e = e.next) {
14 K k;
15 if ((k = e.key) == key || (e.hash == h && key.equals(k)))
16 return e.value;
17 }
18 }
19 return null;
20 }
size操作
put、remove和get操作隻需要關心一個Segment,而size操作需要周遊所有的Segment才能算出整個Map的大小。一個簡單的方案是,先鎖住所有Sgment,計算完後再解鎖。但這樣做,在做size操作時,不僅無法對Map進行寫操作,同時也無法進行讀操作,不利于對Map的并行操作。
為更好支援并發操作,ConcurrentHashMap會在不上鎖的前提逐個Segment計算3次size,如果某相鄰兩次計算擷取的所有Segment的更新次數(每個Segment都與HashMap一樣通過modCount跟蹤自己的修改次數,Segment每修改一次其modCount加一)相等,說明這兩次計算過程中無更新操作,則這兩次計算出的總size相等,可直接作為最終結果傳回。如果這三次計算過程中Map有更新,則對所有Segment加鎖重新計算Size。該計算方法代碼如下
1 public int size() {
2 final Segment<K,V>[] segments = this.segments;
3 int size;
4 boolean overflow; // true if size overflows 32 bits
5 long sum; // sum of modCounts
6 long last = 0L; // previous sum
7 int retries = -1; // first iteration isn't retry
8 try {
9 for (;;) {
10 if (retries++ == RETRIES_BEFORE_LOCK) {
11 for (int j = 0; j < segments.length; ++j)
12 ensureSegment(j).lock(); // force creation
13 }
14 sum = 0L;
15 size = 0;
16 overflow = false;
17 for (int j = 0; j < segments.length; ++j) {
18 Segment<K,V> seg = segmentAt(segments, j);
19 if (seg != null) {
20 sum += seg.modCount;
21 int c = seg.count;
22 if (c < 0 || (size += c) < 0)
23 overflow = true;
24 }
25 }
26 if (sum == last)
27 break;
28 last = sum;
29 }
30 } finally {
31 if (retries > RETRIES_BEFORE_LOCK) {
32 for (int j = 0; j < segments.length; ++j)
33 segmentAt(segments, j).unlock();
34 }
35 }
36 return overflow ? Integer.MAX_VALUE : size;
37 }
ConcurrentHashMap的Size方法是一個嵌套循環,大體邏輯如下:
1.周遊所有的Segment。
2.把Segment的元素數量累加起來。
3.把Segment的修改次數累加起來。
4.判斷所有Segment的總修改次數是否大于上一次的總修改次數。如果大于,說明統計過程中有修改,重新統計,嘗試次數+1;如果不是。說明沒有修改,統計結束。
5.如果嘗試次數超過門檻值,則對每一個Segment加鎖,再重新統計。
6.再次判斷所有Segment的總修改次數是否大于上一次的總修改次數。由于已經加鎖,次數一定和上次相等。
7.釋放鎖,統計結束。
并發問題分析
現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮并發問題。
添加節點的操作 put 和删除節點的操作 remove 都是要加 segment 上的獨占鎖的,是以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。
- put 操作的線程安全性。
- 初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的數組。
- 添加節點到連結清單的操作是插入到表頭的,是以,如果這個時候 get 操作在連結清單周遊的過程已經到了中間,是不會影響的。當然,另一個并發問題就是 get 操作在 put 之後,需要保證剛剛插入表頭的節點被讀取,這個依賴于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
- 擴容。擴容是新建立了數組,然後進行遷移資料,最後面将 newTable 設定給屬性 table。是以,如果 get 操作此時也在進行,那麼也沒關系,如果 get 先行,那麼就是在舊的 table 上做查詢操作;而 put 先行,那麼 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。
-
remove 操作的線程安全性。
remove 操作我們沒有分析源碼,是以這裡說的讀者感興趣的話還是需要到源碼中去求實一下的。
get 操作需要周遊連結清單,但是 remove 操作會"破壞"連結清單。
如果 remove 破壞的節點 get 操作已經過去了,那麼這裡不存在任何問題。
如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那麼需要将頭結點的 next 設定為數組該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 并不能提供數組内部操作的可見性保證,是以源碼中使用了 UNSAFE 來操作數組,請看方法 setEntryAt。2、如果要删除的節點不是頭結點,它會将要删除節點的後繼節點接到前驅節點中,這裡的并發保證就是 next 屬性是 volatile 的。
最後我們來看看并發操作示意圖
Case1:不同Segment的并發寫入
不同Segment的寫入是可以并發執行的。
Case2:同一Segment的一寫一讀
同一Segment的寫和讀是可以并發執行的。
Case3:同一Segment的并發寫入
Segment的寫入是需要上鎖的,是以對同一Segment的并發寫入會被阻塞。
由此可見,ConcurrentHashMap當中每個Segment各自持有一把鎖。在保證線程安全的同時降低了鎖的粒度,讓并發操作效率更高。
轉載于:https://www.cnblogs.com/java-chen-hao/p/10320783.html