天天看點

Map接口以及那些實作類

從今天開始,我的部落格更新将圍繞着老鐵每天面試的問題開始,哈哈,直接進入昨天老鐵問倒我的第二個問題:hashLinkedMap,但是順便加上了幾個常問的進行總結:

hashmap:

Map接口以及那些實作類

java1.7 hashMap 底層實作是數組+連結清單

java1.8 對上面進行優化 數組+連結清單+紅黑樹

2.hashmap 是怎麼儲存資料的。

Node結構:

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        V value;
        Node<K,V> next;

        Node(int hash, K key, V value, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
           

當我們像hashMap 中放入資料時,其實就是一個

Enity{
  private String key;
  private String vaue;
}

           

在存之前會把這個Entity 轉成Node :

根據Entity 的key通過hash算出一個值當成Node 的 hash

Map接口以及那些實作類

複制完成後,還需要通過Entity 對象的hash 算出 該 Entiry對象 具體應該放在 hashMap 的那個位置。計算如下 Hash&(lenth-1) 得到的值就是hashMap 對應具體的位置。(length是目前hashMap 的長度)。

map.put(k,v)實作原理

第一步首先将k,v封裝到Node對象當中(節點)。第二步它的底層會調用K的hashCode()方法得出hash值。第三步通過哈希表函數/雜湊演算法,将hash值轉換成數組的下标,下标位置上如果沒有任何元素,就把Node添加到這個位置上。如果說下标對應的位置上有連結清單。此時,就會拿着k和連結清單上每個節點的k進行equal。如果所有的equals方法傳回都是false,那麼這個新的節點将被添加到連結清單的末尾。如其中有一個equals傳回了true,那麼這個節點的value将會被覆寫。

HashMap集合的key,會先後調用兩個方法,hashCode and equals方法,這這兩個方法都需要重寫。

map.get(k)實作原理

先調用k的hashCode()方法得出哈希值,并通過雜湊演算法轉換成數組的下标。第二步:通過上一步雜湊演算法轉換成數組的下标之後,在通過數組下标快速定位到某個位置上。重點了解如果這個位置上什麼都沒有,則傳回null。如果這個位置上有單向連結清單,那麼它就會拿着參數K和單向連結清單上的每一個節點的K進行equals,如果所有equals方法都傳回false,則get方法傳回null。如果其中一個節點的K和參數K進行equals傳回true,那麼此時該節點的value就是我們要找的value了,get方法最終傳回這個要找的value。

解決hash 沖突

    就是不同的元素通過 Hash&(lenth-1) 公式算出的位置相同,現在就啟動了連結清單(單項連結清單),挂在了目前位置的下面,而連結清單的元素怎麼關聯呢,就用到了Node 的next ,next的值就是該連結清單下一個元素在記憶體中的位址。

jdk1.7 就是這樣處理的,而到了 1.8 以後,就引用了紅黑樹,1.8以後這個連結清單隻讓挂7個元素,超過七個就會轉成一個紅黑樹進行處理(最多是64,超多64 就會重新拆分)。

Map接口以及那些實作類

當紅黑樹下挂的節點小于等于6的時候,系統會把紅黑樹轉成連結清單。 Node 在jdk1.8之前是插傳入連結表頭部的,在jdk1.8中是插傳入連結表的尾部的。

hashMap 擴容:

hashMap 會根據 目前的hashMap 的存儲量達到 160.75=12 的時候,就是擴容 162=32 依次類推下去。2倍擴容。

JDK1.7下的resize()方法是這樣的:

void resize(int newCapacity) {
        Entry[] oldTable = table;
        int oldCapacity = oldTable.length;
        if (oldCapacity == MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return;
        }
 
        Entry[] newTable = new Entry[newCapacity];
        transfer(newTable, initHashSeedAsNeeded(newCapacity));
        table = newTable;
        threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
    }
           

代碼中可以看到,如果原有table長度已經達到了上限,就不再擴容了。

如果還未達到上限,則建立一個新的table,并調用transfer方法:

/**
     * Transfers all entries from current table to newTable.
     */
    void transfer(Entry[] newTable, boolean rehash) {
        int newCapacity = newTable.length;
        for (Entry<K,V> e : table) {
            while(null != e) {
                Entry<K,V> next = e.next;              //注釋1
                if (rehash) {
                    e.hash = null == e.key ? 0 : hash(e.key);
                }
                int i = indexFor(e.hash, newCapacity); //注釋2
                e.next = newTable[i];                  //注釋3
                newTable[i] = e;                       //注釋4
                e = next;                              //注釋5
            }
        }
    }
           

transfer方法的作用是把原table的Node放到新的table中,使用的是頭插法,也就是說,新table中連結清單的順序和舊清單中是相反的,在HashMap線程不安全的情況下,這種頭插法可能會導緻環狀節點。

jdk1.8:

final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table;
        int oldCap = (oldTab == null) ? 0 : oldTab.length;
        int oldThr = threshold;
        int newCap, newThr = 0;
        if (oldCap > 0) {
            if (oldCap >= MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                     oldCap >= DEFAULT_INITIAL_CAPACITY)                      //注釋1
                newThr = oldThr << 1; // double threshold
        }
        else if (oldThr > 0) // initial capacity was placed in threshold
            newCap = oldThr;
        else {               // zero initial threshold signifies using defaults
            newCap = DEFAULT_INITIAL_CAPACITY;
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
        }
        if (newThr == 0) {
            float ft = (float)newCap * loadFactor;
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                      (int)ft : Integer.MAX_VALUE);
        }
        threshold = newThr;
        @SuppressWarnings({"rawtypes","unchecked"})
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
        table = newTab;
        if (oldTab != null) {
            for (int j = 0; j < oldCap; ++j) {                                 //注釋2
                Node<K,V> e;
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null;
                    if (e.next == null)                                        //注釋3
                        newTab[e.hash & (newCap - 1)] = e;
                    else if (e instanceof TreeNode)
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                    else { // preserve order
                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {                      //注釋4
                                if (loTail == null)                            //注釋5
                                    loHead = e;
                                else
                                    loTail.next = e;                           //注釋6
                                loTail = e;                                    //注釋7
                            }
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {                                  /注釋8
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        return newTab;
    }
           

代碼解析:

1,在resize()方法中,定義了oldCap參數,記錄了原table的長度,定義了newCap參數,記錄新table長度,newCap是oldCap長度的2倍(注釋1),同時擴充點也乘2。

2,注釋2是循環原table,把原table中的每個連結清單中的每個元素放入新table。

3,注釋3,e.next==null,指的是連結清單中隻有一個元素,是以直接把e放入新table,其中的e.hash & (newCap - 1)就是計算e在新table中的位置,和JDK1.7中的indexFor()方法是一回事。

4,注釋// preserve order,這個注釋是源碼自帶的,這裡定義了4個變量:loHead,loTail,hiHead,hiTail,看起來可能有點眼暈,其實這裡展現了JDK1.8對于計算節點在table中下标的新思路:

正常情況下,計算節點在table中的下标的方法是:hash&(oldTable.length-1),擴容之後,table長度翻倍,計算table下标的方法是hash&(newTable.length-1),也就是hash&(oldTable.length*2-1),于是我們有了這樣的結論:這新舊兩次計算下标的結果,要不然就相同,要不然就是新下标等于舊下标加上舊數組的長度。

舉個例子,假設table原長度是16,擴容後長度32,那麼一個hash值在擴容前後的table下标是這麼計算的:

Map接口以及那些實作類

hash值的每個二進制位用abcde來表示,那麼,hash和新舊table按位與的結果,最後4位顯然是相同的,唯一可能出現的差別就在第5位,也就是hash值的b所在的那一位,如果b所在的那一位是0,那麼新table按位與的結果和舊table的結果就相同,反之如果b所在的那一位是1,則新table按位與的結果就比舊table的結果多了10000(二進制),而這個二進制10000就是舊table的長度16。

換言之,hash值的新散列下标是不是需要加上舊table長度,隻需要看看hash值第5位是不是1就行了,位運算的方法就是hash值和10000(也就是舊table長度)來按位與,其結果隻可能是10000或者00000。

是以,注釋4處的e.hash & oldCap,就是用于計算位置b到底是0還是1用的,隻要其結果是0,則新散列下标就等于原散列下标,否則新散列坐标要在原散列坐标的基礎上加上原table長度。

hashLinkedMap

先看個類圖:

Map接口以及那些實作類

本質上,HashMap和雙向連結清單合二為一即是LinkedHashMap。所謂LinkedHashMap,其落腳點在HashMap,是以更準确地說,它是一個将所有Entry節點鍊入一個雙向連結清單雙向連結清單的HashMap。

Map接口以及那些實作類

HashMap是無序的,當我們希望有順序地去存儲key-value時,就需要使用LinkedHashMap了。

由于LinkedHashMap是HashMap的子類,是以LinkedHashMap自然會擁有HashMap的所有特性。比如,LinkedHashMap也最多隻允許一條Entry的鍵為Null(多條會覆寫),但允許多條Entry的值為Null。

put方法

LinkedHashMap沒有重寫put方法,是以還是調用HashMap得到put方法,如下:

public V put(K key, V value) {
        // 對key為null的處理
        if (key == null)
            return putForNullKey(value);
        // 計算hash
        int hash = hash(key);
        // 得到在table中的index
        int i = indexFor(hash, table.length);
        // 周遊table[index],是否key已經存在,存在則替換,并傳回舊值
        for (Entry<K,V> e = table[i]; e != null; e = e.next) {
            Object k;
            if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }
        
        modCount++;
        // 如果key之前在table中不存在,則調用addEntry,LinkedHashMap重寫了該方法
        addEntry(hash, key, value, i);
        return null;
    }
           

我們看看LinkedHashMap的addEntry方法:

void addEntry(int hash, K key, V value, int bucketIndex) {
        // 調用父類的addEntry,增加一個Entry到HashMap中
        super.addEntry(hash, key, value, bucketIndex);

        // removeEldestEntry方法預設傳回false,不用考慮
        Entry<K,V> eldest = header.after;
        if (removeEldestEntry(eldest)) {
            removeEntryForKey(eldest.key);
        }
    }
           

HashMap的addEntry方法中,createEntry方法,LinkedHashMap進行了重寫。

void createEntry(int hash, K key, V value, int bucketIndex) {
       HashMap.Entry<K,V> old = table[bucketIndex];
       // e就是新建立了Entry,會加入到table[bucketIndex]的表頭
       Entry<K,V> e = new Entry<>(hash, key, value, old);
       table[bucketIndex] = e;
       // 把新建立的Entry,加入到雙向連結清單中
       e.addBefore(header);
       size++;
   }
           

從這裡就可以看出,當put元素時,不但要把它加入到HashMap中去,還要加入到雙向連結清單中,是以可以看出LinkedHashMap就是HashMap+雙向連結清單,

LinkedHashMap中添加資料的過程,紅色部分是雙向連結清單,黑色部分是HashMap結構,header是一個Entry類型的雙向連結清單表頭,本身不存儲資料。

首先是隻加入一個元素Entry1,假設index為0:

Map接口以及那些實作類

當再加入一個元素Entry2,假設index為15:

Map接口以及那些實作類

當再加入一個元素Entry3, 假設index也是0:

Map接口以及那些實作類

擴容

在HashMap的put方法中,如果發現前元素個數超過了擴容閥值時,會調用resize方法,如下:

void resize(int newCapacity) {
        Entry[] oldTable = table;
        int oldCapacity = oldTable.length;
        if (oldCapacity == MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return;
        }

        Entry[] newTable = new Entry[newCapacity];
        boolean oldAltHashing = useAltHashing;
        useAltHashing |= sun.misc.VM.isBooted() &&
                (newCapacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
        boolean rehash = oldAltHashing ^ useAltHashing;
       // 把舊table的資料遷移到新table
        transfer(newTable, rehash);
        table = newTable;
        threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
    }
           

LinkedHashMap重寫了transfer方法,資料的遷移,它的實作如下:

void resize(int newCapacity) {
        Entry[] oldTable = table;
        int oldCapacity = oldTable.length;
        if (oldCapacity == MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return;
        }

        Entry[] newTable = new Entry[newCapacity];
        boolean oldAltHashing = useAltHashing;
        useAltHashing |= sun.misc.VM.isBooted() &&
                (newCapacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
        boolean rehash = oldAltHashing ^ useAltHashing;
       // 把舊table的資料遷移到新table
        transfer(newTable, rehash);
        table = newTable;
        threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
    }
           

可以看出,LinkedHashMap擴容時,資料的再散列和HashMap是不一樣的。

HashMap是先周遊舊table,再周遊舊table中每個元素的單向連結清單,取得Entry以後,重新計算hash值,然後存放到新table的對應位置。

LinkedHashMap是周遊的雙向連結清單,取得每一個Entry,然後重新計算hash值,然後存放到新table的對應位置。

從周遊的效率來說,周遊雙向連結清單的效率要高于周遊table,因為周遊雙向連結清單是N次(N為元素個數);而周遊table是N+table的空餘個數(N為元素個數)。

雙向連結清單的重排序

前面分析的,主要是目前LinkedHashMap中不存在目前key時,新增Entry的情況。當key如果已經存在時,則進行更新Entry的value。就是HashMap的put方法中的如下代碼:

for (Entry<K,V> e = table[i]; e != null; e = e.next) {
            Object k;
            if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
                V oldValue = e.value;
                e.value = value;
                // 重排序
                e.recordAccess(this);
                return oldValue;
            }
        }
           

主要看e.recordAccess(this),這個方法跟通路順序有關,而HashMap是無序的,是以在HashMap.Entry的recordAccess方法是空實作,但是LinkedHashMap是有序的,LinkedHashMap.Entry對recordAccess方法進行了重寫。

void recordAccess(HashMap<K,V> m) {
            LinkedHashMap<K,V> lm = (LinkedHashMap<K,V>)m;
            // 如果LinkedHashMap的accessOrder為true,則進行重排序
            // 比如前面提到LruCache中使用到的LinkedHashMap的accessOrder屬性就為true
            if (lm.accessOrder) {
                lm.modCount++;
                // 把更新的Entry從雙向連結清單中移除
                remove();
                // 再把更新的Entry加入到雙向連結清單的表尾
                addBefore(lm.header);
            }
        }
           

在LinkedHashMap中,隻有accessOrder為true,即是通路順序模式,才會put時對更新的Entry進行重新排序,而如果是插入順序模式時,不會重新排序,這裡的排序跟在HashMap中存儲沒有關系,隻是指在雙向連結清單中的順序。

舉個栗子:開始時,HashMap中有Entry1、Entry2、Entry3,并設定LinkedHashMap為通路順序,則更新Entry1時,會先把Entry1從雙向連結清單中删除,然後再把Entry1加入到雙向連結清單的表尾,而Entry1在HashMap結構中的存儲位置沒有變化,對比圖如下所示:

Map接口以及那些實作類

get方法

public V get(Object key) {
        // 調用genEntry得到Entry
        Entry<K,V> e = (Entry<K,V>)getEntry(key);
        if (e == null)
            return null;
        // 如果LinkedHashMap是通路順序的,則get時,也需要重新排序
        e.recordAccess(this);
        return e.value;
    }
           

先是調用了getEntry方法,通過key得到Entry,而LinkedHashMap并沒有重寫getEntry方法,是以調用的是HashMap的getEntry方法,在上一篇文章中我們分析過HashMap的getEntry方法:首先通過key算出hash值,然後根據hash值算出在table中存儲的index,然後周遊table[index]的單向連結清單去對比key,如果找到了就傳回Entry。

後面調用了LinkedHashMap.Entry的recordAccess方法,上面分析過put過程中這個方法,其實就是在通路順序的LinkedHashMap進行了get操作以後,重新排序,把get的Entry移動到雙向連結清單的表尾。

周遊方式取資料

來看看HashMap使用周遊方式取資料的過程:

Map接口以及那些實作類

很明顯,這樣取出來的Entry順序肯定跟插入順序不同了,既然LinkedHashMap是有序的,那麼它是怎麼實作的呢?

先看看LinkedHashMap取周遊方式擷取資料的代碼:

Map<String, String> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put("name1", "josan1");
        linkedHashMap.put("name2", "josan2");
        linkedHashMap.put("name3", "josan3");
                // LinkedHashMap沒有重寫該方法,調用的HashMap中的entrySet方法
        Set<Entry<String, String>> set = linkedHashMap.entrySet();
        Iterator<Entry<String, String>> iterator = set.iterator();
        while(iterator.hasNext()) {
            Entry entry = iterator.next();
            String key = (String) entry.getKey();
            String value = (String) entry.getValue();
            System.out.println("key:" + key + ",value:" + value);
        }
           

LinkedHashMap沒有重寫entrySet方法,我們先來看HashMap中的entrySet,如下:

public Set<Map.Entry<K,V>> entrySet() {
        return entrySet0();
    }

    private Set<Map.Entry<K,V>> entrySet0() {
        Set<Map.Entry<K,V>> es = entrySet;
        return es != null ? es : (entrySet = new EntrySet());
    }

    private final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
        public Iterator<Map.Entry<K,V>> iterator() {
            return newEntryIterator();
        }
        // 無關代碼
        ......
    }
           

可以看到,HashMap的entrySet方法,其實就是傳回了一個EntrySet對象。

我們得到EntrySet會調用它的iterator方法去得到疊代器Iterator,從上面的代碼也可以看到,iterator方法中直接調用了newEntryIterator方法并傳回,而LinkedHashMap重寫了該方法

Iterator<Map.Entry<K,V>> newEntryIterator() { 
        return new EntryIterator();
    }
           

LinkedHashMap中,Iterator<Entry<String, String>> iterator = set.iterator(),這段代碼會傳回一個繼承LinkedHashIterator的Iterator,它有着跟HashIterator不一樣的周遊規則。

該iterator周遊的是雙向連結清單,是以不會存在HashMap中需要尋找下一條單向連結清單的情況,從頭結點Entry header的下一個節點開始,隻要把目前傳回的Entry的after作為下一個應該傳回的節點即可。直到到達雙向連結清單的尾部時,after為雙向連結清單的表頭節點Entry header,這時候hasNext就會傳回false,表示沒有下一個元素了。LinkedHashMap的周遊取值如下圖所示:

Map接口以及那些實作類

LinkedHashMap在Android中的應用

在Android中使用圖檔時,一般會用LruCacha做圖檔的記憶體緩存,它裡面就是使用LinkedHashMap來實作存儲的。

public class LruCache<K, V> {
    private final LinkedHashMap<K, V> map;
    public LruCache(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize <= 0");
        }
        this.maxSize = maxSize;
        // 注意第三個參數,是accessOrder,這裡為true,後面會講到
        this.map = new LinkedHashMap<K, V>(0, 0.75f, true);
    }
           

前面提到了,accessOrder為true,表示LinkedHashMap為通路順序,當對已存在LinkedHashMap中的Entry進行get和put操作時,會把Entry移動到雙向連結清單的表尾(其實是先删除,再插入)。

我們拿LruCache的put方法舉例:

public final V put(K key, V value) {
        if (key == null || value == null) {
            throw new NullPointerException("key == null || value == null");
        }

        V previous;
        // 對map進行操作之前,先進行同步操作
        synchronized (this) {
            putCount++;
            size += safeSizeOf(key, value);
            previous = map.put(key, value);
            if (previous != null) {
                size -= safeSizeOf(key, previous);
            }
        }

        if (previous != null) {
            entryRemoved(false, key, previous, value);
        }
        // 整理記憶體,看是否需要移除LinkedHashMap中的元素
        trimToSize(maxSize);
        return previous;
    }
           

之前提到了,HashMap是線程不安全的,LinkedHashMap同樣是線程不安全的。是以在對調用LinkedHashMap的put方法時,先使用synchronized 進行了同步操作。

我們最關心的是倒數第一行代碼,其中maxSize為我們給LruCache設定的最大緩存大小。我們看看該方法

/**
     * Remove the eldest entries until the total of remaining entries is at or
     * below the requested size.
     *
     * @param maxSize the maximum size of the cache before returning. May be -1
     *            to evict even 0-sized elements.
     */
    public void trimToSize(int maxSize) {
        // while死循環,直到滿足目前緩存大小小于或等于最大可緩存大小
        while (true) {
            K key;
            V value;
            // 線程不安全,需要同步
            synchronized (this) {
                if (size < 0 || (map.isEmpty() && size != 0)) {
                    throw new IllegalStateException(getClass().getName()
                            + ".sizeOf() is reporting inconsistent results!");
                }
                // 如果目前緩存的大小,已經小于等于最大可緩存大小,則直接傳回
                // 不需要再移除LinkedHashMap中的資料
                if (size <= maxSize || map.isEmpty()) {
                    break;
                }
                // 得到的就是雙向連結清單表頭header的下一個Entry
                Map.Entry<K, V> toEvict = map.entrySet().iterator().next();
                key = toEvict.getKey();
                value = toEvict.getValue();
                // 移除目前取出的Entry
                map.remove(key);
                // 從新計算目前的緩存大小
                size -= safeSizeOf(key, value);
                evictionCount++;
            }

            entryRemoved(true, key, value, null);
        }
    }

           

前面的重排序我們知道,對LinkedHashMap的put和get操作,都會讓被操作的Entry移動到雙向連結清單的表尾,而移除是從map.entrySet().iterator().next()開始的,也就是雙向連結清單的表頭的header的after開始的,這也就符合了LRU算法的需求。

TreeMap

TreeMap的底層是一顆紅黑樹

紅黑樹簡介

紅黑樹又稱紅-黑二叉樹,它首先是一顆二叉樹,它具體二叉樹所有的特性。同時紅黑樹更是一顆自平衡的排序二叉樹。

我們知道一顆基本的二叉樹他們都需要滿足一個基本性質--即樹中的任何節點的值大于它的左子節點,且小于它的右子節點。按照這個基本性質使得樹的檢索效率大大提高。我們知道在生成二叉樹的過程是非常容易失衡的,最壞的情況就是一邊倒(隻有右/左子樹),這樣勢必會導緻二叉樹的檢索效率大大降低(O(n)),是以為了維持二叉樹的平衡,大牛們提出了各種實作的算法,如:AVL,SBT,伸展樹,TREAP ,紅黑樹等等。

   平衡二叉樹必須具備如下特性:它是一棵空樹或它的左右兩個子樹的高度差的絕對值不超過1,并且左右兩個子樹都是一棵平衡二叉樹。也就是說該二叉樹的任何一個等等子節點,其左右子樹的高度都相近。
           

紅黑樹顧名思義就是節點是紅色或者黑色的平衡二叉樹,它通過顔色的限制來維持着二叉樹的平衡。對于一棵有效的紅黑樹二叉樹而言我們必須增加如下規則:

1、每個節點都隻能是紅色或者黑色

   2、根節點是黑色

   3、每個葉節點(NIL節點,空節點)是黑色的。

   4、如果一個結點是紅的,則它兩個子節點都是黑的。也就是說在一條路徑上不能出現相鄰的兩個紅色結點。

   5、從任一節點到其每個葉子的所有路徑都包含相同數目的黑色節點。
           

這些限制強制了紅黑樹的關鍵性質: 從根到葉子的最長的可能路徑不多于最短的可能路徑的兩倍長。結果是這棵樹大緻上是平衡的。因為操作比如插入、删除和查找某個值的最壞情況時間都要求與樹的高度成比例,這個在高度上的理論上限允許紅黑樹在最壞情況下都是高效的,而不同于普通的二叉查找樹。是以紅黑樹它是複雜而高效的,其檢索效率O(log n)。下圖為一顆典型的紅黑二叉樹。

Map接口以及那些實作類

TreeMap繼承AbstractMap,實作NavigableMap、Cloneable、Serializable三個接口。其中AbstractMap表明TreeMap為一個Map即支援key-value的集合, NavigableMap(更多)則意味着它支援一系列的導航方法,具備針對給定搜尋目标傳回最接近比對項的導航方法 。

繼承體系: Map -> SortMap -> NavigbleMap

// 既然叫做SortMap, 要排序的話,當然需要一個比較器了
Comparator<? super K> comparator();

SortedMap<K,V> subMap(K fromKey, K toKey);

// 源碼注釋: 傳回比Map中key比參數toKey小的所有kv對
SortedMap<K,V> headMap(K toKey);

// 源碼注釋:傳回比Map中key比參數fromKey大或相等的所有kv對
SortedMap<K,V> tailMap(K fromKey);

K firstKey();

K lastKey()
           

接着就是 NavigableMap 定義的接口

// 傳回Map中比傳入參數key小的kv對中,key最大的一個kv對
Map.Entry<K,V> lowerEntry(K key);
K lowerKey(K key);

// 傳回Map中比傳入參數key小或相等的kv對中,key最大的一個kv對
Map.Entry<K,V> floorEntry(K key);
K floorKey(K key);

// 傳回Map中比傳入參數key大或相等的kv對中,key最小的一個kv對
Map.Entry<K,V> ceilingEntry(K key);
K ceilingKey(K key);

// 傳回Map中比傳入參數key大的kv對中,key最小的一個kv對
Map.Entry<K,V> higherEntry(K key);
K higherKey(K key);


Map.Entry<K,V> firstEntry();
Map.Entry<K,V> lastEntry();
Map.Entry<K,V> pollFirstEntry();
NavigableMap<K,V> descendingMap();
NavigableSet<K> navigableKeySet();
NavigableSet<K> descendingKeySet();
           

put方法:

public V put(K key, V value) {
    Entry<K,V> t = root;
    /**
     * 如果根節點都為null,還沒建立起來紅黑樹,我們先new Entry并指派給root把紅黑樹建立起來,這個時候紅
     * 黑樹中已經有一個節點了,同時修改操作+1。
     */
    if (t == null) {
        compare(key, key); 
        root = new Entry<>(key, value, null);
        size = 1;
        modCount++;
        return null;
    }
    /**
     * 如果節點不為null,定義一個cmp,這個變量用來進行二分查找時的比較;定義parent,是new Entry時必須
     * 要的參數
     */
    int cmp;
    Entry<K,V> parent;
    // cpr表示有無自己定義的排序規則,分兩種情況周遊執行
    Comparator<? super K> cpr = comparator;
    if (cpr != null) {
        /**
         * 從root節點開始周遊,通過二分查找逐漸向下找
         * 第一次循環:從根節點開始,這個時候parent就是根節點,然後通過自定義的排序算法
         * cpr.compare(key, t.key)比較傳入的key和根節點的key值,如果傳入的key<root.key,那麼
         * 繼續在root的左子樹中找,從root的左孩子節點(root.left)開始:如果傳入的key>root.key,
         * 那麼繼續在root的右子樹中找,從root的右孩子節點(root.right)開始;如果恰好key==root.key,
         * 那麼直接根據root節點的value值即可。
         * 後面的循環規則一樣,當周遊到的目前節點作為起始節點,逐漸往下找
         *
         * 需要注意的是:這裡并沒有對key是否為null進行判斷,建議自己的實作Comparator時應該要考慮在内
         */
        do {
            parent = t;
            cmp = cpr.compare(key, t.key);
            if (cmp < 0)
                t = t.left;
            else if (cmp > 0)
                t = t.right;
            else
                return t.setValue(value);
        } while (t != null);
    }
    else {
        //從這裡看出,當預設排序時,key值是不能為null的
        if (key == null)
            throw new NullPointerException();
        @SuppressWarnings("unchecked")
        Comparable<? super K> k = (Comparable<? super K>) key;
        //這裡的實作邏輯和上面一樣,都是通過二分查找,就不再多說了
        do {
            parent = t;
            cmp = k.compareTo(t.key);
            if (cmp < 0)
                t = t.left;
            else if (cmp > 0)
                t = t.right;
            else
                return t.setValue(value);
        } while (t != null);
    }
    /**
     * 能執行到這裡,說明前面并沒有找到相同的key,節點已經周遊到最後了,我們隻需要new一個Entry放到
     * parent下面即可,但放到左子節點上還是右子節點上,就需要按照紅黑樹的規則來。
     */
    Entry<K,V> e = new Entry<>(key, value, parent);
    if (cmp < 0)
        parent.left = e;
    else
        parent.right = e;
    /**
     * 節點加進去了,并不算完,我們在前面紅黑樹原理章節提到過,一般情況下加入節點都會對紅黑樹的結構造成
     * 破壞,我們需要通過一些操作來進行自動平衡處置,如【變色】【左旋】【右旋】
     */
    fixAfterInsertion(e);
    size++;
    modCount++;
    return null;
}
           

對于新節點的插入有如下三個關鍵地方:

1、插入新節點總是紅色節點 。

   2、如果插入節點的父節點是黑色, 能維持性質 。

   3、如果插入節點的父節點是紅色, 破壞了性質. 故插入算法就是通過重新着色或旋轉, 來維持性質 。
           
Map接口以及那些實作類
Map接口以及那些實作類

左旋:

Map接口以及那些實作類

CurrentHashMap

在JDK1.7中ConcurrentHashMap采用了數組+Segment+分段鎖的方式實作。

1.Segment(分段鎖)

ConcurrentHashMap中的分段鎖稱為Segment,它即類似于HashMap的結構,即内部擁有一個Entry數組,數組中的每個元素又是一個連結清單,同時又是一個ReentrantLock(Segment繼承了ReentrantLock)。

2.内部結構

ConcurrentHashMap使用分段鎖技術,将資料分成一段一段的存儲,然後給每一段資料配一把鎖,當一個線程占用鎖通路其中一個段資料的時候,其他段的資料也能被其他線程通路,能夠實作真正的并發通路。如下圖是ConcurrentHashMap的内部結構圖:

Map接口以及那些實作類

從上面的結構我們可以了解到,ConcurrentHashMap定位一個元素的過程需要進行兩次Hash操作。

第一次Hash定位到Segment,第二次Hash定位到元素所在的連結清單的頭部。

3.該結構的優劣勢

壞處

這一種結構的帶來的副作用是Hash的過程要比普通的HashMap要長

好處

寫操作的時候可以隻對元素所在的Segment進行加鎖即可,不會影響到其他的Segment,這樣,在最理想的情況下,ConcurrentHashMap可以最高同時支援Segment數量大小的寫操作(剛好這些寫操作都非常平均地分布在所有的Segment上)。

put添加操作

public V put(K key, V value) {
        Segment<K,V> s;
        //計算hash key值
        int hash = hash(key);
        //通過hash key值計算出存入Segment的位置
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          
             (segments, (j << SSHIFT) + SBASE)) == null) 
            //初始化Segment
            s = ensureSegment(j);
         //添加
        return s.put(key, hash, value, false);
}

           

Segment:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    //segment操作加鎖,使用嘗試擷取鎖方式。如果擷取失敗,進入scanAndLockForPut方法
    HashEntry<K,V> node = tryLock() ? null :
    scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
    HashEntry<K,V>[] tab = table;
    int index = (tab.length - 1) & hash;
    HashEntry<K,V> first = entryAt(tab, index);
    for (HashEntry<K,V> e = first;;) {
        if (e != null) {
        K k;
        if ((k = e.key) == key ||
            (e.hash == hash && key.equals(k))) {
            oldValue = e.value;
            if (!onlyIfAbsent) {
            e.value = value;
            ++modCount;
            }
            break;
        }
        e = e.next;
        }
        else {
        if (node != null)
            node.setNext(first);
        else
            node = new HashEntry<K,V>(hash, key, value, first);
        int c = count + 1;
        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    //擴容, 這是後續做解釋
            rehash(node);
        else
            setEntryAt(tab, index, node);
        ++modCount;
        count = c;
        oldValue = null;
        break;
        }
    }
    } finally {
    //釋放鎖
    unlock();
    }
    return oldValue;
}

           

