ConcurrentHashMap是concurrent并發包下重要的工具類,它的設計和實作非常的巧妙,它将資料分段,每段資料使用一個AQS鎖,進而減小了鎖的粒度。
1.ConcurrentHashMap的結構
結構圖
一個ConcurrentHashMap是由多個Segment(段)組成的。Segment類繼承了ReentrantLock類,這意味着每個Segment擁有了一個AQS,多個線程的操作落到同一個Segment上時,發生了鎖的競争。ConcurrentHashMap預設有16個Segment,在初始化之後,Segment個數不可修改。
一個Segment包含了一個HashEntry的數組,每個HashEntry都是一個單向連結清單,HashEntry的數組可以擴容,擴容後數組的長度為原來的2倍。HashEntry類如下圖所示:
我們看到value和next都是volatile修飾的,這保證了資料的可見性。
2.put方法詳解
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//計算key的hash值
int hash = hash(key);
//hash為32位無符号數,segmentShift預設為28,向右移28位,剩下高4位
//然後和segmentMask(預設值為15)做與運算,結果還是高4位
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 對 segment[j] 進行初始化
s = ensureSegment(j);
//放入到對應的段内
return s.put(key, hash, value, false);
}
第一步是根據hash值快速擷取到相應的Segment,第二步就是Segment内部的put操作了。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//擷取該segment的獨占鎖
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//用hash值和(數組長度-1)做與運算,得出數組的下标
int index = (tab.length - 1) & hash;
//first 是數組該位置處的連結清單的表頭
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
//判斷是不是到了尾部,尾部==null
if (e != null) {
K k;
//key相同,值更新
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//繼續下一個節點
e = e.next;
}
else {
//node不為空,則node作為頭節點,使用的是頭插法
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 如果超過了該 segment 的門檻值,這個 segment 需要擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// 沒有達到門檻值,将 node 放到數組 tab 的 index 位置,
// 其實就是将新的節點設定成原連結清單的表頭
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
如何進行擴容:
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
//擴容為原來的2倍
int newCapacity = oldCapacity << 1;
//計算閥值
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
//循環舊的HashEntry數組
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
//該連結清單上隻有一個節點
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
//計算在新數組中的下标
int k = last.hash & sizeMask;
//目前節點的下标和上一個節點的下标不一緻時,修改最終節點值
//注意如果後面的節點和前面的節點下标一緻,
//那麼後面的節點保持原有的順序,直接帶到新tab[k]的連結清單中
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
//采用頭插法,最後一個節點作為頭節點
newTable[lastIdx] = lastRun;
//重新計算節點在數組中的位置,采用頭插法插入到連結清單
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
//添加新節點
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
//将newTable指派給該segment的table
table = newTable;
}
自旋擷取aqs鎖:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
//如果嘗試加鎖失敗,那麼就對segment[hash]對應的連結清單進行周遊找到需要put的這個entry所在的連結清單中的位置,
//這裡之是以進行一次周遊找到坑位,主要是為了通過周遊過程将周遊過的entry全部放到CPU高速緩存中,
//這樣在擷取到鎖了之後,再次進行定位的時候速度會十分快,這是線上程無法擷取到鎖前并等待的過程中的一種預熱方式。
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//擷取鎖失敗,初始時retries=-1必然開始先進入第一個if
if (retries < 0) {
//e=null代表兩種意思,
//1.第一種就是周遊連結清單到了最後,仍然沒有發現指定key的entry;
//2.第二種情況是剛開始時entryForHash(通過hash找到的table中對應位置連結清單的結點)找到的HashEntry就是空的
if (e == null) {
/*
當然這裡之是以還需要對node==null進行判斷,是因為有可能在第一次給node指派完畢後,然後預熱準備工作已經搞定,然後進行循環嘗試擷取鎖,在循環次數還未達到<2>64次以前,某一次在條件<3>判斷時發現有其它線程對這個segment進行了修改,那麼retries被重置為-1,進而再一次進入到<1>條件内,此時如果再次周遊到連結清單最後時,因為上一次周遊時已經給node指派過了,是以這裡判斷node是否為空,進而避免第二次建立對象給node重複指派。
*/
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
// 嘗試擷取鎖次數超過設定的最大值,直接進入阻塞等待,這就是所謂的有限制的自旋擷取鎖,
//之是以這樣是因為如果持有鎖的線程要過很久才釋放鎖,這期間如果一直無限制的自旋其實是對cpu性能有消耗的,
//這樣無限制的自旋是不利的,是以加入最大自旋次數,超過這個次數則進入阻塞狀态等待對方釋放鎖并擷取鎖
lock();
break;
}
// 周遊過程中,有可能其它線程改變了周遊的連結清單,這時就需要重新進行周遊了。
//判斷是否初始化了結點 并且 判斷連結清單頭結點是否改變(1.7使用頭插法)
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
這個方法用了一個while循環去擷取aqs鎖,有兩種情況需要說明下:
1.如果嘗試的次數超過了最大自旋次數,會進入到aqs的等待隊列,避免了cpu的空轉。
2.如果在循環的過程中,其他的線程擷取到了鎖,并且改變了周遊的連結清單,那麼自旋計數器重置為-1,從連結清單的頭節點重新開始周遊。
3.get方法
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
get方法并沒有加鎖,基本思路是:
1.先定位到所在的segment
2.定位對應的segment的tab數組内的位置
3.然後周遊連結清單元素,如果找到相同的key就傳回對應的value
4.總結:
ConcurrentHashMap1.7采用了分而治之的思想,将資料分段,每個段持有一把aqs鎖。
它的寫操作都是需要擷取鎖之後再操作,而讀操作不需要擷取鎖,這也說明它适合讀多寫少的業務場景。
線程在擷取不到aqs鎖的情況下,不會立即進入到等待隊列,會進行一定次數的自旋。