天天看點

ConcurrentHashMap1.7版本源碼深度解讀

作者:詩歌樂園

ConcurrentHashMap是concurrent并發包下重要的工具類,它的設計和實作非常的巧妙,它将資料分段,每段資料使用一個AQS鎖,進而減小了鎖的粒度。

1.ConcurrentHashMap的結構

ConcurrentHashMap1.7版本源碼深度解讀

結構圖

一個ConcurrentHashMap是由多個Segment(段)組成的。Segment類繼承了ReentrantLock類,這意味着每個Segment擁有了一個AQS,多個線程的操作落到同一個Segment上時,發生了鎖的競争。ConcurrentHashMap預設有16個Segment,在初始化之後,Segment個數不可修改。

一個Segment包含了一個HashEntry的數組,每個HashEntry都是一個單向連結清單,HashEntry的數組可以擴容,擴容後數組的長度為原來的2倍。HashEntry類如下圖所示:

ConcurrentHashMap1.7版本源碼深度解讀

我們看到value和next都是volatile修飾的,這保證了資料的可見性。

2.put方法詳解

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    //計算key的hash值
    int hash = hash(key);
    //hash為32位無符号數,segmentShift預設為28,向右移28位,剩下高4位
    //然後和segmentMask(預設值為15)做與運算,結果還是高4位
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
       // 對 segment[j] 進行初始化
      s = ensureSegment(j);
    //放入到對應的段内
    return s.put(key, hash, value, false);
}           

第一步是根據hash值快速擷取到相應的Segment,第二步就是Segment内部的put操作了。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    //擷取該segment的獨占鎖
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        //用hash值和(數組長度-1)做與運算,得出數組的下标
        int index = (tab.length - 1) & hash;
        //first 是數組該位置處的連結清單的表頭
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            //判斷是不是到了尾部,尾部==null
            if (e != null) {
                K k;
                //key相同,值更新
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                //繼續下一個節點
                e = e.next;
            }
            else {
                //node不為空,則node作為頭節點,使用的是頭插法
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 如果超過了該 segment 的門檻值,這個 segment 需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // 沒有達到門檻值,将 node 放到數組 tab 的 index 位置,
                    // 其實就是将新的節點設定成原連結清單的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}           

如何進行擴容:

private void rehash(HashEntry<K,V> node) {
    
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    //擴容為原來的2倍
    int newCapacity = oldCapacity << 1;
    //計算閥值
    threshold = (int)(newCapacity * loadFactor);
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    //循環舊的HashEntry數組
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            //該連結清單上隻有一個節點
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    //計算在新數組中的下标
                    int k = last.hash & sizeMask;
                    //目前節點的下标和上一個節點的下标不一緻時,修改最終節點值
                    //注意如果後面的節點和前面的節點下标一緻,
                    //那麼後面的節點保持原有的順序,直接帶到新tab[k]的連結清單中
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                //采用頭插法,最後一個節點作為頭節點
                newTable[lastIdx] = lastRun;
                //重新計算節點在數組中的位置,采用頭插法插入到連結清單
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    //添加新節點
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    //将newTable指派給該segment的table
    table = newTable;
}           

自旋擷取aqs鎖:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
     //如果嘗試加鎖失敗,那麼就對segment[hash]對應的連結清單進行周遊找到需要put的這個entry所在的連結清單中的位置,
    //這裡之是以進行一次周遊找到坑位,主要是為了通過周遊過程将周遊過的entry全部放到CPU高速緩存中,
    //這樣在擷取到鎖了之後,再次進行定位的時候速度會十分快,這是線上程無法擷取到鎖前并等待的過程中的一種預熱方式。
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        //擷取鎖失敗,初始時retries=-1必然開始先進入第一個if
        if (retries < 0) {
            //e=null代表兩種意思,
            //1.第一種就是周遊連結清單到了最後,仍然沒有發現指定key的entry;
            //2.第二種情況是剛開始時entryForHash(通過hash找到的table中對應位置連結清單的結點)找到的HashEntry就是空的
            if (e == null) {
                /*
                當然這裡之是以還需要對node==null進行判斷,是因為有可能在第一次給node指派完畢後,然後預熱準備工作已經搞定,然後進行循環嘗試擷取鎖,在循環次數還未達到<2>64次以前,某一次在條件<3>判斷時發現有其它線程對這個segment進行了修改,那麼retries被重置為-1,進而再一次進入到<1>條件内,此時如果再次周遊到連結清單最後時,因為上一次周遊時已經給node指派過了,是以這裡判斷node是否為空,進而避免第二次建立對象給node重複指派。
                */
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            // 嘗試擷取鎖次數超過設定的最大值,直接進入阻塞等待,這就是所謂的有限制的自旋擷取鎖,
            //之是以這樣是因為如果持有鎖的線程要過很久才釋放鎖,這期間如果一直無限制的自旋其實是對cpu性能有消耗的,
            //這樣無限制的自旋是不利的,是以加入最大自旋次數,超過這個次數則進入阻塞狀态等待對方釋放鎖并擷取鎖
            lock();
            break;
        }
        // 周遊過程中,有可能其它線程改變了周遊的連結清單,這時就需要重新進行周遊了。
        //判斷是否初始化了結點 并且 判斷連結清單頭結點是否改變(1.7使用頭插法)
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}           

這個方法用了一個while循環去擷取aqs鎖,有兩種情況需要說明下:

1.如果嘗試的次數超過了最大自旋次數,會進入到aqs的等待隊列,避免了cpu的空轉。

2.如果在循環的過程中,其他的線程擷取到了鎖,并且改變了周遊的連結清單,那麼自旋計數器重置為-1,從連結清單的頭節點重新開始周遊。

3.get方法

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}           

get方法并沒有加鎖,基本思路是:

1.先定位到所在的segment

2.定位對應的segment的tab數組内的位置

3.然後周遊連結清單元素,如果找到相同的key就傳回對應的value

4.總結:

ConcurrentHashMap1.7采用了分而治之的思想,将資料分段,每個段持有一把aqs鎖。

它的寫操作都是需要擷取鎖之後再操作,而讀操作不需要擷取鎖,這也說明它适合讀多寫少的業務場景。

線程在擷取不到aqs鎖的情況下,不會立即進入到等待隊列,會進行一定次數的自旋。