天天看點

源碼分析-ConcurrentHashMap文檔域方法

文檔

doc文檔

一個支援并發的提取和修改的散清單。這個類和hashtable準守相同的規範,并且每個版本都對應相同的功能規範。雖然這是一個線程安全的類,但是他并不依賴于一個整體的鎖,沒有一個鎖可以鎖住整個元素而靜止所有通路。這個類和hashtable可以互操作。

提取操作如get。不會阻塞,是以有可能被更新方法鎖覆寫,提取反映了最近完全操作的結果。對于聚合操作如果putAll和clear,并發的提取可能隻能反映一部分也結果(聚合操作不是原子性的)。相似的疊代器和枚舉傳回散清單基于某個點的狀态,或者疊代器和枚舉建立時的狀态。他們不會抛出

ConcurrentModificationException.然後疊代器被設計成沒成隻能被一個線程通路。

允許的并發數通過可選的concurrencyLevel來指定,預設值為16,内部單元的大小。散清單在内部分區使得運作訓示數量的無争用的更新操作。由于表内元素的放置是随機的,是以并發的情況不可預測。理想情況下,你應該指定一個推薦的值允許多少個線程并發的通路表。使用大的值會浪費空間和時間,小的值會導緻線程争用。但在一個數量級内的低估或者高估不會有非常明顯的差異。如果隻有一個線程允許修改,而其他線程将隻讀時,1是适當的值。調整散清單是一個非常慢的操作,是以盡可能預先估計給一個合适的大小傳遞給構造器。

這個表和他的視圖和它的疊代器實作了Map和Iterator的全部功能。

和HashTable一樣,不同于HashMap,這個類允許null作為鍵或者元素。

源碼文檔

基本的政策是将表分為幾個小的分區。每個分區是一個并發可讀的散清單。為了減小占用空間,隻建構一個分區,而其他的分區在需要的時候在建構。為了保持懶惰構造的可見性,所有的分區表必須是volatile通路的。通過unsafe方法的segmnetAt實作。他提供了AtomicReferenceArrays的功能但是減少了中間的調用環節。此外,volatile寫元素和鍵值對的next域的所操作使用了更快捷的懶惰集合的寫模式。通過(putOrderedObject)因為寫總是需要釋放鎖的。這樣做維持了表序列的一緻性更新。

曆史記錄:早起的實作嚴重依賴于final域,他避免了volatile的讀操作但是存在大量的空間占用問題。一些遺留的設計用于相容序列化的可用性(比如強制構造分區0);

概述

總的來說ConcurrentHashMap就是有一系列的Segment

靜态域

static final int DEFAULT_INITIAL_CAPACITY = ;//預設初始大小
    static final float DEFAULT_LOAD_FACTOR = f;//預設裝填因子
    static final int DEFAULT_CONCURRENCY_LEVEL = ;//預設Segment數量
    static final int MAXIMUM_CAPACITY =  << ;//預設表的最大容量
    static final int MIN_SEGMENT_TABLE_CAPACITY = ;//Segment的最小數量
    static final int MAX_SEGMENTS =  << ;//Segment的最大容量
    static final int RETRIES_BEFORE_LOCK = ;//上鎖前Entry擷取次數,用于size和containsValue。

    private static final sun.misc.Unsafe UNSAFE;//以下的類都用于UNSAFE。這幾個寫的内容有幾個部分是出于序列化的時候的相容性需要。
    private static final long SBASE;
    private static final int SSHIFT;
    private static final long TBASE;
    private static final int TSHIFT;
    private static final long HASHSEED_OFFSET;
    private static final long SEGSHIFT_OFFSET;
    private static final long SEGMASK_OFFSET;
    private static final long SEGMENTS_OFFSET;
           

非靜态域

final int segmentMask;//hashCode的高位來選擇應該散列到哪個Segment中。
    final int segmentShift;//hashCode的移位大小。
    final Segment<K,V>[] segments;//Segments 數組

    transient Set<K> keySet;//以下為視圖項。
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;
           

方法

構造器

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > ) || initialCapacity <  || concurrencyLevel <= )
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = ;//這個兩個參數使用用于計算目前的hashCode應該屬于哪一個segment。
        int ssize = ;
        while (ssize < concurrencyLevel) {//ssize取當不小于設定值的2的階乘數。
            ++sshift;
            ssize <<= ;
        }
        this.segmentShift =  - sshift;
        this.segmentMask = ssize - ;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;//c用來計算segment中Entry[]的大小。
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)//同樣cap也是Segment中Entry[]的大小,也需要是2的階乘數。
            cap <<= ;
        // create segments and segments[0]僅僅為了相容性讨論。UNSAFE.putOrderedObject。也是出于相容性需要。
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }
           