上面幾個方法,ConcurrentHashMap在進行put操作時,先通過key找到承載的Segment對象位置,然後競争操作Segment的獨占鎖,以確定操作線程。擷取鎖方式很簡單,就是tryLock(),如果擷取鎖失敗,執行scanAndLockForPut方法

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // 疊代次數
    while (!tryLock()) {
    HashEntry<K,V> f; 
    if (retries < 0) {
        if (e == null) {
        if (node == null) // speculatively create node
            node = new HashEntry<K,V>(hash, key, value, null);
        retries = 0;
        }
        else if (key.equals(e.key))
        retries = 0;
        else
        e = e.next;
    }
        //超過疊代次數,阻塞
    else if (++retries > MAX_SCAN_RETRIES) {
        lock();
        break;
    }
    else if ((retries & 1) == 0 &&
         (f = entryForHash(this, hash)) != first) {
        e = first = f; // re-traverse if entry changed
        retries = -1;
    }
    }
    return node;
}

           

scanAndLockForPut 實作也比較簡單,循環調用tryLock,多次擷取,如果循環次數retries 次數大于事先設定定好的MAX_SCAN_RETRIES,就執行lock() 方法,此方法會阻塞等待,一直到成功拿到Segment鎖為止。

擴容問題

ConcurrentHashMap的擴容跟HashMap有點不同, ConcurrentHashMap的Segment槽是固定的16個,不變的

