目錄
- 1 簡介
- 2. HashMap
-
- 2.1 JDK1.7中的實作
-
- 2.1.1 設計思想
- 2.1.2 Entry的定義
- 2.1.3 Entry數組
-
- 2.1.3.1 threshold和loadFactor
- 2.1.3.2 擴容
-
- 2.1.3.2.1 擴容條件
- 2.1.3.2.2 擴容過程
- 2.1.3.2.3 擴容時的rehash
- 2.1.3.2.4 擴容存在的問題
- 2.1.4 計算數組下标
-
- 2.1.4.1 哈希函數
-
- 2.1.4.1.1 為什麼要對hashSeed為0時的String類型的對象做特殊處理?
- 2.1.4.1.2 為什麼要采用位運算?
- 2.1.4.1.3 把上述的位移運算移動的位數稍微改一下會怎麼樣?
- 2.1.4.2 根據哈希值确定下标
- 2.1.5 put()函數
- 2.1.6 get()函數
- 2.1.7 fail-fast機制
- 2.1.8 keySey()、entrySet()和values()
- 2.1.9 總結
- 2.2 JDK1.8中的實作
-
- 2.2.1 設計思想
- 2.2.2 Node節點的定義
- 2.2.3 哈希函數
- 2.2.4 擴容過程
-
- 2.2.4.1 為什麼原table數組中下标為i下的節點隻會轉移到新數組下标i和(i + n)處?
- 2.2.4.2 頭插法效率比尾插法要更高,JDK1.8為什麼要将頭插法變為尾插法?
- 2.2.5 節點樹化和去樹化的條件
-
- 2.2.5.1 為什麼連結清單長度為8時發生樹化?
- 2.2.5.2 為什麼連結清單長度為6時發生去樹化?
- 2.2.5.3 以什麼作為紅黑樹節點的比較條件?
- 2.2.6 總結
- 3. ConcurrentHashMap
-
- 3.1 JDK1.7中的實作
-
- 3.1.1 設計思想
- 3.1.2 HashEntry的定義
- 3.1.2 Segment的定義
-
- 3.1.2.1 scanAndLockForPut()函數和scanAndLock()函數
- 3.1.2.2 rehash()函數
-
- 3.1.2.2.1 為什麼需要克隆HashEntry節點?為什麼僅需要克隆部分HashEntry節點?
- 3.1.3 構造函數
-
- 3.1.3.1 為什麼在構造函數中僅初始化下标為0的Segment?
- 3.1.3.2 為什麼在構造函數中調用UNSAFE.putOrderedObject()方法給segments數組的0号下标初始化?
- 3.1.4 lazySet
- 3.1.5 聚合函數的實作
- 3.1.6 總結
- 3.2 JDK1.8中的實作
-
- 3.2.1 設計思想
- 3.2.2 節點定義
- 3.2.3 sizeCtl的含義
- 3.2.4 哈希函數
- 3.2.5 put()函數
-
- 3.2.5.1 initTable()函數
- 3.2.5.2 helpTransfer()函數
-
- 3.2.5.2.1 一個小錯誤
-
- 3.2.5.2.1.1 條件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
- 3.2.5.2.1.2 條件二:sc == rs + 1
- 3.2.5.2.1.3 條件三:sc == rs + MAX_RESIZERS
- 3.2.5.2.1.4 條件四:transferIndex <= 0
- 3.2.5.2.2 transfer()函數
- 3.2.5.3 addCount()函數
- 3.2.6 get()函數
-
- 3.2.6.1 TreeBin中的find()函數
- 3.2.6.2 ForwardingNode中的find()函數
- 3.2.6.3 ReservationNode中的find()函數
- 3.2.6.4 Node中的find()函數
- 3.2.7 疊代器的實作
-
- 3.2.7.1 設計思想
- 3.2.7.2 類結構
- 3.2.7.3 缺陷
- 3.2.8 總結
- 4. 寫在最後
1 簡介
HashMap是一種存儲鍵值對的資料結構,設計目的是能夠以O(1)的時間複雜度存儲鍵值對和根據鍵擷取值。
在HashMap出現之前,Java程式員隻能使用Hashtable。Hashtable是一個線程安全的HashMap,put()和get()方法均使用了synchronized關鍵字對整個Hashtable加鎖。在不需要線程安全保證的情況下,Hashtable的性能不如HashMap;但在需要線程安全保證的情況下,Hashtable的性能不如ConcurrentHashMap。是以,無論是否需要保證線程安全性,Hashtable都已經不再被建議使用了。
從設計的角度來說,Hashtable是JDK設計者的一個失敗的設計,強制用synchronized加全局鎖的形式保證了線程安全性。而現在Hashtable被HashMap和ConcurrentHashMap全面代替,HashMap完全不加鎖,不保證線程安全性,ConcurrentHashMap采用分段鎖的思想實作,保證了線程安全性。根據開發者的實際需要,可以采用HashMap和ConcurrentHashMap,而不是将線程安全性強制綁定在Hashtable中。
2. HashMap
在JDK1.7及其之前的版本中,HashMap的底層資料結構是數組+連結清單。在JDK1.8及其之後的版本中,HashMap的底層資料結構是數組+連結清單+紅黑樹。
2.1 JDK1.7中的實作
2.1.1 設計思想
- 一個健值對被封裝成一個Entry對象。
- 使用一個Entry數組來存儲所有健值對。
- 根據key來計算哈希值确定該鍵值對在Entry數組中的下标。
- 利用連結清單法來解決哈希沖突。
- 當健值對數目過多時,Entry數組會擴容。
2.1.2 Entry的定義
final K key;
V value;
Entry<K,V> next;
int hash;
發生哈希沖突時,在Entry數組的同一個下标處會放置多個Entry,這多個Entry是通過next屬性連結成一個連結清單結構的。hash存儲的是key對應的哈希值,因為哈希函數的計算過程一般比較耗時,是以将其存儲在Entry中避免多次重複計算。
2.1.3 Entry數組
static final Entry<?,?>[] EMPTY_TABLE = {};
transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;
用構造函數建立一個空的HashMap時,其table數組的大小是0,在調用put()方法往該HashMap中存儲值時,table數組才會被初始化。
2.1.3.1 threshold和loadFactor
當table數組是EMPTY_TABLE時,threshold用來計算第一次調用put()方法時table數組需要擴容的大小。
在其他情況下,threshold是table數組的擴容門檻值,其值等于capacity乘以loadFactor,其中capacity指的是table數組的大小,loadFactor指的是HashMap的負載率。
threshold和loadFactor可以通過HashMap的構造函數顯示指定,如果不指定,則采用預設值,threshold的預設值是16,loadFactor的預設值是0.75。
loadFactor預設值選取為0.75是一種時間複雜度和空間複雜度的平衡。過高的loadFactor會導緻table數組中某個下标下的連結清單長度過長,在get()操作的時候時間複雜度過高;過低的loadFactor會導緻table數組頻繁擴容,浪費更多的空間。
2.1.3.2 擴容
2.1.3.2.1 擴容條件
- 插入新節點前的鍵值對數目size 大于等于擴容門檻值 threshold。
- 新節點存放的數組下标不為 null。
2.1.3.2.2 擴容過程
- 建立一個數組,大小是原table數組的2倍。
- 将原table數組中的Entry都轉移至新的數組中。
2.1.3.2.3 擴容時的rehash
由于哈希函數的計算比較耗時,在擴容過程中,一般不會重新計算Entry的hash屬性,但某些情況下也是可能重新計算Entry的hash屬性的。具體是否進行rehash由initHashSeedAsNeeded(int capacity)方法的傳回值決定,其中傳入的參數capacity是擴容時新數組的大小。
final boolean initHashSeedAsNeeded(int capacity) {
boolean currentAltHashing = hashSeed != 0;
boolean useAltHashing = sun.misc.VM.isBooted() &&
(capacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
boolean switching = currentAltHashing ^ useAltHashing;
if (switching) {
hashSeed = useAltHashing
? sun.misc.Hashing.randomHashSeed(this)
: 0;
}
return switching;
}
預設情況下,hashSeed為0,即currentAltHashing為false。
我們可以通過設定系統變量jdk.map.althashing.threshold來設定Holder.ALTERNATIVE_HASHING_THRESHOLD的值。
- 當我們不設定jdk.map.althashing.threshold或者設定jdk.map.althashing.threshold為-1時,Holder.ALTERNATIVE_HASHING_THRESHOLD的值為Integer.MAX_VALUE。
- 當我們設定jdk.map.althashing.threshold為其餘負數時,HashMap會抛出IllegalArgumentException異常。
- 當我們設定jdk.map.althashing.threshold為正數時,其值就是Holder.ALTERNATIVE_HASHING_THRESHOLD的值。
也就是說預設情況下,useAltHashing始終都是false。
是以,在預設情況下switching是false,也就是不開啟rehash過程。
如果我們設定jdk.map.althashing.threshold為一個不大的正數,當擴容時新數組的大小大于等于該值時,useAltHashing會變為true,switching也會變為true。此時就會改變hashSeed的值,進而進行rehash過程。
2.1.3.2.4 擴容存在的問題
因為JDK1.7版本的節點采用頭插法,是以在擴容時連結清單節點在原table中的順序和新數組中的順序是相反的。
也正是因為采用頭插法,是以在多線程同時擴容時可能出現循環連結清單,使得擴容過程陷入死循環。
循環連結清單如何出現?這裡列舉一種情況:
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next; // 語句1
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}
當線程1和線程2同時執行到語句1處,線程1繼續執行,而線程2在執行完語句1後失去了CPU時間片暫停執行。線上程1執行完整個擴容過程後,線程2獲得CPU時間片繼續執行,此時線程2的擴容過程就會出現循環連結清單陷入死循環。
但我們從線程安全的角度來看,這似乎并不是一個問題,HashMap本就設計為一個線程不安全的類,多線程通路時我們需要對整個HashMap加鎖,是以循環連結清單的問題也就不會出現了。
2.1.4 計算數組下标
2.1.4.1 哈希函數
final int hash(Object k) {
int h = hashSeed;
if (0 != h && k instanceof String) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}
簡單總結一下上述哈希函數的邏輯:
- 如果哈希種子(hashSeed)不為0,且傳入的是一個String類型的對象,調用一種特殊的哈希函數來計算哈希值。
- 如果不滿足上述條件,會對哈希種子進行一系列詭異的位運算,包括與傳入對象的hashCode異或,位移等等。
計算一個對象的哈希值的計算量着實不小,我們也不知道JDK的設計者為什麼要這樣計算,但是知道了這個函數是一個哈希函數,我們可以反推測下述幾個問題。
2.1.4.1.1 為什麼要對hashSeed為0時的String類型的對象做特殊處理?
推測是因為String類型的hashCode計算容易産生哈希沖突,故而調用sun.misc.Hashing.stringHash32()函數來做特殊處理。而hashSeed是一個開關,一般情況下hashSeed為0,這個開關是關上的。我們可以設定環境變量jdk.map.althashing.threshold來打開這個開關。具體如何設定該值,見2.1.3.2.3 擴容時的rehash。
2.1.4.1.2 為什麼要采用位運算?
推測是哈希函數要求極高的計算效率,位運算更接近計算機底層的資料表示方法,計算效率更高。
2.1.4.1.3 把上述的位移運算移動的位數稍微改一下會怎麼樣?
如何設計哈希沖突少、計算速度快的哈希函數本來就是一個玄學問題,将上述位移運算的位數稍微改一下影響不大。我推測JDK裡取的這幾個位移值一定是經過了大量的測試,在哈希沖突少和計算速度快之間取得了良好的平衡,是一種最佳實踐。
2.1.4.2 根據哈希值确定下标
假設我們的數組長度是n,那麼如何才能使哈希值均勻地分布在數組的每一個位置上,以盡可能地減少哈希沖突呢?取模運算!如何高效地進行取模運算呢?如果我們将數組的長度n規定為2的整數次幂,那麼将哈希值與(n - 1)進行位與運算就是對n進行取模運算!這就解釋了為什麼數組的長度必須是2的整數次幂,而且擴容的時候每次都将數組大小翻倍。
2.1.5 put()函數
在2.1.3 Entry數組中已經提到,HashMap剛初始化的時候table數組的長度為0,而是在第一次往HashMap中添加值的時候table數組才會有容量,這個過程在函數inflateTable()裡:
private void inflateTable(int toSize) {
int capacity = roundUpToPowerOf2(toSize);
threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
table = new Entry[capacity];
initHashSeedAsNeeded(capacity);
}
其中,roundUpToPowerOf2(toSize)的作用是尋找到大于等于toSize的最小值,該值恰好是2的整數次幂。而initHashSeedAsNeeded(capacity)方法已經在2.1.3.2.3 擴容時的rehash中分析,是初始化hashSeed用的。
和Hashtable和ConcurrentHashMap不同,HashMap是允許存放null為key和value值的,key為null的Entry一定存放在table數組下标為0的位置,是以在put()函數中會對key為null的情況做特殊處理,調用putForNullKey()方法來添加key為null的Entry:
private V putForNullKey(V value) {
for (Entry<K,V> e = table[0]; e != null; e = e.next) {
if (e.key == null) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
modCount++;
addEntry(0, null, value, 0);
return null;
}
在這之後,put()方法會根據2.4 計算數組下标所描述的那樣,計算key對應的數組下标,将其用頭插法插入對應的連結清單。在插入的過程中可能會發生2.1.3.2 擴容中描述的數組擴容操作。
2.1.6 get()函數
和put()函數相對應,get()函數也會對key為null的情況做特殊處理,這段邏輯在getForNullKey()方法裡:
private V getForNullKey() {
if (size == 0) {
return null;
}
for (Entry<K,V> e = table[0]; e != null; e = e.next) {
if (e.key == null)
return e.value;
}
return null;
}
之後,根據key計算出數組下标,然後周遊對應的連結清單尋找到正确的value值,比較key是否相同是,先比較hash值是否相同,再用==比較是否是同一個對象,再用equals()方法比較兩個對象是否相同。
2.1.7 fail-fast機制
當我們在用疊代器周遊HashMap時,如果對HashMap做了結構性修改,如添加一個Entry或者删除一個Entry,将會抛出ConcurrentModificationException異常。
HashMap使用一個成員變量modCount來實作這個機制:modCount初始化為0,每次添加一個Entry或者删除一個Entry都會使modCount加1。當建立疊代器的時候會記錄目前的modCount值到疊代器的成員變量expectedModCount中,在疊代過程中一旦發現modCount和expectedModCount不相等,就會抛出ConcurrentModificationException異常。
當然,在疊代的過程中我們可以調用疊代器的remove()方法來删除HashMap中的元素,因為在疊代器的remove()方法中重新讀取modCount值到成員變量expectedModCount中,其代碼如下:
public void remove() {
if (current == null)
throw new IllegalStateException();
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
Object k = current.key;
current = null;
HashMap.this.removeEntryForKey(k);
expectedModCount = modCount; // 重新給expectedModCount指派,防止抛出ConcurrentModificationException異常
}
2.1.8 keySey()、entrySet()和values()
HashMap的keySet()、entrySet()和values()方法傳回的不是HashMap中key的一份拷貝、entry的一份拷貝和value的一份拷貝,而是和HashMap指向同一個記憶體位址的一份引用。是以如果對keySet()、entrySet()和values()的傳回值删除元素會影響原有的HashMap。(不支援添加元素操作)
2.1.9 總結
将table數組繪制成一張圖:

JDK1.7的HashMap源碼總共1189行,資料結構是數組+連結清單,為提高運算效率,應用了大量移位運算。同時,在多線程擴容時,因為采用了頭插法,可能産生循環連結清單,造成 transfer() 方法陷入死循環。當然,本文隻是枚舉了循環連結清單産生的一種情況,還有其餘情況可能會産生循環連結清單問題。
2.2 JDK1.8中的實作
2.2.1 設計思想
JDK1.8的HashMap設計思想和JDK1.7的基本相同,但是引入了紅黑樹這一新的資料結構來提高哈希沖突過于嚴重時的性能。另外,JDK1.8引入了接口default方法、lambda表達式和Spliterator等新的概念,在HashMap中也添加了相應的方法和新的内部類。
2.2.2 Node節點的定義
在JDK1.7中key-value被封裝成為Entry,而在JDK1.8中被封裝成為Node,這兩者除了名字不一樣,其餘都是相同的。Node節點依然擁有下述四個屬性:
final int hash;
final K key;
V value;
Node<K,V> next;
由于JDK1.8新引入了紅黑樹節點,是以還多了一種Node的子類TreeNode,其中增加了5個屬性:
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
其中parent用來标記節點的父親節點,left和right分别用來标記節點的左孩子和右孩子,而prev則用來标記節點的前一個節點(由于TreeNode繼承自Node,是以也有next屬性,順着next屬性周遊可以得到一條連結清單,比如A.next = C,那麼有C.prev = A,這就是prev屬性的作用),red用來标記節點顔色是否是紅色。
關于許多紅黑樹的操作都封裝在TreeNode中,由于紅黑樹的各種操作過于複雜且過于死闆,在這裡就不再分析。我們隻需要知道紅黑樹是一種二叉搜尋樹,在其中尋找值和添加值的時間複雜度都是O(logn),至于具體實作,不是HashMap的重點。
2.2.3 哈希函數
我們可以回到2.4.1 哈希函數回顧一下JDK1.7的哈希函數,可以看到特别複雜,而JDK1.8的哈希函數得到了簡化:
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
這裡用h異或運算(h >>> 16),主要是為了讓h的高位參與哈希函數的運算。在2.1.4.2 根據哈希值确定下标中我們知道最後是利用哈希函數得到的值和table數組長度減1做位與運算得到數組下标的,也就是說隻有哈希值的低位在起作用,以table數組長度為16舉例,此時table數組減1的二進制表示法為00000000 00000000 00000000 00001111,此時隻有哈希值的低4位會對數組下标産生影響。為了讓高位也能起作用,将h右移16位和h做異或運算得到的結果作為哈希值。
之是以JDK1.8的哈希函數能比JDK1.7的簡化許多,是因為JDK1.8引入了紅黑樹這一資料結構,對于哈希沖突過于嚴重的情況有了很好的處理,是以可以适當地簡化哈希函數來提高性能,畢竟JDK1.7中計算哈希值是一個比較耗性能的操作。
2.2.4 擴容過程
JDK1.8的HashMap的整個擴容過程和JDK1.7是相同的:
- 建立一個大小為原table數組2倍的新數組。
- 将原table數組中每個下标下的節點都轉移至 新數組中。
但在具體實作細節上,JDK1.8相比JDK1.7有略微不同:
- JDK1.7采用的是連結清單的頭插法,新節點插在連結清單頭部,在擴容時會導緻連結清單反轉;而JDK1.8采用的是連結清單的尾插法,新節點插在連結清單的尾部,在擴容前後連結清單不會反轉。
- JDK1.7是從原table數組中一個一個Entry地轉移到新數組,且可能發生rehash操作。對于連結清單節點,JDK1.8中是先将原table數組中某個下标i下的節點進行分類,一部分節點轉移至新數組的下标i位置,另一部分節點轉移至新數組的下标(i + n)位置,其中n為原table數組的大小,将這兩部分節點分别建構成連結清單後再轉移至新數組對應的下标位置i和(i + n)處。而對于紅黑樹節點,也是和連結清單節點相同的操作,但需要額外維護一個prev指針,再根據新生成的兩個連結清單中的節點個數是否小于等于UNTREEIFY_THRESHOLD來判斷是否需要将紅黑樹退化為普通的連結清單節點Node。另外,JDK1.8在擴容時不會發生rehash。
2.2.4.1 為什麼原table數組中下标為i下的節點隻會轉移到新數組下标i和(i + n)處?
因為擴容操作每次将數組大小變為原數組大小的2倍,進而始終保證了數組大小為2的整數次幂。
假設原數組大小為16,那麼計算數組下标時,是和00000000 00000000 00000000 00001111做位與運算。
擴容後,計算數組下标時,是和00000000 00000000 00000000 00011111做位與運算,擴容前後的下标值隻有最低的第4位會有差别,即要麼依然是下标i,要麼是下标(i + 16)。
2.2.4.2 頭插法效率比尾插法要更高,JDK1.8為什麼要将頭插法變為尾插法?
在2.1.3.2.4 擴容存在的問題中我們已經分析過,在多線程環境下擴容,JDK1.7可能會出現循環連結清單進而陷入死循環。而JDK1.8采用尾插法這一機制後就避免了這個問題。
另一方面,JDK1.8引入了紅黑樹的概念,當某個連結清單中節點數過多時,會轉化成紅黑樹節點,而紅黑樹節點的插入效率是O(logn),這就保證了即使是尾插法,效率也不會比頭插法差太多。
2.2.5 節點樹化和去樹化的條件
在插入新節點前,某個數組下标的連結清單長度大于等于TREEIFY_THRESHOLD(預設值為8)時,将新節點插入到連結清單尾部,并對整個連結清單進行樹化操作,是以真正發生樹化操作時,連結清單的長度為9。樹化操作定義在函數treeifyBin()中:
final void treeifyBin(Node<K,V>[] tab, int hash) {
int n, index; Node<K,V> e;
if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
resize();
else if ((e = tab[index = (n - 1) & hash]) != null) {
TreeNode<K,V> hd = null, tl = null;
do {
TreeNode<K,V> p = replacementTreeNode(e, null);
if (tl == null)
hd = p;
else {
p.prev = tl;
tl.next = p;
}
tl = p;
} while ((e = e.next) != null);
if ((tab[index] = hd) != null)
hd.treeify(tab);
}
}
上述代碼的邏輯概括如下:
- 如果tab數組還沒有被初始化,或者tab數組的長度小于MIN_TREEIFY_CAPACITY(預設值是64),将進行數組的擴容操作而不是将連結清單節點轉化為紅黑樹節點。
- 确認了tab數組長度大于等于MIN_TREEIFY_CAPACITY後,先将原連結清單節點全部轉化為TreeNode的雙向連結清單,再将雙向連結清單轉化為紅黑樹。
去樹化在2.1.3.2 擴容過程中已有提及。
2.2.5.1 為什麼連結清單長度為8時發生樹化?
在key的hashCode()實作良好的基礎上,預設負載率loadFactor為0.75時,某個數組下标下Node數目服從下述泊松分布:
0: 0.60653066
1: 0.30326533
2: 0.07581633
3: 0.01263606
4: 0.00157952
5: 0.00015795
6: 0.00001316
7: 0.00000094
8: 0.00000006
可以看到,某個數組下标下Node數目為8的機率僅有0.00000006,極低,是以樹化的操作并不會經常發生,因為将Node轉化為TreeNode是一個耗費時間和空間的過程。
但是如果某個數組下标下Node數目到達了8,繼續往該下标下新增節點時,就會發生樹化操作。因為檢索連結清單的時間複雜度是O(n),而檢索紅黑樹的時間複雜度是O(logn)。
2.2.5.2 為什麼連結清單長度為6時發生去樹化?
去樹化界限為6,比樹化界限8要小,在樹化和去樹化之間留有一定的門檻值主要是為了避免頻繁地發生樹化和去樹化操作,因為這兩個操作比較耗時。
2.2.5.3 以什麼作為紅黑樹節點的比較條件?
- 以Node對應的hash值作為比較條件。
- 如果Node對應的hash值相等,且key實作了Comparable接口,則調用其compareTo()方法作為比較條件。
- 如果Node對應的hash值相等,且key沒有實作Comparable接口,或者compareTo()的比較結果是相等的,則用System.identityHashCode()作為比較條件,每個key對象的System.identityHashCode()是不可能相等的。
2.2.6 總結
将table數組繪制成一張圖:
JDK1.8的HashMap總共2398行,資料結構是數組+連結清單+紅黑樹,位運算出神入化。
3. ConcurrentHashMap
ConcurrentHashMap是保證線程安全的HashMap,但不是絕對線程安全,僅僅是相對的線程安全,即對ConcurrentHashMap單次的操作是線程安全的,我們在調用的時候不需要進行額外的保障措施,但是對于一些特定順序的連續調用,就可能需要在調用端使用額外的同步手段來保證調用的正确性。
為了說明ConcurrentHashMap不是絕對線程安全的,我們來看下面這個例子:
public class HashMapConcurrencyTest1 {
public static void main(String[] args) throws InterruptedException {
testConcurrency(new HashMap<>()); // 11209
testConcurrency(new ConcurrentHashMap<>()); // 11010
}
private static void testConcurrency(Map<Integer, Integer> map) throws InterruptedException {
map.put(1, 0);
Runnable runnable = () -> {
for (int i = 0; i < 10000; i++) {
map.put(1, map.get(1) + 1); // 語句1
}
};
Thread a = new Thread(runnable, "threadA");
Thread b = new Thread(runnable, "threadB");
a.start();
b.start();
a.join();
b.join();
System.out.println(map.get(1));
}
}
從上述運作結果我們可以看到,無論是HashMap還是ConcurrentHashMap都不是絕對線程安全的(理論上正确的輸出結果應該是20000)。HashMapConcurrencyTest1出問題的根本原因在于,即使是ConcurrentHashMap也無法保證下述語句的線程安全性,因為這涉及到兩個獨立的操作put()和get()。如果要保證絕對線程安全需要用synchronized()加鎖,改進後的代碼如下:
public class HashMapConcurrencyTest2 {
public static void main(String[] args) throws InterruptedException {
testConcurrency(new HashMap<>()); // 20000
testConcurrency(new ConcurrentHashMap<>()); // 20000
}
private static void testConcurrency(Map<Integer, Integer> map) throws InterruptedException {
map.put(1, 0);
Runnable runnable = () -> {
synchronized (map) {
for (int i = 0; i < 10000; i++) {
map.put(1, map.get(1) + 1);
}
}
};
Thread a = new Thread(runnable, "threadA");
Thread b = new Thread(runnable, "threadB");
a.start();
b.start();
a.join();
b.join();
System.out.println(map.get(1));
}
}
上述例子展現不出HashMap和ConcurrentHashMap線上程安全性上的不同,隻能說明兩者都不是絕對線程安全的,但不能說明ConcurrentHashMap是相對線程安全的,HashMap不是相對線程安全的。我們再來看一個例子:
public class HashMapConcurrencyTest3 {
public static void main(String[] args) throws InterruptedException {
test(new HashMap<>()); // 973
test(new ConcurrentHashMap<>()); // 1000
}
private static void test(Map<String, String> map) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
Runnable runnable = () -> {
for (int i = 0; i < 100; i++) {
String s = Thread.currentThread().getName() + i;
map.put(s, s);
}
countDownLatch.countDown();
};
for (int i = 0; i < 10; i++) {
new Thread(runnable, "Thread" + i + ": ").start();
}
countDownLatch.await();
System.out.println(map.size());
}
}
在HashMapConcurrencyTest3中,HashMap表現出了線程不安全性,而ConcurrentHashMap則表現出了線程安全性,解釋如下:
發生哈希沖突時,2個線程在HashMap的table數組同一個下标位置插傳入連結表節點時,可能出現線程1執行node1.next = node2後,讓出時間片,線程2執行node1.next = node3,這樣node2節點就丢失了,就表現出了線程不安全性。而ConcurrentHashMap則保證了單次put()操作的線程安全性,是利用分段鎖的思想實作的,一次隻會有一個線程進入table數組的同一個下标位置進行插入節點的操作,也就實作了線程安全性。
JDK1.7和JDK1.8的ConcurrentHashMap都采用分段鎖的思想實作,但是JDK1.8的分段鎖的鎖粒度更小,且引入了紅黑樹這一資料結構優化哈希沖突時ConcurrentHashMap的性能。和Hashtable一樣,ConcurrentHashMap不允許存放key為null或者value為null的鍵值對。
3.1 JDK1.7中的實作
3.1.1 設計思想
核心思想是分段鎖,具體闡述如下:
- 将HashMap中的Entry數組用Segment數組來代替,Segment繼承自ReentrantLock,是加鎖的最小次元。
- 每一個Segment持有一個HashEntry數組,該HashEntry數組相當于HashMap中的table數組。每個Segment持有的HashEntry數組大小不一定一樣大。
- 将HashMap根據Segment分成若幹段,put()和get()函數根據key先定位到Segment數組的下标,再定位到該Segment中HashEntry數組中的下标。
- put()函數隻允許一個線程對同一個Segment進行操作,這就保證了put()操作的相對線程安全性。
3.1.2 HashEntry的定義
HashEntry是ConcurrentHashMap中的一個靜态内部類,有4個屬性:
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
value和next屬性被定義成volatile,是為了保證并發情況下的可見性,關于volatile的記憶體意義,不是本文的重點。
3.1.2 Segment的定義
Segment繼承自ReentrantLock,擁有以下幾個屬性:
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;
我們完全可以把一個Segment當作一個HashMap來對待,其中table數組被定義成volatile,保證了并發情況下的可見性。count标記Segment持有的鍵值對數目。modCount、threshold和loadFactor和HashMap中的含義相同,modCount表示Segment發生結構性修改的次數,threshold表示Segment中的table數組的擴容門檻值,loadFactor表示Segment的負載率。table、count、modCount和threshold都被标記為transient,在預設的序列化實作中這4個參數會被忽略。
3.1.2.1 scanAndLockForPut()函數和scanAndLock()函數
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
private void scanAndLock(Object key, int hash) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
scanAndLockForPut()和scanAndLock()長得差不多,作用也差不多,都是用于擷取鎖時的線程自旋操作。這個自旋受到最大自旋次數MAX_SCAN_RETRIES的限制,如果自旋次數大于MAX_SCAN_RETRIES,會直接調用lock()方法阻塞目前線程直到擷取鎖。在自旋的過程中,每隔1次會檢查一下連結清單的首節點是否發生了變化,如果發生了變化說明有其他線程擷取到了鎖在修改了連結清單首節點,需要重新擷取連結清單首節點并重置自旋次數重新進入自旋流程。
差別在于scanAndLockForPut()用于put()函數的自旋操作,有傳回值,如果key在table數組中存在,則傳回null;如果不存在,傳回一個新的HashEntry節點。實際上這個傳回值傳回什麼對put()函數不會有任何的影響,具體可以見put()函數裡的邏輯:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) { // 如果scanAndLockForPut()傳回的是null,會進入這個if語句,不需要建立HashEntry節點
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first); // 如果在scanAndLockForPut()傳回了一個HashEntry,則将該HashEntry作為連結清單首節點
else
node = new HashEntry<K,V>(hash, key, value, first); // 如果tryLock()成功,則建立一個HashEntry将其作為連結清單首節點
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 擴容操作
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
在put()方法中我們可以看到,如果tryLock()成功,不會進入scanAndLockForPut()方法自旋等待。一旦進入scanAndLockForPut()方法自旋等待,無論傳回結果是否為null,都不影響後續代碼邏輯。如果scanAndLockForPut()傳回結果不為null,說明key不存在,會将該HashEntry置為連結清單首節點;如果scanAndLockForPut()傳回結果為null,則不需要建立HashEntry節點,修改原來的節點對應的value值即可。
3.1.2.2 rehash()函數
在put()函數中我們看到,當新加入的節點後的節點數目大于threshold時,table數組會進行擴容操作,即rehash()函數:
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
總結一下上述代碼的邏輯:
- 新數組的大小為原table數組的2倍。
- 将原table數組中的節點全部遷移至新數組中,由于rehash()時已經擷取到鎖,不用考慮多線程情況下的線程安全問題。這個遷移操作和2.1.3.2 擴容中的遷移操作不同,這個遷移操作會建立部分HashEntry節點:對于連結清單末尾處某段連續的節點,這段節點全部遷移至新數組同一個下标處,這部分節點會一起遷移;而其餘節點則會被克隆,采用頭插法,一個一個地遷移至新數組,在預設loadFactor等于0.75的情況下,這部分需要克隆的節點約占1/6。
3.1.2.2.1 為什麼需要克隆HashEntry節點?為什麼僅需要克隆部分HashEntry節點?
ConcurrentHashMap的讀操作是不需要加鎖的,當我們在rehash()過程中,可能正有其他線程在通路原table數組,如果我們改變原table數組中HashEntry的next指針,可能會導緻這些讀線程通路錯亂,是以需要克隆HashEntry節點。而對于那些位于連結清單末尾處的某段連續的HashEntry連結清單,這段連結清單中的節點全部遷移至新數組同一個下标處,其next指針不需要發生任何改變,是以不需要克隆,這是為了提高性能。
3.1.3 構造函數
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 如果輸入參數非法,直接抛出IllegalArgumentException異常
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 如果并發級别concurrencyLevel超出了最大并發數限制,将并發級别設定為最大并發數MAX_SEGMENTS
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
// ssize表示Segment數組的大小,必須大于等于期望的并發級别concurrencyLevel, 且ssize必須是2的整數次幂
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
// segmentMask用于确定key位于Segment數組的哪個位置
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c表示一個Segment中至少需要多少個桶
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// cap是一個Segment中的哈希桶數目,必須大于等于c,且是2的整數次幂
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
經過上述構造函數,下述3個成員變量被指派:
// segmentMask和segmentShift用于根據給定的key的hash值定位到Segment的位置
final int segmentMask;
final int segmentShift;
// 經過構造函數,segments的第0号位置已經被初始化,其餘位置未被初始化
final Segment<K,V>[] segments;
3.1.3.1 為什麼在構造函數中僅初始化下标為0的Segment?
主要是為了性能上的考慮,其餘的Segment隻會在被使用到的時候才會調用ensureSegment()方法進行初始化,該方法如下:
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
總結上述方法的邏輯如下:
- k指的是我們需要初始化segments數組中第k個元素。
- 新的Segment是從segments[0]複制而來的,并使用CAS對segments[k]指派,確定線程安全性。
3.1.3.2 為什麼在構造函數中調用UNSAFE.putOrderedObject()方法給segments數組的0号下标初始化?
先來看一下UNSAFE.putObjectVolatile()和UNSAFE.putOrderedObject()的差別:
// 存儲變量的引用到對象的指定的偏移量處,使用volatile的存儲語義
public native void putObjectVolatile(Object o, long offset, Object x);
// 有序、延遲版本的putObjectVolatile方法,不保證值的改變被其他線程立即看到。
public native void putOrderedObject(Object o, long offset, Object x);
再來看看Segment數組segments的定義:
segments是一個final修飾的屬性,編譯器會在final域的寫之後,構造函數return之前,插入一個StoreStore屏障,這個屏障保證了segments數組在構造函數外的可見性,是以我們僅需調用UNSAFE.putOrderedObject(),而無需調用更耗費性能的UNSAFE.putObjectVolatile()方法。
3.1.4 lazySet
由UNSAFE.putOrderedObject()的記憶體語義可知,該操作不保證可見性,但是如果其後面緊跟着unlock()操作,就可以由unlock()操作來保證記憶體可見性,無需調用更耗費性能的UNSAFE.putOrderedObject()方法。這就是ConcurrentHashMap中所謂的lazySet的概念,即調用UNSAFE.putOrderedObject()方法填充的屬性隻有在unlock()操作之後才能保證可見性。
3.1.5 聚合函數的實作
在多線程環境下,有許多線程在通路各個不同的Segment,如何實作一些聚合函數呢?比如size()。核心思想是:首先嘗試着不加鎖統計size()的數目,如果統計不成功,再重新統計。如果經過若幹次嘗試後依然失敗,則對所有Segment加鎖再統計。
public int size() {
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
// 重試次數過多,直接對segments全部加鎖統計size
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
// 隻有本次modCount總數和上一次modCount總數相同時,說明沒有發生結構性修改,才會跳出循環
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
3.1.6 總結
将table數組繪制成一張圖:
JDK1.7的ConcurrentHashMap源碼總共1620行,采用數組+連結清單+分段鎖實作,并采用了大量移位操作提高運算速度。
3.2 JDK1.8中的實作
3.2.1 設計思想
- 和HashMap一樣,在JDK1.7ConcurrentHashMap的基礎上引入了紅黑樹這一資料結構來優化哈希沖突下的性能。
- 抛棄了Segment的概念,但依然采用分段鎖的思想,但是鎖粒度不再是Segment,而是每一個數組下标中的元素,即鎖粒度更細。
- 不采用ReentrantLock加鎖,直接采用原生的synchronized關鍵字加鎖。
- 采用一種全新的TableStack資料結構,用基于棧的思想解決多線程環境下的周遊問題。是以在擴容操作遷移節點時,我們不需要克隆節點,直接操作原節點即可。
- 為lambda表達式和Spliterator提供了方法和類的支援。
3.2.2 節點定義
- Node:最普通的連結清單節點。
- TreeNode:紅黑樹節點。
- ForwardingNode:擴容操作遷移節點時的轉發節點,hash值為MOVED=-1。
- ReservationNode:compute()和computeIfAbsent()方法使用的标記節點,hash值為RESERVED=-3。
- TreeBin:持有紅黑樹對象的節點,hash值為TREEBIN=-2。相比于JDK1.8的HashMap,TreeBin的引入是因為在紅黑樹的插入和删除節點過程中,紅黑樹的根節點會發生變化,不便于對根節點進行synchronized加鎖。
TreeNode、ForwardingNode、ReservationNode和TreeBin都是Node的子類。
3.2.3 sizeCtl的含義
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
sizeCtl是ConcurrentHashMap的一個屬性,用volatile關鍵詞修飾,用于控制table數組初始化和擴容操作。根據注釋,sizeCtl可分為以下幾種含義:
- 當sizeCrl為-1時,表明table數組正在被初始化。
- 當sizeCtl為其他負數時,表明table數組正在擴容,且其值為-(1 + 正在參與擴容的線程數量)。
- 當sizeCtl為0時,表明此時table數組為null,使用預設的初始化大小,即16。
- 當sizeCtl為正數,且table數組為null時,sizeCtl代表table的初始化大小。
- 當sizeCtl為正數,且table數組不為null時,sizeCtl代表下一次擴容的擴容門檻值。
實際上,上述第2點是有誤的,正确的描述應該是:
當sizeCtl為其他負數時,其高x位代表此次擴容操作數組的一個标記(與table數組的大小有關),低(32-x)位代表(1+正在參與擴容的線程數),其中x的值為RESIZE_STAMP_BITS,在ConcurrentHashMap的實作中是一個常量16。
3.2.4 哈希函數
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
相比于2.2.3 哈希函數中提及的JDK1.8中HashMap的哈希函數,多了和HASH_BITS(預設值是0x7fffffff)的位與運算。由3.2.2 節點定義可知,在JDK1.8版本的ConcurrentHashMap中有哈希值為負數的節點,HASH_BITS正是為了将符号位置為0,排除這些節點的哈希值。
3.2.5 put()函數
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 對于ConcurrentHashMap,key和value都不能為null
if (key == null || value == null) throw new NullPointerException();
// spread()方法就是HashMap中的hash()方法
int hash = spread(key.hashCode());
// binCount用于記錄ConcurrentHashMap中的鍵值對數目
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 如果table數組 還沒有被初始化,則先初始化table數組
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果遇到了ForwardingNode,則先幫助遷移節點,再在下一次進入循環時放置值
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// synchronized對table數組中第i位置處的元素f加鎖
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 如果是連結清單節點,插傳入連結表末尾,binCount是計數用的,記錄插入新節點前連結清單中的節點數
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 如果插入新節點前,連結清單中的節點數大于等于 TREEIFY_THRESHOLD,則樹化
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
put()函數的基本思想可以總結如下:
- 根據傳入key的哈希值确定數組下标i。
- 如果是第一次調用put()函數,即table數組還沒有被初始化,則初始化table數組,這個過程在initTable()函數中。
- 如果table[i]為null,利用CAS操作将table[i]置為新Node節點。
- 如果table[i]為ForwardingNode,則參與轉移節點,這個過程在helpTransfer()函數中。
- 否則對table[i]加synchronized鎖,如果table[i]是TreeBin,則往該TreeBin持有的紅黑樹中添加新節點;如果table[i]隻是普通的Node節點,往連結清單末尾添加,如果插入新節點前,連結清單中的節點數大于等于 TREEIFY_THRESHOLD,則樹化。
- 調用addCount()方法将節點數加1。
3.2.5.1 initTable()函數
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 如果sizeCtl小于0,說明已經有線程在初始化table數組,目前線程yield(),讓出CPU
Thread.yield(); // lost initialization race; just spin
// CAS操作将sizeCtl變量置為-1,表示table數組正在被初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 如果sc大于0,則将sc作為table數組初始容量;否則,将table數組初始容量設定為16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// sc存儲下一次擴容門檻值,是0.75n,采用位運算實作
sc = n - (n >>> 2);
}
} finally {
// 将sizeCtl重置為下一次擴容門檻值
sizeCtl = sc;
}
break;
}
}
return tab;
}
initTable()函數考慮了多線程通路情況下的線程安全性,總結一下這個函數的邏輯:
隻要table數組沒有被初始化,則一直循環。在循環過程中,如果發現sizeCtl小于0,這說明已經有現車在初始化table數組,目前線程調用Thread.yield()方法主動讓出CPU時間片。如果sizeCtl大于等于0,嘗試使用CAS操作将sizeCtl置為-1,向其他線程表明目前線程正在初始化數組。
3.2.5.2 helpTransfer()函數
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length); // 當tab.length為16時,rs為32795
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// CAS操作設定正在擴容的線程數加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab); // 參與擴容
break;
}
}
return nextTab;
}
return table;
}
helpTransfer()函數的邏輯總結如下:
确認傳入的參數f節點是一個ForwardingNode節點,且其nextTable屬性不為null,表明目前table數組正處于轉移節點的過程中。調用resizeStamp()函數計算出一個戳标記rs,該戳标記是根據table數組的長度生成的,作為本次擴容操作的一個戳标記。接着會進入一個while循環,隻要轉移節點的過程沒有完成,且滿足某種特定條件(即下文分析的條件一~條件四),就會利用CAS操作設定sizeCtl的值加1,表明參與轉移節點的線程數增加1。
3.2.5.2.1 一個小錯誤
在helpTransfer()方法和addCount()方法中,出現過下述代碼:
// 條件一
(sc >>> RESIZE_STAMP_SHIFT) != rs ||
// 條件二
sc == rs + 1 ||
// 條件三
sc == rs + MAX_RESIZERS ||
// 條件四
transferIndex <= 0
條件一~條件四分别代表什麼含義呢?
從helpTransfer()方法和addCount()方法這兩個調用點中我們可以看到兩點:
- 判斷這4個條件時,sc肯定是一個負數。
- 當第一個線程參與擴容時,sc的值被置為(rs << RESIZE_STAMP_SHIFT) + 2。
3.2.5.2.1.1 條件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
其中,sc代表的是sizeCtl的值,RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS=16。
rs由下述函數計算所得:
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
rs代表的就是此次擴容操作table數組的一個标記(隻與table數組的大小有關),在RESIZE_STAMP_BITS為16的情況下,rs一定是個正數。
此時sizeCtl為負數,且其高16位與rs不相等,說明此次擴容已經結束(因為擴容戳标記rs已經發生改變)。
3.2.5.2.1.2 條件二:sc == rs + 1
由前述分析可知,能走到這個條件時,sc一定是個負數,rs一定是個正數,這個條件在正常的ConcurrentHashMap代碼邏輯裡是永遠不可能滿足的!
3.2.5.2.1.3 條件三:sc == rs + MAX_RESIZERS
由條件二中的分析可知,sc是負數,rs是正數,這種情況在正常的ConcurrentHashMap代碼裡也是永遠不可能出現的。
3.2.5.2.1.4 條件四:transferIndex <= 0
這個條件比較好了解,即原table數組裡的所有元素都已經有線程在負責遷移至新的table數組裡,本線程無需再繼續參與擴容。(後文會對transferIndex的含義做分析)
既然條件二和條件三永遠無法滿足,為什麼還要這麼寫呢?
真相就是這是JDK1.8的一個bug,正确的寫法應該是:
條件二:sc == (rs << RESIZE_STAMP_SHIFT) +1,此時代表沒有任何線程在進行擴容操作,目前線程無需再繼續參與擴容。
條件三:sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,此時進行擴容操作的線程數達到了MAX_RESIZERS - 1,目前線程無法再繼續參與擴容。
該bug在JDK12中被修複,證據連結:
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
3.2.5.2.2 transfer()函數
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 計算每個線程參與移動的哈希桶數量stride
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果nextTab還沒有被初始化,先初始化nextTab
if (nextTab == null) { // initiating
try {
// 每次擴容數組容量乘以2
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 如果發生OOM 異常,說明無法再繼續擴容,設定擴容門檻值sizeCtl為Integer.MAX_VALUE,防止下次再進到擴容邏輯裡
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// transferIndex 是一個volatile類型的變量,用以辨別目前線程從哪個桶開始遷移
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 遷移原數組中的元素時,從數組末往數組首周遊,advance用以辨別目前線程是否可以去當
// 前數組左側尋找待遷移的桶,finishing代表遷移是否已經結束
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// 當i仍然在周遊目前線程需要遷移的桶,或者目前遷移已經結束時,無需去目前數組的左側尋找待遷移的桶
if (--i >= bound || finishing)
advance = false;
// 如果待遷移的第一個桶已經小于等于0,說明所有的桶都有線程在負責遷移,無需去目前數組的左側繼續尋找待遷移的桶
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 用CAS操作設定目前線程負責遷移的桶号
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 遷移已經結束
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 用CAS操作設定sizeCtl減1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 當n等于16時,(resizeStamp(16)) << 16 + 2 = -2145714174
// 表示成二進制是:10000000 00011011 00000000 00000010,其高
// 16位的值和resizeStamp(16)恰好相等;其低位為2,表示現在隻
// 有1個線程在幫助擴容。如果不是隻有1個線程在擴容,沒必要去檢查擴容
// 是否完成,直接return即可。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
// CAS 操作設定位置i的節點為fwd,即該位置已遷移完畢。
// 如果設定成功,則advance為true,表明可以去尋找下一個待遷移的位置。
// 如果設定失敗,則advance為false,表明不能去尋找下一個待遷移的位置
// 注意外層是一個while死循環,是以即使目前設定失敗,還會循環進入到這裡繼續CAS嘗試,直到成功為止。
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// synchronized加鎖後開始遷移節點
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
// fh 大于等于 0,說明是正常的連結清單節點
// 這個過程在JDK1.7的ConcurrentHashMap 也出現過,先把末尾部分将移至
// 新數組的節點串在一起轉移,再周遊前面的節點一個個轉移到兩個連結清單上,最後
// 再将兩個連結清單挂接在新數組中。
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 這裡為什麼要克隆Node節點?在3.1.2.2.1 為什麼需要克隆HashEntry節點?為什麼僅需要克隆
// 部分HashEntry節點?中有解釋,是相同的原因。
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
// 對于樹節點,先根據next指針周遊原table數組中的節點,并
// 利用其在新table數組中的位置将其分成兩個連結清單,再将新的兩個
// 連結清單挂接至新table數組中。
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
總結一下transfer()函數的邏輯:
首先明确transferIndex的概念,其定義如下:
可以看到,transferIndex是ConcurrentHashMap中的一個volatile類型的成員變量,確定了多個線程通路時的記憶體可見性。每一個線程參與轉移節點的過程,負責轉移的哈希桶數目都是确定的,假如每個線程負責轉移的哈希桶數目是16個,那麼目前線程負責轉移的哈希桶下标就是[transferIndex - 16, transferIndex - 1],并更新transferIndex的值為transferIndex - 16。
在轉移每個哈希桶時,都會給這個哈希桶首節點加synchronized鎖(對于紅黑樹,是給TreeBin加鎖),轉移節點的過程不複雜,對于連結清單節點的轉移,和JDK1.7版本的ConcurrentHashMap是一緻的;對于紅黑樹節點的轉移,先将其分成兩個連結清單,在分别判斷是否需要去樹化,這和JDK1.8版本的HashMap是一緻的。
3.2.5.3 addCount()函數
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 如果counterCells數組不為null,或者說CAS嘗試修改 baseCount 失敗
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 該方法和Striped64的longAccumulate()方法一模一樣
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
// check <= 1,說明不是增加節點的操作,無需擴容,是以直接 return。
return;
// sumCount()将baseCount和counterCells中存儲的值都加在一起
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果元素數大于等于sizeCtl,且table數組不為null,且table數組容量小于MAXIMUM_CAPACITY,則需要進行擴容操作
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
addCount()方法是記錄ConcurrentHashMap中的鍵值對數目的,将鍵值對數目存放在baseCount和counterCells中,這兩個屬性的定義如下:
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;
如果CAS設定baseCount失敗,則随機擷取一個counterCells中某下标位置的synchronized鎖,将該下标位置的計數加1。這其實是分段鎖思想的提現,在sumCount()中我們會将baseCount和所有counterCells中的計數值都加在一起來統計ConcurrentHashMap中的鍵值對數目:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
值得注意的是,size()函數也是直接調用sumCount()函數實作的:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
而sumCount()函數沒有加鎖,是以在多線程環境下通過size()函數得到的鍵值對數目是不準确的。
addCount()函數不僅在putVal()函數中被調用,在remove()函數等其他位置也會被調用,如果是在putVal()中被調用,其check為binCount,是大于0的一個值,會進入擴容邏輯的判斷中。
3.2.6 get()函數
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// spread()函數對應HashMap中的hash()函數
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
// 有3種節點的hash值是小于0的,TreeBin、ForwardingNode和ReservationNode,這三個類均繼承自Node,重寫了find()方法
return (p = e.find(h, key)) != null ? p.val : null;
//走到這,說明是連結清單,周遊連結清單尋找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get()函數是不需要加鎖的,對于哈希值為負數的節點調用其特有的find()方法去查找;對于哈希值為正數的連結清單節點,周遊連結清單查找。
3.2.6.1 TreeBin中的find()函數
TreeBin是自帶讀寫鎖的節點,來看看其中的2個屬性:
volatile Thread waiter;
volatile int lockState;
讀線程是不需要加鎖的,寫線程是需要擷取獨占鎖的,waiter表示目前正在等待擷取獨占鎖的寫線程,lockState表示目前的鎖狀态。
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
// 如果目前節點的狀态中有WAITER和WRITER狀态,說明樹在重構中。
// 則根據next指針,以周遊連結清單的方式來周遊樹,來尋找節點。(這樣
// 做比在紅黑樹中尋找節點要低效些,但不至于阻塞)
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
// 如果目前節點的狀态隻有READER,則CAS擷取到鎖
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
// 用TreeNode的find()方法來尋找節點
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
// 重置變量lockState,如果此時既有WAITER狀态也有READER狀态,
// 但是沒有WRITER 狀态,則喚醒等待線程waiter。
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
總結上述find()函數的邏輯:
如果有線程擷取了寫鎖或者正在等待擷取寫鎖,讀線程不會參與競争鎖狀态,會沿着節點的next指針周遊尋找節點。沿着連結清單的next指針尋找節點雖然比較慢,但至少不會阻塞。
否則,利用CAS操作将讀線程數量加1,如果CAS成功則調用紅黑樹的find()函數查找節點,這是O(logn)的時間複雜度。
3.2.6.2 ForwardingNode中的find()函數
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
// 在ForwardingNode節點指向的下一個數組nextTable中尋找節點。死循環是
// 為了避免擴容操作導緻的目前ForwardingNode指向的數組不是最終數組。
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
// 遞歸調用,此時的e可能是TreeBin、Node、TreeNode、ReservationNode中的任何一個。
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
總結一下上述函數的邏輯:
ForwardingNode節點是一個轉發節點,是以隻負責将find()操作轉移至nextTable屬性中去查找。
3.2.6.3 ReservationNode中的find()函數
Node<K,V> find(int h, Object k) {
return null;
}
ReservationNode()僅在computeIfAbsent()等函數中作為标記節點使用,我們不會調用其find()函數。
3.2.6.4 Node中的find()函數
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
3.2.7 疊代器的實作
3.2.7.1 設計思想
在疊代器周遊的過程中,可能有線程正在對目前ConcurrentHashMap進行擴容操作,是以會出現部分節點在原table數組,部分節點在新table數組中的情況。ConcurrentHashMap基于一種TrackStack的棧節點構成一個棧。當出現數組擴容時轉移節點時,如果遇到了ForwardingNode節點,會将目前數組的周遊狀态(包括目前數組周遊到的索引index、目前數組的大小baseSize、目前數組的周遊邊界baseLimit)壓入棧中,并在新table數組的索引index和index + baseSize處分别周遊這兩個桶中的節點。在周遊完新數組中的這兩個桶之後,再從棧中取出原table數組的狀态繼續周遊第index + 1位置。
3.2.7.2 類結構
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-u7flBsq4-1612106552359)(/Users/qyh/Library/Application Support/typora-user-images/image-20210131154445226.png)]
由上圖可知,所有的疊代器都基于Traverser實作,來看一下Traverser中的屬性:
Node<K,V>[] tab; // current table; updated if resized
Node<K,V> next; // the next entry to use
// stack代表着我們需要使用的棧,spare中存儲的是一些廢棄不用的TrackStack對象,需要使用的時候可以從spare中擷取,而無需建立
TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
int index; // index of bin to use next
int baseIndex; // current index of initial table
int baseLimit; // index bound for initial table
final int baseSize; // initial table size
TrackStack代表一個棧幀,其定義如下:
static final class TableStack<K,V> {
int length;
int index;
Node<K,V>[] tab;
TableStack<K,V> next;
}
3.2.7.3 缺陷
利用棧結構實作疊代器有一個缺陷在于:假設目前周遊到了table數組中index位置,另外的線程往[0, index - 1]處添加了元素,那麼這個新添加的元素将永遠沒有機會被周遊到。當然這個缺陷是被Doug Lea考慮到了的,是一種可以接受的缺陷。
3.2.8 總結
将table數組繪制成一張圖:
JDK1.8的ConcurrentHashMap代碼對于volatile、synchronized、CAS等和Java記憶體與鎖相關的知識運用得淋漓盡緻,但也存在着些許小bug。
4. 寫在最後
本文基本上總結了JDK1.7和JDK1.8中HashMap和ConcurrentHashMap的實作思想和細節,用以個人輸出總結。雖然配圖較少,但我覺得我還是把我知道的關鍵點都說清楚了,撒花。