HashEntry

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);//putOrderedObject可以儲存檔案到指定位置,但是不提供可見性,如果要提供可見性需要對域用修飾符volatile修飾。
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
           

HashEntry整體上比較簡單,但是有兩點需要說明一下。

首先,HashEntry和HashMap中的Entry是不同的,HashEntry并不是Map中的Entry。這裡并沒有實作任何接口。因為Map中的entry功能是比較多的。而這裡的Entry隻是提供了一個連結清單容器的功能。而且這樣的設計也比較好,Map中雖然有Entry的接口但是未必要去實作。

其次,域的可見性的問題。這裡這四個域都可以保持可見性,當然volatile的可見性是顯而易見的。final本身也是提供一定可見性的。這是final一個比較靈活運用的地方。比如同樣的set或者map中容器内的元素,如果元素的域是final的,則可以保證這個容器在存放這樣的元素時是線程安全的,即使容器本身沒有同步。因為同樣的final不可以被設定兩變。這個在之前《并發程式設計實戰》上說過。當然從語義上說這裡key和hashcode也不應該是可變的,而且對于Entry的通路操作也進行了同步,因為不是所有的域都是final的

Segment

Segment源碼文檔

Segment會儲存表的Entry清單,并且會始終保持一緻的狀态。是以通過對table域加上volatile的修飾符,read操作無鎖的進行讀取。在膨脹過程中,需要儲存重複的節點,以保證這些舊的清單可以被讀取者進行讀取。

這個類僅僅對于修改操作定義了鎖。除非特意注明,這個class方法執行per-segment 版本的的currentHashMap方法。其他方法被內建進ConcurrentHashMap方法。這些修改方法通過scanAndLock和scanAndLockForPut方法實作受控的自旋鎖。這個主要是為了減少緩存未命中的情況(這在hashTable中是很常見的問題)。我們并不真的使用這樣的方式去找到node。node因為他們必須在有鎖的情況下獲得以確定更新的一緻性。但是這樣做通常比進行重新定位要快的多。scanAndLockForput 建立了一個新的節點如果需要放置的節點沒有被找到的情況下。

static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() >  ?  : ;
           

這是标志了在嘗試擷取鎖之前最小的後期數量。在scanAndLock和scanAndLockForput 中有用。具體在方法中說明。

transient volatile HashEntry<K,V>[] table;
        transient int count;
        transient int modCount;
        transient int threshold;//table的的大小如果超過這個值就需要進行膨脹了。
        final float loadFactor;
           

put方法

首先來看put方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - ) & hash;//散列的方式下面統一說。這裡隻要知道是為了計算下标就好。
                HashEntry<K,V> first = entryAt(tab, index);//找到第一個Entry
                for (HashEntry<K,V> e = first;;) {//隻要周遊去找所有的Entry看是否有符合條件的。
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||//判斷是否符合目前的條件
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {//onlyIfAbsent表示隻有在不存在的時候修改,為false就是可以修改已存在的值。
                                e.value = value;
                                ++modCount;
                            }
                            break;//退出循環
                        }
                        e = e.next;
                    }
                    else {//當周遊之後沒有發現,則需要進行插入Entry的操作。
                        if (node != null)//node是在加鎖之前周遊了節點沒有發現節點而建立了節點傳回的,是以隻要把這個檔期節點設定為首節點就好。注意這裡僅僅是設定了node的next,因為還沒有進行size的檢查。
                            node.setNext(first);
                        else//node為空說明直接獲得了鎖,沒有構造新的節點。是以需要先構造新的節點。
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + ;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)//判斷目前的table的大小是否需要進行膨脹。
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);設定table的首節點為noe
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
           

這裡還包括兩個小的方法:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);//table中首節點的位置。
            HashEntry<K,V> e = first;//e為疊代器。
            HashEntry<K,V> node = null;//node為當沒有找到節點的時候需要傳回的新構造的Entry。
            int retries = -; // negative while locating node//目前重試的次數,當為-1的時候說明正在經曆定位元素的節點。當這個值增長到一定數值的時候,MAX_SCAN_RETRIES則說明目前清單比較穩定了,開始上鎖。
            while (!tryLock()) {//隻要沒有獲得鎖就一直進行循環
                HashEntry<K,V> f; // to recheck first below//用來标記f節點是否是首節點。如果不是則說明出現不一緻情況需要重新擷取f并重新進行掃描。
                if (retries < ) {//retries為-1,表示目前還沒有實作過重試。
                    if (e == null) {//e為null說明則說明null已經疊代到尾部了。
                        if (node == null) // speculatively create node //需要建立新的元素。
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = ;//
                    }
                    else if (key.equals(e.key))//找到一個已存在的元素。也将重試次數置為0,開始等待目前清單是否為0。等待上鎖。
                        retries = ;
                    else//找下一個元素
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {//在兩次重試的過程中連結清單沒有發生變化,說明比較穩定,開始上鎖。上鎖成功後退出循環。
                    lock();
                    break;
                }
                else if ((retries & ) ==  &&// 再次查詢目前hash值對應的talbe的位置是否還是先前儲存的first。如果不是則說明清單被修改,重置參數。
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -;
                }
            }
            return node;
        }
           