final Segment<K,V>[] segments;

而ConcurrentHashMap的擴容講的是Segment中的HashEntry數組擴容。當HashEntry達到某個臨界點後,會擴容2為之前的2倍, 原理跟HashMap擴容類似。

現在我們,在看會put方法中一個if分支

```java
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
    //擴容, 這是後續做解釋
    rehash(node);
else

           

rehash方法就是HashEntry擴充邏輯。當線程執行到rehash方法時,表示目前線程已經擷取到到目前Segment的鎖對象,這就表示rehash方法的執行是線程安全,不會存在并發問題。

ConcurrentHashMap的remove方法跟put方法操作一樣,先擷取segement對象後再操作,這裡就不重複了。

get操作:

public V get(Object key) {
        Segment<K,V> s; 
        HashEntry<K,V>[] tab;
        //1:計算key的hash值
        int h = hash(key);
        //2:确定在segment的位置,得到HashEntry數組
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            //3:得到資料連結清單,疊代,查找key對應的value值
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

           

細心的朋友會發現get方法并沒有擷取鎖的操作,這時就得讨論下執行get操作線程安全問題。

1:一線程執行put,另一個線程執行get

ConcurrentHashMap約定新添的節點是在連結清單的表頭, 是以如果先執行get,後執行put, get操作已經周遊到連結清單中間了, 不會影響put的安全執行。如果先執行put,這時候,就必須保證剛剛插入的表頭節點能被讀取,ConcurrentHashMap使用的UNSAFE.putOrderedObject指派方式保證。

2:一個線程執行put,并在擴容操作期間, 另一個線程執行get

ConcurrentHashMap擴容是新建立了HashEntry數組,然後進行遷移資料,最後面将 newTable指派給oldTable。如果 get 先執行,那麼就是在oldTable 上做查詢操作,不發送線程安全問題;而如果put 先執行,那麼 put 操作的可見性保證就是 oldTable使用了 volatile 關鍵字即可。

3:一線程執行remove,另一個線程執行get

ConcurrentHashMap的删除分2種情況, 1>删除節點在連結清單表頭。那操作節點就是HashEntry數組元素了,雖然HashEntry[] table 使用了volatile修飾, 但是, volatile并保證資料内部元素的操作可見性,是以隻能使用UNSAFE 來操作元素。2>删除節點中标中間, 那麼好辦, 隻需要保證節點中的next屬性是volatile修飾即可

總結:get方法之是以不需要加鎖,原因比較簡單,get為隻讀操作,不會改動map資料結構,是以在操作過程中,隻需要保證涉及讀取資料的屬性為線程可見即可,也即使用volatile修飾。

JDK1.8版本的CurrentHashMap的實作原理

jdk8版本的ConcurrentHashMap相對于jdk7版本,發送了很大改動,jdk8直接抛棄了Segment的設計,采用了較為輕捷的Node + CAS + Synchronized設計,保證線程安全。

Map接口以及那些實作類

看上圖ConcurrentHashMap的大體結構,一個node數組,預設為16,可以自動擴充,擴充速度為0.75

private static finalint DEFAULT_CONCURRENCY_LEVEL = 16;
private static final float LOAD_FACTOR = 0.75f;

           

每一個節點,挂載一個連結清單,當連結清單挂載資料大于8時,連結清單自動轉換成紅黑樹

ConcurrentHashMap

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
    transient volatile Node<K,V>[] table;
}
           
static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
}

           
static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;
}

           
static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
 }

           

Node

ConcurrentHashMap核心内部類,它包裝了key-value鍵值對,所有插入ConcurrentHashMap的資料都包裝在這裡面。

TreeNode

樹節點類,當資料連結清單長度大于8時,會轉換為TreeNode。注意,此時的TreeNode并不是紅黑樹對象,它并不是直接轉換為紅黑樹,而是把這些結點包裝成TreeNode放在TreeBin對象中,由TreeBin完成對紅黑樹的包裝。

TreeBin

TreeNode節點的包裝對象,可以認為是紅黑樹對象。它代替了TreeNode的根節點,ConcurrentHashMap的node“數組”中,存放就是TreeBin對象,而不是TreeNode對象。

在ConcurrentHashMap中使用了unSafe方法,通過直接操作記憶體的方式來保證并發處理的安全性,使用的是硬體的安全機制。

/*
     * 用來傳回節點數組的指定位置的節點的原子操作
     */
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    /*
     * cas原子操作,在指定位置設定值
     */
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    /*
     * 原子操作,在指定位置設定值
     */
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
           

來看下jdk8版本ConcurrentHashMap 的put操作

/*
     * 當添加一對鍵值對的時候,首先會去判斷儲存這些鍵值對的數組是不是初始化了,
     * 如果沒有的話就初始化數組
     *  然後通過計算hash值來确定放在數組的哪個位置
     * 如果這個位置為空則直接添加,如果不為空的話,則取出這個節點來
     * 如果取出來的節點的hash值是MOVED(-1)的話,則表示目前正在對這個數組進行擴容,複制到新的數組,則目前線程也去幫助複制
     * 最後一種情況就是,如果這個節點,不為空,也不在擴容,則通過synchronized來加鎖,進行添加操作
     *    然後判斷目前取出的節點位置存放的是連結清單還是樹
     *    如果是連結清單的話,則周遊整個連結清單,直到取出來的節點的key來個要放的key進行比較,如果key相等,并且key的hash值也相等的話,
     *          則說明是同一個key,則覆寫掉value,否則的話則添加到連結清單的末尾
     *    如果是樹的話,則調用putTreeVal方法把這個元素添加到樹中去
     *  最後在添加完成之後,會判斷在該節點處共有多少個節點(注意是添加前的個數),如果達到8個以上了的話,
     *  則調用treeifyBin方法來嘗試将處的連結清單轉為樹,或者擴容數組
     */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();//K,V都不能為空,否則的話跑出異常
        int hash = spread(key.hashCode());    //取得key的hash值
        int binCount = 0;    //用來計算在這個節點總共有多少個元素,用來控制擴容或者轉移為樹
        for (Node<K,V>[] tab = table;;) {    //
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)    
                tab = initTable();    //第一次put的時候table沒有初始化,則初始化table
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    //通過哈希計算出一個表中的位置因為n是數組的長度,是以(n-1)&hash肯定不會出現數組越界
                if (casTabAt(tab, i, null,        //如果這個位置沒有元素的話,則通過cas的方式嘗試添加,注意這個時候是沒有加鎖的
                             new Node<K,V>(hash, key, value, null)))        //建立一個Node添加到數組中區,null表示的是下一個節點為空
                    break;                   // no lock when adding to empty bin
            }
            /*
             * 如果檢測到某個節點的hash值是MOVED,則表示正在進行數組擴張的資料複制階段,
             * 則目前線程也會參與去複制,通過允許多線程複制的功能,一次來減少數組的複制所帶來的性能損失
             */
            else if ((fh = f.hash) == MOVED)    
                tab = helpTransfer(tab, f);
            else {
                /*
                 * 如果在這個位置有元素的話,就采用synchronized的方式加鎖,
                 *     如果是連結清單的話(hash大于0),就對這個連結清單的所有元素進行周遊,
                 *         如果找到了key和key的hash值都一樣的節點,則把它的值替換到
                 *         如果沒找到的話,則添加在連結清單的最後面
                 *  否則,是樹的話,則調用putTreeVal方法添加到樹中去
                 *  
                 *  在添加完之後,會對該節點上關聯的的數目進行判斷,
                 *  如果在8個以上的話,則會調用treeifyBin方法,來嘗試轉化為樹,或者是擴容
                 */
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {        //再次取出要存儲的位置的元素,跟前面取出來的比較
                        if (fh >= 0) {                //取出來的元素的hash值大于0,當轉換為樹之後,hash值為-2
                            binCount = 1;            
                            for (Node<K,V> e = f;; ++binCount) {    //周遊這個連結清單
                                K ek;
                                if (e.hash == hash &&        //要存的元素的hash,key跟要存儲的位置的節點的相同的時候,替換掉該節點的value即可
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)        //當使用putIfAbsent的時候,隻有在這個key沒有設定值得時候才設定
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {    //如果不是同樣的hash,同樣的key的時候,則判斷該節點的下一個節點是否為空,
                                    pred.next = new Node<K,V>(hash, key,        //為空的話把這個要加入的節點設定為目前節點的下一個節點
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {    //表示已經轉化成紅黑樹類型了
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,    //調用putTreeVal方法,将該元素添加到樹中去
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)    //當在同一個節點的數目達到8個的時候,則擴張數組或将給節點的資料轉為tree
                        treeifyBin(tab, i);    
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);    //計數
        return null;
    }

           

從put源碼可看,JDK8版本更多使用的cas程式設計方式控制線程安全, 必要時也會使用synchronized 代碼塊保證線程安全。

在put方法的詳解中,我們可以看到,在同一個節點的個數超過8個的時候,會調用treeifyBin方法來看看是擴容還是轉化為一棵樹

同時在每次添加完元素的addCount方法中,也會判斷目前數組中的元素是否達到了sizeCtl的量,如果達到了的話,則會進入transfer方法去擴容

/**
     * Replaces all linked nodes in bin at given index unless table is
     * too small, in which case resizes instead.
     * 當數組長度小于64的時候,擴張數組長度一倍,否則的話把連結清單轉為樹
     */
    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
                System.out.println("treeifyBin方\t==>數組長:"+tab.length);
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)    //MIN_TREEIFY_CAPACITY 64
                tryPresize(n << 1);        // 數組擴容
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {    //使用synchronized同步器,将該節點出的連結清單轉為樹
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;    //hd:樹的頭(head)
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)        //把Node組成的連結清單,轉化為TreeNode的連結清單,頭結點任然放在相同的位置
                                hd = p;    //設定head
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的連結清單放入容器TreeBin中
                    }
                }
            }
        }
    }
           

可以看到當需要擴容的時候,調用的時候tryPresize方法,看看trePresize的源碼

/**
     * 擴容表為指可以容納指定個數的大小(總是2的N次方)
     * 假設原來的數組長度為16,則在調用tryPresize的時候,size參數的值為16<<1(32),此時sizeCtl的值為12
     * 計算出來c的值為64,則要擴容到sizeCtl≥為止
     *  第一次擴容之後 數組長:32 sizeCtl:24
     *  第二次擴容之後 數組長:64 sizeCtl:48
     *  第二次擴容之後 數組長:128 sizeCtl:94 --> 這個時候才會退出擴容
     */
    private final void tryPresize(int size) {
            /*
             * MAXIMUM_CAPACITY = 1 << 30
             * 如果給定的大小大于等于數組容量的一半,則直接使用最大容量,
             * 否則使用tableSizeFor算出來
             * 後面table一直要擴容到這個值小于等于sizeCtrl(數組長度的3/4)才退出擴容
             */
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
//            printTable(tab);    調試用的
            /*
             * 如果數組table還沒有被初始化,則初始化一個大小為sizeCtrl和剛剛算出來的c中較大的一個大小的數組
             * 初始化的時候,設定sizeCtrl為-1,初始化完成之後把sizeCtrl設定為數組長度的3/4
             * 為什麼要在擴張的地方來初始化數組呢?這是因為如果第一次put的時候不是put單個元素,
             * 而是調用putAll方法直接put一個map的話,在putALl方法中沒有調用initTable方法去初始化table,
             * 而是直接調用了tryPresize方法,是以這裡需要做一個是不是需要初始化table的判斷
             */
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    //初始化tab的時候,把sizeCtl設為-1
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            /*
             * 一直擴容到的c小于等于sizeCtl或者數組長度大于最大長度的時候,則退出
             * 是以在一次擴容之後,不是原來長度的兩倍,而是2的n次方倍
             */
            else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                    break;    //退出擴張
            }
            else if (tab == table) {
                int rs = resizeStamp(n);
                /*
                 * 如果正在擴容Table的話,則幫助擴容
                 * 否則的話,開始新的擴容
                 * 在transfer操作,将第一個參數的table中的元素,移動到第二個元素的table中去,
                 * 雖然此時第二個參數設定的是null,但是,在transfer方法中,當第二個參數為null的時候,
                 * 會建立一個兩倍大小的table
                 */
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    /*
                     * transfer的線程數加一,該線程将進行transfer的幫忙
                     * 在transfer的時候,sc表示在transfer工作的線程數
                     */
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                /*
                 * 沒有在初始化或擴容,則開始擴容
                 */
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2)) {
                        transfer(tab, null);
                }
            }
        }
    }
           

在tryPresize方法中,并沒有加鎖,允許多個線程進入,如果數組正在擴張,則目前線程也去幫助擴容。

數組擴容的主要方法就是transfer方法

/**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     * 把數組中的節點複制到新的數組的相同位置,或者移動到擴張部分的相同位置
     * 在這裡首先會計算一個步長,表示一個線程處理的數組長度,用來控制對CPU的使用,
     * 每個CPU最少處理16個長度的數組元素,也就是說,如果一個數組的長度隻有16,那隻有一個線程會對其進行擴容的複制移動操作
     * 擴容的時候會一直周遊,知道複制完所有節點,沒處理一個節點的時候會在連結清單的頭部設定一個fwd節點,這樣其他線程就會跳過他,
     * 複制後在新數組中的連結清單不是絕對的反序的
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    //MIN_TRANSFER_STRIDE 用來控制不要占用太多CPU
            stride = MIN_TRANSFER_STRIDE; // subdivide range    //MIN_TRANSFER_STRIDE=16
        /*
         * 如果複制的目标nextTab為null的話,則初始化一個table兩倍長的nextTab
         * 此時nextTable被設定值了(在初始情況下是為null的)
         * 因為如果有一個線程開始了表的擴張的時候,其他線程也會進來幫忙擴張,
         * 而隻是第一個開始擴張的線程需要初始化下目标數組
         */
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        /*
         * 建立一個fwd節點,這個是用來控制并發的,當一個節點為空或已經被轉移之後,就設定為fwd節點
         * 這是一個空的标志節點
         */
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;    //是否繼續向前查找的标志位
        boolean finishing = false; // to ensure sweep(清掃) before committing nextTab,在完成之前重新在掃描一遍數組,看看有沒完成的沒
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing) {
                    advance = false;
                }
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {        //已經完成轉移
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);    //設定sizeCtl為擴容後的0.75
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
                            return;
                    }
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)            //數組中把null的元素設定為ForwardingNode節點(hash值為MOVED[-1])
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {                //加鎖操作
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {        //該節點的hash值大于等于0,說明是一個Node節點
                                /*
                                 * 因為n的值為數組的長度,且是power(2,x)的,是以,在&操作的結果隻可能是0或者n
                                 * 根據這個規則
                                 *         0-->  放在新表的相同位置
                                 *         n-->  放在新表的(n+原來位置)
                                 */
                            int runBit = fh & n; 
                            Node<K,V> lastRun = f;
                            /*
                             * lastRun 表示的是需要複制的最後一個節點
                             * 每當新節點的hash&n -> b 發生變化的時候,就把runBit設定為這個結果b
                             * 這樣for循環之後,runBit的值就是最後不變的hash&n的值
                             * 而lastRun的值就是最後一次導緻hash&n 發生變化的節點(假設為p節點)
                             * 為什麼要這麼做呢?因為p節點後面的節點的hash&n 值跟p節點是一樣的,
                             * 是以在複制到新的table的時候,它肯定還是跟p節點在同一個位置
                             * 在複制完p節點之後,p節點的next節點還是指向它原來的節點,就不需要進行複制了,自己就被帶過去了
                             * 這也就導緻了一個問題就是複制後的連結清單的順序并不一定是原來的倒序
                             */
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;    //n的值為擴張前的數組的長度
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            /*
                             * 構造兩個連結清單,順序大部分和原來是反的
                             * 分别放到原來的位置和新增加的長度的相同位置(i/n+i)
                             */
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                        /*
                                         * 假設runBit的值為0,
                                         * 則第一次進入這個設定的時候相當于把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同為0的節點)設定到舊的table的第一個hash計算後為0的節點下一個節點
                                         * 并且把自己傳回,然後在下次進來的時候把它自己設定為後面節點的下一個節點
                                         */
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                        /*
                                         * 假設runBit的值不為0,
                                         * 則第一次進入這個設定的時候相當于把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同不為0的節點)設定到舊的table的第一個hash計算後不為0的節點下一個節點
                                         * 并且把自己傳回,然後在下次進來的時候把它自己設定為後面節點的下一個節點
                                         */
                                    hn = new Node<K,V>(ph, pk, pv, hn);    
                            }
                            setTabAt(nextTab, i, ln);    
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {    //否則的話是一個樹節點
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            /*
                             * 在複制完樹節點之後,判斷該節點處構成的樹還有幾個節點,
                             * 如果≤6個的話,就轉回為一個連結清單
                             */
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
           

到這裡,ConcurrentHashMap的put操作和擴容都介紹的差不多了,

下面的兩點一定要注意:

1·複制之後的新連結清單不是舊連結清單的絕對倒序。

2·在擴容的時候每個線程都有處理的步長,最少為16,在這個步長範圍内的數組節點隻有自己一個線程來處理

get方法:

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            //擷取table[i] 的node元素
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

           
//確定多線程可見,并且保證擷取到是記憶體中最新的table[i] 元素值
 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

           

get源碼也沒有加鎖操作,操作原理跟jdk1.7版本一樣,這裡就不累贅了。

場景1:多線程複合操作時是否能保證線程安全

答案是不能,原因: ConcurrentHashMap 使用鎖分離(jdk7)/cas(jdk8)方式保證并發環境下,添加/删除操作安全,但這進針對的是單個put 或者 remove方法,如果多個方法配合複合使用,依然需要額外加鎖。

場景2:多線程同時添加相同hash 碼值時轉換成紅黑樹時,是否存在并發問題

答案是可以保證線程安全,原因:ConcurrentHashMap 連結清單轉換成紅黑樹時,對轉換方法做加鎖防護了

resize() 方法

JDK1.7中的實作:

跟HashMap的 resize() 沒太大差別,都是在 put() 元素時去做的擴容,是以在1.7中的實作是獲得了鎖之後,在單線程中去做擴容(1.new個2倍數組 2.周遊old數組節點搬去新數組)。

JDK1.8中的實作:

jdk1.8的擴容支援并發遷移節點,從old數組的尾部開始,如果該桶被其他線程處理過了,就建立一個 ForwardingNode 放到該桶的首節點,hash值為-1,其他線程判斷hash值為-1後就知道該桶被處理過了。

計算size

JDK1.7中的實作:

static final class Segment<K,V> extends ReentrantLock implements Serializable {  
        private static final long serialVersionUID = 2249069246763182397L;  
    // 最大嘗試擷取鎖次數,tryLock可能會阻塞,準備鎖住segment操作擷取鎖。  
     在多處理器中,用一個有界的嘗試次數,保證在定位node的時候,可以從緩存直接擷取。  
        static final int MAX_SCAN_RETRIES =  
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;   
    //segment内部的Hash table,通路HashEntry,通過具有volatile的entryAt/setEntryAt方法  
        transient volatile HashEntry<K,V>[] table;   
     //segment的table中HashEntry的數量,隻有在lock或其他保證可見性的volatile reads中,才可以通路count  
    transient int count;  
    //在segment上所有的修改操作數。盡管可能會溢出,但它為isEmpty和size方法,  
    提供了有效準确穩定的檢查或校驗。隻有在lock或其他保證可見性的volatile reads 
    中,才可以通路  
        transient int modCount;  
        transient int threshold;   
        final float loadFactor;  
        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {  
            this.loadFactor = lf;  
            this.threshold = threshold;  
            this.table = tab;  
        }  
}  
           

先采用不加鎖的方式,計算兩次,如果兩次結果一樣,說明是正确的,傳回。

如果兩次結果不一樣,則把所有 segment 鎖住,重新計算所有 segment的 Count 的和

JDK1.8中的實作:

public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
           

sumCount有兩個重要的屬性baseCount和counterCells,如果counterCells不為空,那麼總共的大小就是baseCount與周遊counterCells的value值累加獲得的。

//先利用sumCount()計算,然後如果值超過int的最大值,就傳回int的最大值。
//但是有時size就會超過最大值,這時最好用mappingCount方法,傳回的一個估值
public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    //注:mappingcount值是傳回的一個估值,不是精準值
public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }
final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }


           

baseCount是從哪裡來的?

//當沒有線程争用時,使用這個變量計數
 private transient volatile long baseCount;
           

一個volatile變量,在addCount方法會使用它,而addCount方法在put結束後會調用,在put操作結束後,更新計數。

if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) 

           
static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}
           

在并發情況下,如果CAS修改baseCount失敗後,就會使用到CounterCell類,會建立一個對象,通常對象的volatile的value屬性是1。

if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
           

如果上面CAS操作也失敗了,在fullAddCount方法中,會繼續死循環操作,直到成功。那麼最終那個沒有被baseCount記錄到的資料就被CounterCell記錄了,結果也是傳回倆個資料相加的和。

ConcurrentHashMap的同步機制

前面分析了下ConcurrentHashMap的源碼,那麼,對于一個映射集合來說,ConcurrentHashMap是如果來做到并發安全,又是如何做到高效的并發的呢?

首先是讀操作,從源碼中可以看出來,在get操作中,根本沒有使用同步機制,也沒有使用unsafe方法,是以讀操作是支援并發操作的。

那麼寫操作呢?

分析這個之前,先看看什麼情況下會引起數組的擴容,擴容是通過transfer方法來進行的。而調用transfer方法的隻有trePresize、helpTransfer和addCount三個方法。

這三個方法又是分别在什麼情況下進行調用的呢?

·tryPresize是在treeIfybin和putAll方法中調用,treeIfybin主要是在put添加元素完之後,判斷該數組節點相關元素是不是已經超過8個的時候,如果超過則會調用這個方法來擴容數組或者把連結清單轉為樹。

·helpTransfer是在當一個線程要對table中元素進行操作的時候,如果檢測到節點的HASH值為MOVED的時候,就會調用helpTransfer方法,在helpTransfer中再調用transfer方法來幫助完成數組的擴容

·addCount是在當對數組進行操作,使得數組中存儲的元素個數發生了變化的時候會調用的方法。

是以引起數組擴容的情況如下:

·

隻有在往map中添加元素的時候,在某一個節點的數目已經超過了8個,同時數組的長度又小于64的時候,才會觸發數組的擴容。

·當數組中元素達到了sizeCtl的數量的時候,則會調用transfer方法來進行擴容

那麼在擴容的時候,可以不可以對數組進行讀寫操作呢?

事實上是可以的。當在進行數組擴容的時候,如果目前節點還沒有被處理(也就是說還沒有設定為fwd節點),那就可以進行設定操作。

如果該節點已經被處理了,則目前線程也會加入到擴容的操作中去。

那麼,多個線程又是如何同步處理的呢?

在ConcurrentHashMap中,同步處理主要是通過Synchronized和unsafe兩種方式來完成的。

·在取得sizeCtl、某個位置的Node的時候,使用的都是unsafe的方法,來達到并發安全的目的

·當需要在某個位置設定節點的時候,則會通過Synchronized的同步機制來鎖定該位置的節點。

·在數組擴容的時候,則通過處理的步長和fwd節點來達到并發安全的目的,通過設定hash值為MOVED

·當把某個位置的節點複制到擴張後的table的時候,也通過Synchronized的同步機制來保證現程安全

HashTable

HashTable和HashMap的實作原理幾乎一樣,差别無非是

HashTable不允許key和value為null

HashTable是線程安全的

但是HashTable線程安全的政策實作代價卻太大了,簡單粗暴,get/put所有相關操作都是synchronized的,這相當于給整個哈希表加了一把大鎖。

多線程通路時候,隻要有一個線程通路或操作該對象,那其他線程隻能阻塞,相當于将所有的操作串行化,在競争激烈的并發場景中性能就會非常差。