rehash的文檔

在一新的table中重分類節點,因為我們使用2的階層的拓展,是以每一個元素的位置要麼在原先的位置,要麼需要後移2的階乘倍。通過找到就節點可以重用的情況來避免clone的發生。統計意義上來說,在預設的的域值下,隻有大約六分之一的節點需要克隆。替換的節點隻要沒有任何讀取者的引用指向題目,則是可以被垃圾回收器回收的。(可能由于并發的周遊操作),Entry的通路使用數組下标因為題目被volatile修飾。

private void rehash(HashEntry<K,V> node) {

            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << ;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];//這裡有一個疑問問什麼要在這裡加上一個強制類型轉換。
            int sizeMask = newCapacity - ;
            for (int i = ; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];//e代表了舊表中的散清單table下标i位置的頭節點,當然如果是空的,則代表table的該位置沒有散列項。
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list//對于隻有一個散列項的元素隻要移動單獨的節點就好了。
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;//從這裡開始的11行。目的是為了找到連結清單結尾散列到相同新下标的節點。對于這部分元素進行整體移動,就免去了重新構造新節點的代價。
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {//這部分就是講除去最後一小段已經集體移動的節點之外,對于之前的所有節點進行複制放入新table的位置中。
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node//把需要添加的新元素添加到原表
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;//對table賦新的表;
        }
           

首先segment再散列或者說膨脹隻是對單個Segment進行的。是以如果一個表進行多次變換後Segment中table的大小很可能是不一樣的。當然如果散列函數做的比較好應該各個Segment的大小應該是比較接近的。

其次對于原segment的中的原實際上是進行了複制,或者移動的,不過由于隻有在put操作中才會進行再散列,而且put又是上了鎖的,是以可以保證程式被正确同步,但是對于讀操作得到的引用未必是目前的引用了。這一點需要注意。

最後,這中間找連結清單的最後一段進行複制的操作。這個是從統計意義上說更效率。

remove

final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);//如果上鎖識别,則掃描鍊table中的hash表以保持緩存命中。
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - ) & hash;//計算下标
                HashEntry<K,V> e = entryAt(tab, index);//e為tab[index]的連結清單首節點
                HashEntry<K,V> pred = null;//為單項連結清單的某節點的前驅節點
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||//如果如果找到對映的節點則準備進行設定。
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {//這裡不太明白為什麼會有value為空的情況Concurrent并不運作value為null。然後進行删除,需要考慮e是頭結點的情況
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
           

這裡比較常見的判斷方法是先用key==号判斷,也就是兩者是同一個節點,或者使用key.equals和hashcode來判斷。這樣比較全面,效率兼顧安全。大部分的hash方面的問題都是這樣做的。

此外我其實還有一點不是很明确。就是關于volatile的修飾後記憶體可見性的問題。當然volatile原理上說是記憶體緩存禁止标志位。但是從這裡來看似乎并不完全是這樣,或者我原先的了解有一定問題。我本來了解就是既然沒有緩存,就每次讀取直接從記憶體中讀取資料了。

但是從這裡來看scanAndLock沒有任何影響,主要的目的就是在于使緩存重新整理,也就是說不停的從記憶體中讀取。但是上鎖之後實際上還是進行了讀操作的,那上鎖之後的讀操作到底是從緩存中讀取的還是從記憶體中讀取的呢?我了解應該是記憶體。如果是記憶體那這樣掃描還有什麼意義呢。。到目前為止我還是不是很了解。But this code come from Doug Lea。是以大概還是我哪裡了解的有問題。。

private void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < ) {
                    if (e == null || key.equals(e.key))
                        retries = ;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & ) ==  &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -;
                }
            }
        }
           

散列過程

  1. hashSeed的産生過程:

hashSeed的産生過程基本上和HashMap中的是類似的是需要根據

sun.misc.Hashing.randomHashSeed(instance)

産生的,這需要虛拟機産生啟動後才能使用,是以使用一個靜态類來初始化。此外這裡還有一個是可以通過jdk.map.althashing.threshold來控制的域值需要使用一個靜态的Holder類來進行處理。之前在分析HashMap的時候沒有詳細說,這裡盡我所能說明白怎麼去控制這些參數以及他們發揮作用的原理。對于String需要單獨産生散列種子。

我們從後往前說,首先産生散列種子需要的是一個随機數。這個随機數是由randomHashSeed針對每個ConcurrentHashMap的執行個體産生的。這裡分兩種情況,預設情況下傳回0,如果設定了ALTERNATIVE_HASHING(并且虛拟機已經啟動,則會使用sun.misc.Hashing.randomHashSeed來産生一個随機數)。這裡為什麼不直接産生,是為了給使用者提供一些自定義的功能,在虛拟機啟動的時候可以通過設定一些參數來控制散列種子的産生。這是java的慣用手法凡是根hash産生有關的部分都使用了類似的方式。

private transient final int hashSeed = randomHashSeed(this);

    private static int randomHashSeed(ConcurrentHashMap instance) {
        if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
            return sun.misc.Hashing.randomHashSeed(instance);
        }

        return ;
    }
           

其次這裡的ALTERNATIVE_HASHING是根據什麼産生的。Holder靜态内部了用于産生這個ALTERNATIVE_HASHING。

不同的HashMap中使用的ALTERNATIVE_HASHING的方法都有些不同,這裡主要是控制設定的如果門檻值,如果門檻值過小則需要随機産生散列種子。是以通常情況下都會對String 做單獨處理。如果在啟動jvm的時候設定了jdk.map.althashing.threshold,Holder會比較jdk.map.althashing.threshold與MAXIMUM_CAPACITY的大小。如果他小于ConcurrentHashMap的最大值MAXIMUM_CAPACITY,則為ALTERNATIVE_HASHINGtrue。至于為什麼這樣做我現在還沒法回答。如果不設定則預設為Integer.MAX_VALUE,也就是不需要設定。換句話說,如果設定的域值小于java的ConcurrentHashMap的最大值MAXIMUM_CAPACITY則需要重新計算散列種子。

private static class Holder {

        /**
        * Enable alternative hashing of String keys?
        *
        * <p>Unlike the other hash map implementations we do not implement a
        * threshold for regulating whether alternative hashing is used for
        * String keys. Alternative hashing is either enabled for all instances
        * or disabled for all instances.
        */
        static final boolean ALTERNATIVE_HASHING;

        static {
            // Use the "threshold" system property even though our threshold
            // behaviour is "ON" or "OFF".
            String altThreshold = java.security.AccessController.doPrivileged(
                new sun.security.action.GetPropertyAction(
                    "jdk.map.althashing.threshold"));

            int threshold;
            try {
                threshold = (null != altThreshold)
                        ? Integer.parseInt(altThreshold)
                        : Integer.MAX_VALUE;

                // disable alternative hashing if -1
                if (threshold == -) {
                    threshold = Integer.MAX_VALUE;
                }

                if (threshold < ) {
                    throw new IllegalArgumentException("value must be positive integer.");
                }
            } catch(IllegalArgumentException failed) {
                throw new Error("Illegal value for 'jdk.map.althashing.threshold'", failed);
            }
            ALTERNATIVE_HASHING = threshold <= MAXIMUM_CAPACITY;
        }
    }
           
  1. hashCode的産生過程:

ConcurrentHashMap的産生過程和HashMap中是有所不同的,因為ConcurrentHashM實際上是需要根據hashCode的位來計算其屬于哪個Segment的。是以對每一位的散列都需要進行精細的控制。中間這一系列的位移操作主要目的就是使得散列效果被平均的配置設定到各個位上。因為如果是直接用散列的話也許低位不一樣,但是高位是相似的,這樣使用高位去定位Segment的時候回把大量的Entry定位到一個Segment中,并發性能降低。

此外對于String需要單獨的産生随機數種子,這是String的特性決定的。

private int hash(Object k) {
        int h = hashSeed;

        if (( != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  ) ^ ;
        h ^= (h >>> );
        h += (h <<   );
        h ^= (h >>>  );
        h += (h <<   ) + (h << );
        return h ^ (h >>> );
    }
           
  1. 定位到Segment

Segment是不同的預設情況下Segment通過下面的式子計算出來的,預設情況下segmentShift為28,segmentMask為15,也就是說僅僅使用高四位來進行散列到16個Segment中。

int j = (hash >>> segmentShift) & segmentMask;
           

方法

put

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;// 計算散列到那個Segment。
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck//獲得Segment,但是這裡不太明白工作原理。為何要多出這三行,因為是final類的程式是以這裡應該是沒有可見性問題才對。而且很奇怪的是在在remove中就沒有這部分,而是直接使用SegmentForHash
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);//确定了segment之後會把任務委托給Segment的put來做。這部分之前說過這裡不詳細說明了。
    }
           

不過這裡UNSAFE的問題還是有一些需要說明。UNSAFE了解的不太透徹嘗試解釋一下,有問題以後再修改。

首先,UNSAFE獲得的偏移量、首位址、或者增量位址都是針對一個具體的*.class來說的。這些變量都是固定的,可能對于不同的虛拟機會有所不同,但是一旦獲得之後就是常量了。但是如果在使用這些常量進行獲得、交換或者替換等原子性操作的時的時候需要綁定具體的執行個體。

首先看下這幾個UNSAFE獲得常量:

static {
        int ss, ts;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class tc = HashEntry[].class;
            Class sc = Segment[].class;
            TBASE = UNSAFE.arrayBaseOffset(tc);//獲得是該類的首位址
            SBASE = UNSAFE.arrayBaseOffset(sc);
            ts = UNSAFE.arrayIndexScale(tc);//獲得是該類的增量位址
            ss = UNSAFE.arrayIndexScale(sc);
            HASHSEED_OFFSET = UNSAFE.objectFieldOffset(//獲得的是域的偏移位址
                ConcurrentHashMap.class.getDeclaredField("hashSeed"));
            SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentShift"));
            SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentMask"));
            SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segments"));
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((ss & (ss-)) !=  || (ts & (ts-)) != )//這裡主要是判斷Segments和table是不是2的指數。
            throw new Error("data type scale not a power of two");
        SSHIFT =  - Integer.numberOfLeadingZeros(ss);//這個計算的是偏移量。實際上在後續的計算中使用SSHIFT而不使用ss,因為位操作速度更快。這樣就需要保證但是這裡我不明的一點是既然是根據單獨的類生成了那應該是保證是保證是固定的值為何還要判斷是否是2的階乘。此外這裡為何一定要求是2的階乘。總之問題多多。主要還是java記憶體怎麼設定的不太了解。ConcurrentHashMap中大量運用UNSAFE來設定和擷取,不太明白這樣做和直接讀取有什麼差別。
        TSHIFT =  - Integer.numberOfLeadingZeros(ts);
    }
           

remove、replace

兩者方法類似,就是定位Segment然後委托給Segment來操作就好。

public V remove(Object key) {
        int hash = hash(key);
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }
           

疊代器

疊代器的内容都差不多。因為是map是以有很多衍生的類,這裡隻看下最主要的,因為實作相對比較簡單,我也沒有看到非正常的問題。

abstract class HashIterator {
        int nextSegmentIndex;
        int nextTableIndex;
        HashEntry<K,V>[] currentTable;
        HashEntry<K, V> nextEntry;
        HashEntry<K, V> lastReturned;

        HashIterator() {
            nextSegmentIndex = segments.length - ;
            nextTableIndex = -;
            advance();
        }

        /**
         * Set nextEntry to first node of next non-empty table
         * (in backwards order, to simplify checks).
         */
        final void advance() {//advance的任務是将節點設定到下一個連結清單的頭結點,這裡有兩種情況一種是目前table[]中的某個連結清單周遊完,也有可能是Segment中的所有連結清單周遊完畢,是以這裡要兩種情況讨論。Map總的來說是從後往前周遊的。每次切換Segment都要切換nextTableIndex 為下一個talbe的長度。當所有Segment周遊完畢也就結束周遊了。
            for (;;) {
                if (nextTableIndex >= ) {
                    if ((nextEntry = entryAt(currentTable,
                                             nextTableIndex--)) != null)
                        break;
                }
                else if (nextSegmentIndex >= ) {
                    Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
                    if (seg != null && (currentTable = seg.table) != null)
                        nextTableIndex = currentTable.length - ;
                }
                else
                    break;
            }
        }

        final HashEntry<K,V> nextEntry() {//nextEntry會依照目前節點向後移動,直到移動到null說明目前的連結清單周遊完成需要移動到下一個連結清單的頭結點這個任務由advance完成。
            HashEntry<K,V> e = nextEntry;
            if (e == null)
                throw new NoSuchElementException();
            lastReturned = e; // cannot assign until after null check
            if ((nextEntry = e.next) == null)
                advance();
            return e;
        }
           

其他

視圖比較麻煩也沒有特别要說的這裡不詳細說了。

序列化和之前的相容性有關也比較麻煩。以後有機會再說吧。