- 參考文獻:
- 本篇文章主要來源于這篇部落格,本人對ConcurrentHashMap的源碼之前沒有學,雖然看了書但是不涉及這些,是以大部分都來源于觀看大神部落格的感悟,如果大家需要,可以看一看,也可以點選跳轉,大神之作!
- Java多線程進階(二四)—— J.U.C之collections架構:ConcurrentHashMap(2) 擴容
- jdk不同版本下HashMap和ConcurrentHashMap的對比
- 一些面試問題總結,三太子佳作,也有借鑒!!
- 計數,遷移分析透徹,思路清晰,也有借鑒,可以觀看
- 前言
- CHM類簡介
- CHM基本資料結構
-
- CHM類的結點定義
- CHM類的構造器
- CHM類的屬性
- CHM類的put()源碼解析💦💦
-
- CHM類的putVal()中的四種情況💦💦
- CHM的get()源碼分析:
- CHM的find()源碼分析:
- CHM的計數相關方法:
-
- CHM的addCount()源碼分析
- CHM的fullAddCount()源碼分析
- CHM類的擴容機制💦💦💦💦
-
- CHM類的擴容基本思路
- CHM的擴容時機
- CHM的擴容之資料遷移原理💦💦💦
- 感謝各位的暴擊三連哦~
前言
- 衆所周知,Map是非常經典的資料結構,常用于在記憶體中存放資料;
- 本篇主要想讨論 ConcurrentHashMap 這樣一個并發容器,不過在讨論前,我認為有必要了解它的由來,以及它所謂的前身"HashMap"。了解其發展,有助于我們更深刻的認識并掌握 ConcurrentHashMap;
- 如果對于HashMap不了解的,可以先去學習HashMap;
- HashMap:
- 底層是基于 數組 + 連結清單(紅黑樹) 組成的;
- 其實在不同的jdk版本它中的實作方式變化并不是很大,尤其是jdk1.8到現在的16基本沒有變化,而jdk1.7以前也是變化不大,是以我們主要關注點就是:jdk1.7➡➡ jdk1.8 的HashMap的某一方面的優化 ;
- 并發場景下:jdk1.7 中的HashMap存在的三大問題: ☪☪☪
- 資料重複;
- 死循環:
- 具體可以看一些别的部落格:這一篇是有對死循環的圖解,感覺不錯
- 資料丢失: 在多線程下put操作時,執行addEntry(hash, key, value, i),可能産生哈希沖突,造成資料的丢失;
- jdk1.8中的HashMap解決了兩大問題:
- 解決了jdk1.7中的HashMap在并發場景下造成 死循環 的問題;
- 解決了jdk1.7中HashMap 查詢效率低 的問題(通過紅黑樹);
-
當然,可以用HashTable,或者Collections.synchronizedMap 來解決并發問題,但是效率不高,是以我們今天的主角就出現了!!!
【注】:
1、為什麼Collections.synchronizedMap的效率不高呢?== (同步代碼塊)==
答:我們先看一下Collections.synchronizedMap是怎麼實作線程安全的,在SynchronizedMap内部維護了一個普通對象Map,還有 排斥鎖mutex,我們在調用這個方法的時候就需要傳入一個Map,mutex參數可以傳也可以不傳。建立出synchronizedMap之後,再操作map的時候,就會對這些方法上鎖(如下),也就是我們說的同步代碼塊,是以性能不高;
2、為什麼Hashtable的效率不高呢?
答:它對資料操作的時候都會通過synchronized上鎖,也就是我們在講synchronized說的 同步方法;
CHM類簡介
-
ConcurrentHashMap是jdk1.5版本JUC引入的一個同步集合工具類,這是一個線程安全的HashMap;
[注]:不同版本的ConcurrentHashMap,内部實作機制的大幅度變化主要來源于jdk1.7—>jdk1.8,是以在這裡我們先說一下jdk1.7和jdk1.8下的ConcurrentHashMap有何變化。(本文所有的讨論基于jdk1.8和jdk16;差别不大,個别有差異的已經标注出來)
- jdk1.7更新到1.8,ConcurrentHashMap的變化:這一對比的總結來自一位大神的總結,可以大緻看看!
-
鎖方面: 由分段鎖(Segment繼承自ReentrantLock)更新為 CAS+synchronized實作;
【注】:CAS性能很高,但是我知道synchronized性能可不咋地,為啥jdk1.8更新之後反而多了synchronized?
答:synchronized之前一直都是重量級的鎖,性能差,但是後來java官方(jdk1.5還是1.6)對它進行了優化:針對 synchronized 擷取鎖的方式,JVM 使用了鎖更新的優化方式,就是先使用偏向鎖優先同一線程,然後再次擷取鎖,如果失敗,就更新為 CAS 輕量級鎖,如果失敗就會短暫自旋,防止線程被系統挂起。最後如果以上都失敗就更新為重量級鎖。是以是一步步更新上去的,最初也是通過很多輕量級的方式鎖定的(偏向鎖–>CAS輕量級鎖–>自旋–>重量級鎖)。
- 資料結構層面: 将Segment變為了Node,減小了鎖粒度,使每個Node獨立,由原來預設的并發度16變成了每個Node都獨立,提高了并發度;
-
hash沖突: 1.7中發生hash沖突采用連結清單存儲,1.8中先使用連結清單存儲,後面滿足條件後會轉換為紅黑樹來優化查詢;
[注]: Hashmap中的連結清單大小超過八個時會自動轉化為紅黑樹,當删除小于六時重新變為連結清單,為啥呢?
答:根據泊松分布,在負載因子預設為0.75的時候,單個hash槽内元素個數為8的機率小于百萬分之一,是以将7作為一個分水嶺,等于7的時候不轉換,大于等于8的時候才進行轉換,小于等于6的時候就化為連結清單。
- 查詢複雜度: jdk1.7中連結清單查詢複雜度為O(N),jdk1.8中紅黑樹優化為O(logN));
-
- 我們首先看一下ConcurrentHashMap 的繼承關系(第一篇參考部落格的圖):
- 我們可以看出它繼承了兩個接口和一個抽象類:
- AbstractMap: 這是一個java.util包下的抽象類,提供Map接口的主要方法實作,以最大限度地減少實作Map這類資料結構時所需的工作量;
- 一般來講,如果需要重複造輪子 自己來實作一個Map,那一般就是繼承AbstractMap
- Serializable接口: 其實對某一個類或接口沒有特别的含義,就是對其進行序列化,以友善存儲或傳輸,有興趣可以看一下Serializable的相關介紹;
- ConcurrentMap接口: 是在JDK1.5時随着J.U.C包引入的,這個接口其實就是提供了一些 針對Map的原子操作:
- 我們看一下其到底實作了一些什麼方法:
- AbstractMap: 這是一個java.util包下的抽象類,提供Map接口的主要方法實作,以最大限度地減少實作Map這類資料結構時所需的工作量;
package java.util.concurrent;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
public interface ConcurrentMap<K,V> extends Map<K,V> {
//傳回指定key對應的值;如果Map不存在該key,則傳回defaultValue💦💦
default V getOrDefault(Object key, V defaultValue) { ...}
//周遊Map的所有Entry,并對其進行指定的aciton操作💦💦
default void forEach(BiConsumer<? super K, ? super V> action) {...}
//如果Map不存在指定的key,則插入<K,V>;否則,直接傳回該key對應的值💦💦
V putIfAbsent(K key, V value);
//删除與<key,value>完全比對的Entry,并傳回true;否則,傳回false💦💦
boolean remove(Object key, Object value);
//如果存在key,且值和oldValue一緻,則更新為newValue,并傳回true;否則,傳回false💦💦
boolean replace(K key, V oldValue, V newValue);
//如果存在key,則更新為value,傳回舊value;否則,傳回null💦💦
V replace(K key, V value);
//周遊Map的所有Entry,并對其進行指定的funtion操作💦💦
default void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {...}
//如果Map不存在指定的key,則通過mappingFunction計算出value并插入💦💦
default V computeIfAbsent(K key , Function<? super K, ? extends V> mappingFunction{...}
//如果Map存在指定的key,則通過mappingFunction計算出value并替換舊值💦💦
default V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {...}
//根據指定的key,查找value;然後根據得到的value和remappingFunction重新計算出新值,并替換舊值💦💦
default V compute(K key , BiFunction<? super K, ? super V, ? extends V> remappingFunction) {...}
//如果key不存在,則插入value;否則,根據key對應的值和remappingFunction計算出新值,并替換舊值💦💦
default V merge(K key, V value , BiFunction<? super V, ? super V, ? extends V> remappingFunction) {...}
CHM基本資料結構
- 我們接下來看一下jdk1.8的ConcurrentHashMap的内部資料結構(依舊第一篇部落格的圖,問就是大神太強了):
- ConcurrentHashMap内部維護了一個Node類型的數組:table
-
table一共包含 4 種不同類型的桶,不同的桶用不同顔色表示,這是jdk1.8版本的特點;
[注]:TreeBin結點所連接配接的是一顆紅黑樹,紅黑樹的結點用TreeNode表示,是以ConcurrentHashMap中實際上一共有 五 種不同類型的Node結點;
這裡我們思考一個問題,為什麼沒有直接用TreeNode呢?
主要是因為紅黑樹的操作比較複雜,包括建構、左旋、右旋、删除,平衡等操作,用一個代理結TreeBin來包含這些複雜操作,其實是一種 “職責分離”的思想,另外TreeBin中也包含了一些加/解鎖操作。
-
- 數組的每一個位置table[i]代表一個桶,當插入鍵值對時,會根據鍵的hash值映射到不同的桶位置;
- 【注】:
- 在jdk1.8之前,ConcurrentHashMap采用了分段【Segment】鎖的設計思路,以減少熱點域的沖突,在jdk16中依舊有内部類Segment,目的是 為了序列化以及相容以前的jdk版本;
- jdk1.8不再延續,轉而直接對每個桶加鎖,并用“紅黑樹”連結沖突結點(有興趣的可以看紅黑樹的相關講解);
CHM類的結點定義
- 前面一部分我們講了ConcurrentHashMap的資料結構,知道其實它包括了 5 種結點。我們分别來看一下: 1️⃣ Node結點:
- Node是其它四種類型結點的父類;
- 預設連結到table[i],即桶上的結點就是Node結點;
- 當出現hash沖突時,Node結點會首先以連結清單的形式連結到table上:
-
當結點數量超過一定數目時,連結清單會轉化為紅黑樹;
【注】:因為連結清單查找的時間複雜度為O(n),而紅黑樹是一種平衡二叉樹,其平均時間複雜度為O(logn),這樣就提高了效率。
-
/*
* 普通的Entry結點, 以連結清單形式儲存時才會使用, 存儲實際的資料.
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash; //通過key計算hash值,通過hash值找相應的桶
final K key;
volatile V val; //volatile修飾,保證可見性、有序性、但和原子性可沒什麼關系
volatile Node<K,V> next; //volatile修飾,連結清單指針💛
Node(int hash, K key, V val) { //構造器
this.hash = hash;
this.key = key;
this.val = val;
}
Node(int hash, K key, V val, Node<K,V> next) {
this(hash, key, val);
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
//這裡我放的jdk16版本,Helpers我并不太了解,但看其源碼就是為JUC包下的一個工具類,幫助toString,如果以後深入了解,再來糾正
public final String toString() { return Helpers.mapEntryToString(key, val); }
public final V setValue(V value) { throw new UnsupportedOperationException(); }
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
//連結清單查找,不同類型的結點有不同的find()方法🧡
Node<K,V> find(int h, Object k) {...}
2️⃣ TreeNode結點:
- TreeNode是紅黑樹的結點,TreeNode不會直接連結到桶上面,而是由TreeBin連結,TreeBin會指向紅黑樹的根結點;
// 紅黑樹結點,存儲實際資料
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links 紅黑樹連結
TreeNode<K,V> left;
TreeNode<K,V> right;
/* needed to unlink next upon deletion ,prev指針是為了友善删除,
删除連結清單的非頭結點時,需要知道它的前驅結點才能删除,是以直接提供一個prev指針
*/
TreeNode<K,V> prev;
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next , TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
//以目前結點(this)為根結點,開始周遊查找指定key.
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {...}
3️⃣TreeBin結點: (hash = -2)
- TreeBin相當于TreeNode的代理結點;TreeBin會直接連結到 table[i] 上,該結點提供了一系列紅黑樹相關的操作,以及加鎖、解鎖操作:
/*
TreeNode的代理結點(相當于封裝了TreeNode的容器,提供針對紅黑樹的轉換操作和鎖控制)
hash值固定為-2
*/
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root; // 紅黑樹結構的根結點
volatile TreeNode<K,V> first; // 連結清單結構的頭結點
volatile Thread waiter; // 最近的一個設定WAITER辨別位的線程
volatile int lockState; // 整體的鎖狀态辨別位,0為初始态
// values for lockState
static final int WRITER = 1; // 二進制001,紅黑樹的寫鎖狀态
static final int WAITER = 2; // 二進制010,紅黑樹的等待擷取寫鎖狀态(優先鎖,當有鎖等待,讀就不能增加了)
// 二進制100,紅黑樹的讀鎖狀态,讀可以并發,每多一個讀線程,lockState都加上一個READER值,
static final int READER = 4;
/*
在hashCode相等并且不是Comparable類型時,用此方法判斷大小.
*/
static int tieBreakOrder(Object a, Object b) {
int d;
if (a == null || b == null ||
(d = a.getClass().getName().
compareTo(b.getClass().getName())) == 0)
d = (System.identityHashCode(a) <= System.identityHashCode(b) ? -1 : 1);
return d;
}
// 将以b為頭結點的連結清單轉換為紅黑樹
TreeBin(TreeNode<K,V> b) {...}
// 通過lockState,對紅黑樹的根結點➕寫鎖.
private final void lockRoot() {
if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER))
contendedLock(); // offload to separate method ,Possibly blocks awaiting root lock.
}
//釋放寫鎖
private final void unlockRoot() { lockState = 0; }
// 從根結點開始周遊查找,找到“相等”的結點就傳回它,沒找到就傳回null,當存在寫鎖時,以連結清單方式進行查找,後會面會介紹
final Node<K,V> find(int h, Object k) {... }
/**
* 查找指定key對應的結點,如果未找到,則直接插入.
* @return 直接插入成功傳回null, 替換傳回找到的結點的oldVal
*/
final TreeNode<K,V> putTreeVal(int h, K k, V v) {...}
/*
删除紅黑樹的結點:
1. 紅黑樹規模太小時,傳回true,然後進行 樹 -> 連結清單 的轉化,最後删除;
2. 紅黑樹規模足夠時,不用變換成連結清單,但删除結點時需要加寫鎖;
*/
final boolean removeTreeNode(TreeNode<K,V> p) {...}
// 以下是紅黑樹的經典操作方法,改編自《算法導論》💦💦💦💦
static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root , TreeNode<K,V> p) { ...}
static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root , TreeNode<K,V> p) {...}
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root , TreeNode<K,V> x) {...}
static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root, TreeNode<K,V> x) { ... }
static <K,V> boolean checkInvariants(TreeNode<K,V> t) {...} //遞歸檢查紅黑樹的正确性
private static final long LOCKSTATE= U.objectFieldOffset(TreeBin.class, "lockState");
}
4️⃣ForwardingNode結點: (hash = -1)
- ForwardingNode結點僅僅在 擴容 時才會使用
/**
* ForwardingNode是一種臨時結點,在擴容進行中才會出現,hash值固定為-1,且不存儲實際資料。
* 如果舊table數組的一個桶table[i]中全部的結點都遷移到了新table中,則在這個桶中放置一個ForwardingNode,
即table[i]=ForwardingNode。
* 讀操作碰到ForwardingNode時,将操作轉到擴容後的新table數組上去執行---》find()方法;寫操作碰見它時,則嘗試幫助擴容。
*/
static final class ForwardingNode<K, V> extends Node<K, V> {
final Node<K, V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null);
this.nextTable = tab;
}
// 在新的數組(nextTable)上進行查找
Node<K, V> find(int h, Object k) {...}
5️⃣ReservationNode結點 (hash=-3)
/**
* 保留結點.
* hash值固定為-3, 不儲存實際資料
* 隻在computeIfAbsent和compute這兩個函數式API中充當占位符加鎖使用
*/
static final class ReservationNode<K, V> extends Node<K, V> {
ReservationNode() {
super(RESERVED, null, null, null);
}
Node<K, V> find(int h, Object k) {
return null;
}
}
CHM類的構造器
- ConcurrentHashMap提供了五個構造器:
-
這五個構造器内部最多隻是計算了下table的初始容量大小,并沒有進行實際的建立table數組的工作:
【注】:這時候有一個問題:什麼時候建立table數組呢?
答:ConcurrentHashMap采用了一種“懶加載”的模式,隻有到首次插入鍵值對的時候,才會真正的去初始化table數組;
-
- 接下來我們分别看一下這五個構造器:
//💕空構造器💕:
public ConcurrentHashMap() { }
//💕💕指定table初始容量的構造器💕💕
/**
* 指定table初始容量的構造器.
* tableSizeFor會傳回大于入參(initialCapacity + (initialCapacity >>> 1) + 1)的最小2次幂值,也就是說初始值
* 必須是2的整數倍,并且不能過小
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
//💕💕💕根據已有的Map構造💕💕💕
/**
* 根據已有的Map構造ConcurrentHashMap.
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
//💕💕💕💕指定table初始容量和負載因子的構造器💕💕💕💕
/**
* 指定table初始容量和負載因子的構造器.
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
//💕💕💕💕💕指定table初始容量、負載因子、并發級别的構造器💕💕💕💕💕
/**
* 指定table初始容量、負載因子、并發級别的構造器.
* 注意:concurrencyLevel隻是為了相容JDK1.8以前的版本,并不是實際的并發級别,loadFactor也不是實際的負載因子
* 這兩個都失去了原有的意義,僅僅對初始容量有一定的控制作用
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
long size = (long) (1.0 + (long) initialCapacity / loadFactor);
int cap = (size >= (long) MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int) size);
this.sizeCtl = cap;
}
CHM類的屬性
- 常量 :
//最大容量.
private static final int MAXIMUM_CAPACITY = 1 << 30;
//預設初始容量
private static final int DEFAULT_CAPACITY = 16;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//負載因子,為了相容JDK1.8以前的版本而保留,JDK1.8中的ConcurrentHashMap的負載因子恒定為0.75
private static final float LOAD_FACTOR = 0.75f;
//連結清單--->樹的門檻值:即連結結點數大于8時, 連結清單轉換為樹.
static final int TREEIFY_THRESHOLD = 8;
//樹--->連結清單的門檻值:即樹結點樹小于6時,樹轉換為連結清單.
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 在連結清單--->樹之前,還會有一次判斷:
* 即隻有鍵值對數量大于MIN_TREEIFY_CAPACITY,才會發生轉換。
* 這是為了避免在Table建立初期,多個鍵值對恰好被放入了同一個連結清單中而導緻不必要的轉化。
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**
* 在樹--->連結清單之前,還會有一次判斷:
* 即隻有鍵值對數量小于MIN_TRANSFER_STRIDE,才會發生轉換.
*/
private static final int MIN_TRANSFER_STRIDE = 16;
//用于在擴容時生成唯一的随機數.
private static int RESIZE_STAMP_BITS = 16;
//可同時進行擴容操作的最大線程數.
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* hash值的意義
*/
static final int MOVED = -1; // 辨別ForwardingNode結點(在擴容時才會出現,不存儲實際資料)
static final int TREEBIN = -2; // 辨別紅黑樹的根結點
static final int RESERVED = -3; // 辨別ReservationNode結點()
static final int HASH_BITS = 0x7fffffff; // 辨別Node結點,自然數0,1,2...
//CPU核心數,擴容時使用
static final int NCPU = Runtime.getRuntime().availableProcessors();
- 字段 :
//Node數組,辨別整個Map,首次插入元素時建立,大小總是2的幂次.
transient volatile Node<K, V>[] table;
//擴容後的新Node數組,隻有在擴容時才非空.
private transient volatile Node<K, V>[] nextTable;
/**
* sizeCtl:控制table的初始化和擴容.很重要💦💦💦💦
* 0 : 初始預設值
* -1 : 有線程正在進行table的初始化
* >0 : table初始化時使用的容量,或初始化/擴容完成後的門檻值threshold
* -(1 + nThreads) : 記錄正在執行擴容任務的線程數
*/
private transient volatile int sizeCtl;
//擴容時需要用到的一個下标變量.
private transient volatile int transferIndex;
//計數基值,當沒有線程競争時,計數将直接加到該變量上。類似于LongAdder的base變量
private transient volatile long baseCount;
//計數數組,出現并發沖突時使用,這時候計數不能直接加到baseCount變量上,類似于LongAdder的cells數組
private transient volatile CounterCell[] counterCells;
// 自旋辨別位,用于CounterCell[]擴容時使用。類似于LongAdder的cellsBusy變量
private transient volatile int cellsBusy;
// 視圖相關字段
private transient KeySetView<K, V> keySet;
private transient ValuesView<K, V> values;
private transient EntrySetView<K, V> entrySet;
CHM類的put()源碼解析💦💦
- put方法是ConcurrentHashMap類的核心方法,也是我們必須要了解的,接下來讓我們看一下它是如何操作的:
/*
插入鍵值對,<K,V>均不能為null.
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
-
其内部調用了putVal(K key, V value, boolean onlyIfAbsent) (default)
[注]:
1、為什麼concurrentHashMap、Hashtable等不允許key/value為null呢?而HashMap可以呢?
答:(有大佬能更好的解釋,我再改正)
原因1:這是因為ConcurrentHashMap和Hashtable都是支援并發的,這樣會有一個問題,當你通過get(k)擷取對應的value時,如果擷取到的是null時,你無法判斷,它是put(k,v)的時候value就為null,還是這個key從來沒有做過映射。
而HashMap是非并發的,可以通過contains(key)來進行校驗。而支援并發的Map在調用m.contains(key)和m.get(key)時m可能已經不同了
2、什麼是快速失敗機制(fail-fast)呢?
答:在用疊代器周遊一個集合對象時,如果周遊過程中對集合對象的内容進行了修改(增加、删除、修改),則會抛出 Concurrent Modification Exception。
2.1快速失敗機制(fail-fast)的原理是啥?
答:疊代器在周遊時直接通路集合中的内容,在周遊過程中使用一個 modCount 變量。集合在被周遊期間如果内容發生變化,就會改變modCount的值。 每當疊代器使用hashNext()/next()周遊下一個元素之前,都會檢測modCount變量是否為expectedmodCount值,是的話就傳回周遊;否則抛出Concurrent Modification Exception異常,終止周遊。 (注意:不能依賴于這個異常是否抛出而進行并發操作的判斷,因為會有類似CAS操作的“ABA”問題,是以這個異常隻建議用于檢測并發修改的bug)。
3、什麼是安全失敗機制(fail—safe)呢?
答:采用安全失敗機制的集合容器,在周遊時不是直接在集合上通路的,而是先複制原有集合内容,在拷貝的集合上進行周遊。由于循環的時候是對原集合的“複制品”進行周遊,是以在你周遊的過程中對原集合的更改都不會被疊代器檢測到,是以不會報錯。
4、快速失敗機制(fail-fast)和安全失敗機制(fail—safe)的适用場所呢?
答:
java.util包下的集合類都是快速失敗(fail-fast)的,不能在多線程下發生并發修改(疊代過程中被修改)算是一種安全機制,但是預防不了 ABA問題;
java.util.concurrent包下的容器都是安全失敗(fail—safe),可以在多線程下并發使用,并發修改,不過要想适用于并發場景,需要額外的控制管理;
- 一不小心說多了,接着往下看:
/**
* 實際的插入操作
* @param onlyIfAbsent true:僅當key不存在時,才插入
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException(); //key-value任一為空抛出空指針異常
int hash = spread(key.hashCode()); // 再次計算hash值,防止并發情況的影響
// static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS;}
/*
* binCount 的用法:
* 1、使用連結清單儲存時,binCount記錄table[i]這個桶中所儲存的結點數;
* 2、使用紅黑樹儲存時,binCount==2,保證put後更改計數值時能夠進行擴容檢查,同時不觸發紅黑樹化操作
*/
int binCount = 0;
// 自旋插入結點,直到成功
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
// CASE1: 首次初始化table —— 懶加載💦💦💦
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// CASE2: table[i]對應的桶為null,則直接插入結點💦💦💦
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// [注]:上面table[i]的索引i的計算方式:[ key的hash值 & (table.length-1) ],為什麼要這樣計算呢?
// 配合這種索引計算方式可以實作key的均勻分布,減少hash沖突(前提table容量必須為2的幂次,這也是為什麼容量必須是2的幂次的原因)
//tabAt(tab, i)方法是調用Unsafe類的方法檢視值,保證每次擷取到的值都是最新的
if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null))) // cas操作:插入一個連結清單結點,成功傳回
break;
// CASE3: 發現ForwardingNode結點(通過MOVED辨別ForwardingNode結點存在),說明此時table正在擴容,則嘗試協助資料遷移💦💦💦
} else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); //這個方法後面會詳細介紹,很重要🧡🧡🧡
// CASE4: 出現hash沖突,也就是table[i]桶中已經有了結點💦💦💦
else {
V oldVal = null;
synchronized (f) { // 對table[i]結點上鎖,避免并發情況
if (tabAt(tab, i) == f) { // 再判斷一下節點 是不是在這個 table[i]上
//static final int HASH_BITS = 0x7fffffff; // 辨別Node結點,自然數
//CASE4.1: (fh >= 0)--->table[i]是連結清單結點💦
if (fh >= 0) {
binCount = 1; //記錄結點數,超過門檻值後,需要轉為紅黑樹,提高查找效率
for (Node<K, V> e = f; ; ++binCount) {
K ek;
// 找到“相等”的結點,判斷是否需要更新value值(onlyIfAbsent為false則可以插入)
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val; //記錄key沖突的原來對應的value值
if (!onlyIfAbsent) //onlyIfAbsent标志僅當不存在時才插入
e.val = value;
break;
}
//如果沒有找到,則直接插在尾部即可
Node<K, V> pred = e;
if ((e = e.next) == null) { // “尾插法”插入新結點
pred.next = new Node<K, V>(hash, key,value, null);
break;
}
}
// CASE4.2: 上面判斷不是連結清單結點,則繼續判斷table[i]是不是紅黑樹結點💦
} else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2; //紅黑樹對應的基值,我認為哈是包括了treebin和root,是以以2為基礎
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) { //插入
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
// CASE4.3: 不是連結清單節點也不是紅黑樹結點,則看是不是ReservationNode結點用于計算用的💦(jdk1.8版本沒有這個判斷)
}else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 超過門檻值,連結清單 -> 紅黑樹 轉換,💦💦添加節點後轉化💦💦💦
treeifyBin(tab, i);
if (oldVal != null) // 表明本次put操作隻是替換了舊值,不用更改計數值
return oldVal;
break;
}
}
}
addCount(1L, binCount); //表明本次操作增加了新值, 計數值加1
return null;
}
- 其實我們可以發現,putVal(K key, V value, boolean onlyIfAbsent) 的邏輯很清晰的:
- 首先根據key計算hash值;
- 然後通過hash值與table容量進行運作,計算得到key映射到table上的索引
- 最後加入結點,這裡要略微複雜一些;
- 【注】 :
- 計算索引的方式是
;i = (n - 1) & hash
-
,n - 1 == table.length - 1
的大小必須為2的幂次的原因就在這裡;table.length
- 這是因為當table.length為2的幂次時,(table.length-1) 的二進制形式的特點是除最高位外全部是1,配合這種索引計算方式可以實作key在table中的均勻分布 ,減少hash沖突——出現hash沖突時,結點就需要以連結清單或紅黑樹的形式連結到table[i],這樣無論是插入還是查找都需要額外的時間。
- 計算索引的方式是
- 我們通過一個流程圖進一步看這個方法(很不容易的!!!大哥們好好看看!!):
CHM類的putVal()中的四種情況💦💦
1️⃣、首次初始化table —— 懶加載
-
之前分析構造器的時候以及put()源碼的注釋中都說了,ConcurrentHashMap在構造的時候并不會始化table數組,首次初始化就在這裡通過 initTable() 完成:
【注】:如果多個線程同時調用initTable初始化Node數組怎麼辦?讓我們看并發程式設計大師Doug Lea的源碼:
答:在初始化數組時使用了樂觀鎖CAS操作來決定到底是哪個線程有資格進行初始化;
volatile變量(sizeCtl):它是一個标記位,用來告訴其他線程這個坑位有沒有人在,其線程間的可見性由volatile保證,CAS操作保證了設定sizeCtl标記位的原子性,保證了隻有一個線程能設定成功
/**
* 初始化table, 使用sizeCtl作為初始化容量.
*/
private final Node<K, V>[] initTable() {
Node<K, V>[] tab;
int sc;
while ((tab = table) == null || tab.length == 0) { //自旋直到初始化成功(每次循環都擷取最新的Node數組引用)
if ((sc = sizeCtl) < 0) // sizeCtl<0 :說明table已經正在初始化/擴容,是以讓出CPU時間片
Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 将sizeCtl更新成-1,表示正在初始化中
//如果CAS操作成功了,代表本線程将負責初始化工作
try {
//再次檢查table數組是否為空
if ((tab = table) == null || tab.length == 0) {
//在初始化Map時,sizeCtl代表數組大小,預設16,是以此時n預設為16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n]; //Node數組nt
table = tab = nt; //将nt其指派給table,tab變量
sc = n - (n >>> 2); // n - (n >>> 2) = n - n/4 = 0.75n(前面說了loadFactor已在JDK1.8廢棄)
}
} finally {
//将計算後的sc(12)直接指派給sizeCtl,表示達到12長度就擴容,由于這裡隻會有一個線程在執行,直接指派即可,沒有線程安全問題
//隻需要保證可見性
sizeCtl = sc;
}
break;
}
}
return tab;
}
-
initTable(): 将sizeCtl字段的值(ConcurrentHashMap對象在構造時設定)作為table的大小(其實這裡就是預設值,因為創造前先用CAS操作将sizeCtl變為-1了);
* 【注】:這裡的n - (n >>> 2),其實就是0.75 * n,sizeCtl 的值最終需要變更為0.75 * n,相當于設定了門檻值threshold;
2️⃣、table[i]對應的桶為空: 直接CAS操作占用桶table[i];
3️⃣、發現ForwardingNode結點,說明此時table正在擴容,則嘗試協助進行資料遷移;
- ForwardingNode結點是ConcurrentHashMap中的五類結點之一,相當于一個占位結點,表示目前table正在進行擴容,目前線程可以嘗試協助資料遷移;
- 後面會對helpTransfer()調用的核心方法transfer()進行分析,我這裡簡單了解一下helpTransfer():
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //sizeCtl加一,表示多一個線程進來協助擴容
transfer(tab, nextTab); //擴容,後面詳細講解
break;
}
}
return nextTab;
}
return table;
}
- 此方法涉及大量複雜的位運算,我們簡單了解即可(主要是我看别的部落格沒怎麼分析,我就沒有過多深究,有點不思進取,哈哈哈~),此時sizeCtl變量用來标示HashMap正在擴容,當其準備擴容時,會将sizeCtl設定為一個負數,(例如數組長度為16時)其二進制表示為:
-
這個二進制表示什麼呢(我們将其分成:高16位、低16位)?
答:
高16位:其中最高位表示符号位,1表示負數,其餘的表示數組長度的一個位算法标示(有點像epoch的作用,表示目前遷移朝代為數組長度X)
低16位:表示有幾個線程正在做遷移,剛開始為2,線程遷移完成會使其進行-1操作,那為什麼剛開始的 時候為2呢?這也是我們在前面分析字段中的sizeCt l -(1 + nThreads) : 記錄正在執行擴容任務的線程數,也就是說如果為2的時候代表就一個線程正在做遷移。==
4️⃣、出現hash沖突,也就是table[i]桶中已經有了結點
- 當兩個不同key映射到同一個 table[i] 桶中時,就會出現這種情況:
- 當table[i]的結點類型為Node——連結清單結點時,就會将新結點以“尾插法”的形式插傳入連結表的尾部;
- 當table[i]的結點類型為TreeBin——紅黑樹代理結點時,就會将新結點通過紅黑樹的插入方式插入;
- 到此,我們的put方法就告一段落,至于擴容和遷移我們後面詳細講:
CHM的get()源碼分析:
-
ConcurrentHashMap最常用,最重要的方法:一個是put()一個是get(),既然put() 分析的差不多了,那我們接下來看看get():
【注】:假設桶table[3]的節點正在遷移,突然有一個線程進來調用get方法,正好key又散列到桶table[3],此時怎麼辦?
答:這時其hash為MOVED,即-1,是以此時判斷線程正在遷移,這時候用find方法去查找值;
/**
* 根據key查找對應的value值
* @return 查找不到則傳回null
* @throws NullPointerException 如果key為空,則抛出異常
*/
public V get(Object key) {
Node<K, V>[] tab;
Node<K, V> e, p;
int n, eh;
K ek;
int h = spread(key.hashCode()); // 重新計算key的hash值
//(e = tabAt(tab, (n - 1) & h)) != null找到對應的桶
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
//CASE1:看table[i]目前節點是不是要找的值
if ((eh = e.hash) == h) { // table[i]就是待查找的桶,直接找值
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
//CASE2:目前節點不是,則判斷是不是特殊節點,如果是find查找
} else if (eh < 0) // hash值 < 0 , 說明遇到特殊結點(非連結清單結點), 調用find方法查找
return (p = e.find(h, key)) != null ? p.val : null;
//CASE3:目前不是特殊節點,則連結清單周遊查找
while ((e = e.next) != null) { // 周遊table[i]中的結點,因為前面通過eh判斷過不是特殊節點了,則直接按連結清單查找
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- 通過源碼對比,我們發現 get() 方法相比 put() 要簡單許多,我們接下來再描述一遍其流程:
- 首先:根據key的hash值計算映射到table的哪個桶,table[i];
- 其次:如果table[i]的key和待查找key相同,那直接傳回(這時候不用判斷是不是特殊節點);
- 最後:如果table[i]對應的結點是特殊結點(hash值小于0),則通過 find() 查找,如果不是特殊節點,則按連結清單查找;
- 用一個類比說:就是先找桶,找到桶,看桶上的結點是不是自己需要的的,如果是就不用打開桶了,直接傳回,如果不是我們就看桶裡面的結點是不是,而打開桶的方式有兩種,一種就是平常的按連結清單找,另一種就是借助工具 find() 找;
- 接下來,我們看一下這個工具,不同的結點有不同的實作方式呢(是不是感覺很矯情)~
CHM的find()源碼分析:
- 上文已經說過,不同的結點有不同的find()實作,這裡的find(),是去新數組newtable中查找的 接下來我們就分别看一下:
①: Node結點的查找 (hash>=0
- 當 槽table[i] 被普通Node結點占用,說明是連結清單連結的形式,直接從連結清單頭開始查找:
/*
* 連結清單查找.
* h:hash
* k:key
*/
Node<K, V> find(int h, Object k) {
Node<K, V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
② :TreeBin結點的查找 (hash=-2) 💦💦💦
- TreeBin的查找比較特殊,我們知道當桶table[i]被TreeBin結點占用時,說明連結的是一棵紅黑樹;
- 由于紅黑樹的插入、删除等操作會涉及整個結構的調整,是以通常存在讀寫并發操作的時候,是需要加鎖的;
/**
* 從根結點開始周遊查找,找到“相等”的結點就傳回它,沒找到就傳回null
* 當存在寫鎖或等待擷取寫鎖時,以連結清單方式進行查找
* 也就是說,隻有讀鎖時,才紅黑樹查找
*/
final Node<K, V> find(int h, Object k) {
if (k != null) {
for (Node<K, V> e = first; e != null; ) {
int s;
K ek;
/**
* 兩種特殊情況下以連結清單的方式進行查找:
* 1. WRITER---》有線程正持有寫鎖,這樣做能夠不阻塞讀線程
* 2. WAITER ---》有線程等待擷取寫鎖,不再繼續加讀鎖,相當于“寫優先”模式
*/
if (((s = lockState) & (WAITER | WRITER)) != 0) {
if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next; // 連結清單形式查找,找到立即傳回
}
//這時候就是按紅黑樹找了,讀線程數量加1(讀讀),讀狀态進行累加, READER==4
else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) {
TreeNode<K, V> r, p;
try {
p = ((r = root) == null ? null : r.findTreeNode(h, k, null)); //紅黑樹的根節點非空才能找
} finally {
Thread w;
// 如果去除目前讀線程狀态,LOCKSTATE依舊表示為有寫線程w因為讀鎖而阻塞并有讀線程,則告訴寫線程,它可以嘗試擷取寫鎖了,就是條件2
if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER | WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
③ :ForwardingNode結點的查找 (hash=-1)
- ForwardingNode是一種臨時結點,在擴容進行中才會出現,是以查找也在擴容的table ----》nextTable 上進行 (連結清單周遊):
/**
* 在新的擴容table—-》nextTable上進行查找
*/
Node<K, V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer:
for (Node<K, V>[] tab = nextTable; ; ) {
Node<K, V> e;
int n;
if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (; ; ) {
int eh;
K ek;
if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K, V>) e).nextTable;
continue outer;
} else
return e.find(h, k); //連結清單周遊
}
if ((e = e.next) == null)
return null;
}
}
}
④ :ReservationNode結點的查找
- ReservationNode是保留結點,不儲存實際資料,是以直接傳回null:
CHM的計數相關方法:
- 接下來,我們看一下ConcurrentHashMap是如何計算鍵值對的數目的:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
- 通過size()源碼 ,我們發現該方法實際調用了sumCount():
final long sumCount() {
CounterCell[] as = counterCells;
CounterCell a;
long sum = baseCount; //基礎值
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
- 可以看到,最終鍵值對的數目其實是通過下面這個公式計算的: [注]:這裡可以看一下LongAdder,這也是第一篇參考文章的部落客寫的,這就是ConcurrentHashMap的計數思路:ConcurrentHashMap的計數其實延用了LongAdder分段計數的思路,隻不過ConcurrentHashMap并沒有在内部直接使用LongAdder,而是差不多copy了一份和LongAdder類似的代碼:
// 計數基值,當沒有線程競争時,計數将加到該變量上。類似于LongAdder的base變量
private transient volatile long baseCount;
// 計數數組,出現并發沖突時使用。類似于LongAdder的cells數組
private transient volatile CounterCell[] counterCells;
// 自旋辨別位,用于CounterCell[]擴容時使用。類似于LongAdder的cellsBusy變量
private transient volatile int cellsBusy;
-
CounterCell 這個槽對象,當出現并發沖突時,每個線程會根據自己的hash值找到對應的槽位置:
【注】:在設計中,使用了分而治之的思想,将每一個計數都分散到各個countCell對象裡面,使競争最小化,又使用了CAS操作,就算有競争,也可以對失敗了的線程進行其他的處理。樂觀鎖的實作方式與悲觀鎖不同之處就在于樂觀鎖可以對競争失敗了的線程進行其他政策的處理,而悲觀鎖隻能等待鎖釋放,是以這裡使用CAS操作對競争失敗的線程做了其他處理,很巧妙的運用了CAS樂觀鎖。
/**
* 計數槽.
* 類似于LongAdder中的Cell内部類
*/
static final class CounterCell {
volatile long value;
CounterCell(long x) {
value = x;
}
}
CHM的addCount()源碼分析
- 之前的putval() 的最後,當插入新的結點後,通過 addCount() 将計數值為加1,我們接下來看看 addCount() 的具體實作:
/**
* 更改計數值,并檢查長度是否達到門檻值
*/
private final void addCount(long x, int check) {
CounterCell[] as; //計數桶
long b, s;
// !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x):嘗試更新baseCount
//1、如果counterCells不為null,則代表已經初始化了,直接進入if語句塊
//2、若競争不嚴重,counterCells有可能還未初始化,為null,先嘗試CAS操作遞增baseCount值
if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
//進入此語句塊有兩種可能:
//1.counterCells被初始化完成了,不為null
//2.CAS操作遞增baseCount值失敗了,說明出現并發沖突,則将計數值累加到Cell槽
CounterCell a;
long v;
int m;
boolean uncontended = true; //标志是否存在競争
//1.先判斷計數桶是否初始化,如果as=null,說明沒有,進入語句塊
//2.判斷計數桶長度是否為空,若是進入語句塊
//3.這裡做了一個線程變量随機數,若桶的這個位置為空,進入語句塊(根據線程hash值計算槽索引)
//4.到這裡說明桶已經初始化了,且随機的這個位置不為空,嘗試CAS操作使桶加1,失敗進入語句塊
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended); // 槽更新也失敗, 則會執行fullAddCount
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) { // 檢測是否擴容
Node<K, V>[] tab, nt;
int n, sc;
while (s >= (long) (sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
} else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount(); //統計容器大小
}
}
}
- 總結:
- 首先,如果counterCells為null,說明之前一直沒有出現過沖突,直接将值累加到baseCount上即可;
-
否則,嘗試更新counterCells[i]中的值 ( a = as[ThreadLocalRandom.getProbe() & m]) == null ),更新成功就退出, 如果失敗說明槽中也出現了并發沖突,可能涉及槽數組,即counterCells的擴容,是以調用fullAddCount方法。
【注】:由此可見,統計容器大小其實是用了兩種思路:
1、CAS方式直接遞增:線上程競争不大的時候,直接使用CAS操作遞增baseCount值即可,這裡說的競争不大指的是CAS操作不會失敗的情況
2、分而治之桶計數:若出現了CAS操作失敗的情況,則證明此時有線程競争了,計數方式從CAS方式轉變為分而治之的桶計數方式
CHM的fullAddCount()源碼分析
- 上文說過,當出現了并發沖突,則不會再用CAS方式來計數了,直接使用桶方式,從上面的addCount方法可以看出來,此時的countCell是為空的(或者不為空但CAS更新失敗),最終一定會進入fullAddCount方法來進行初始化桶(這一篇部落格對計數這一塊更清晰一些):
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
...
//如果計數桶!=null,證明已經初始化,此時不走此語句塊
if ((as = counterCells) != null && (n = as.length) > 0) {
...
}
//進入此語句塊進行計數桶的初始化
//CAS設定cellsBusy=1,表示現在計數桶Busy中...
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//若有線程同時初始化計數桶,由于CAS操作隻有一個線程進入這裡
boolean init = false;
try { // Initialize table
//再次确認計數桶為空
if (counterCells == as) {
//初始化一個長度為2的計數桶👌👌
CounterCell[] rs = new CounterCell[2];
//h為一個随機數,與上1則代表結果為0、1中随機的一個
//也就是在0、1下标中随便選一個計數桶,x=1,放入1的值代表增加1個容量
rs[h & 1] = new CounterCell(x);
//将初始化好的計數桶指派給ConcurrentHashMap
counterCells = rs;
init = true;
}
} finally {
//最後将busy辨別設定為0,表示不busy了
cellsBusy = 0;
}
if (init)
break;
}
//若有線程同時來初始化計數桶,則沒有搶到busy資格的線程就先來CAS遞增baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
-
到這裡就完成了計數桶的初始化工作,之後的計數都将會使用計數桶方式來統計總數;💦💦💦
[注]:從上面的分析中我們知道,計數桶初始化的長度為2,在競争大的時候肯定是不夠用的,是以一定有計數桶的擴容操作,是以現在就有兩個問題了:
- 什麼條件下會進行計數桶的擴容?
- 擴容操作是怎麼樣的?
- 我們做一個假設,如果此時是用計數桶方式進行計數:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//此時顯然會在計數桶數組中随機選一個計數桶
//然後使用CAS操作将此計數桶中的value+1
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//若CAS操作失敗,證明有競争,進入fullAddCount方法
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
}
- 進入fullAddCount方法:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//計數桶初始化好了,一定是走這個if語句塊
if ((as = counterCells) != null && (n = as.length) > 0) {
//從計數桶數組随機選一個計數桶,若為null表示該桶位還沒線程遞增過
if ((a = as[(n - 1) & h]) == null) {
//檢視計數桶busy狀态是否被辨別
if (cellsBusy == 0) { // Try to attach new Cell
//若不busy,直接new一個計數桶
CounterCell r = new CounterCell(x); // Optimistic create
//CAS操作,标示計數桶busy中
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
//在鎖下再檢查一次計數桶為null
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//将剛剛建立的計數桶指派給對應位置
rs[j] = r;
created = true;
}
} finally {
//标示不busy了
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//走到這裡代表計數桶不為null,嘗試遞增計數桶
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
//若CAS操作失敗了,到了這裡,會先進入一次,然後再走一次剛剛的for循環
//若是第二次for循環,collide=true,則不會走進去
else if (!collide)
collide = true;
//計數桶擴容,一個線程若走了兩次for循環,也就是進行了多次CAS操作遞增計數桶失敗了
//則進行計數桶擴容,CAS标示計數桶busy中
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
//确認計數桶還是同一個
if (counterCells == as) {// Expand table unless stale
//将長度擴大到2倍
CounterCell[] rs = new CounterCell[n << 1];
//周遊舊計數桶,将引用直接搬過來
for (int i = 0; i < n; ++i)
rs[i] = as[i];
//指派
counterCells = rs;
}
} finally {
//取消busy狀态
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//初始化計數桶...
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
- 看到這裡我們可以解決那兩個問題了:
-
1、什麼條件下會進行計數桶的擴容?
答:在CAS操作遞增計數桶失敗了3次之後,會進行擴容計數桶操作,注意此時同時進行了兩次随機定位計數桶來進行CAS遞增的,是以此時可以保證大機率是因為計數桶不夠用了,才會進行計數桶擴容;
-
2、擴容操作是怎麼樣的?
答:計數桶長度增加到兩倍長度,資料直接周遊遷移過來,由于計數桶不像HashMap資料結構那麼複雜,有hash算法的影響,加上計數桶隻是存放一個long類型的計數值而已(也沒有CAS,也可能造成并發問題把),是以直接指派引用即可;
-
【注】:在這裡我們再次總結一下計數中的并發技巧:
- 1、利用CAS遞增baseCount值來感覺是否存線上程競争,若競争不大直接CAS遞增baseCount值即可,性能與直接baseCount++差别不大;
- 2、若存線上程競争,則初始化計數桶,若此時初始化計數桶的過程中也存在競争,多個線程同時初始化計數桶,則沒有搶到初始化資格的線程直接嘗試CAS遞增baseCount值的方式完成計數,最大化利用了線程的并行。此時使用計數桶計數,分而治之的方式來計數,此時兩個計數桶最大可提供兩個線程同時計數,同時使用CAS操作來感覺線程競争,若兩個桶情況下CAS操作還是頻繁失敗(失敗3次),則直接擴容計數桶,變為4個計數桶,支援最大同時4個線程并發計數,以此類推…同時使用位運算和随機數的方式"負載均衡"一樣的将線程計數請求接近均勻的落在各個計數桶中。
CHM類的擴容機制💦💦💦💦
CHM類的擴容基本思路
- JDK1.8中,ConcurrentHashMap最複雜的部分就是 擴容、資料遷移, 因為涉及多線程的合作和rehash();
- 我們先看一下一般的HashMap如何擴容的。
- HashMap的擴容一般包含兩個步驟:
- ①、table數組的擴容:
- table數組的擴容,一般就是建立一個2倍大小的槽數組,這個過程通過由一個單線程完成,且不允許出現并發。
- ②、資料遷移:
- 所謂資料遷移,就是把舊table中的各個槽中的結點重新配置設定到新table中;
-
比如,單線程情況下,可以周遊原來的table,然後put()到新table中;
【注】:這一過程通常涉及到槽中key的rehash(),因為 key映射到桶的位置與table的大小有關,新table的大小變了,key映射的位置一般也會變化,是以我們需要重新計算hash值;
- ①、table數組的擴容:
- ConcurrentHashMap在處理 rehash() 的時候,并不會重新計算每個key的hash值,而是利用了一種很巧妙的方法。
- ConcurrentHashMap内部的table數組的大小必須為2的幂次,這是為什麼呢? 💦💦
- ㈠ 、讓key均勻分布,減少沖突; 💦
- ㈡、當table數組的大小為2的幂次時,通過key.hash & table.length-1這種方式計算出的索引i,當table擴容後(2倍),新的索引要麼在原來的位置i,要麼是i+n。 💦
- 如果索引變為
,那麼其變化後的索引最高位一定是i+n
(因為1
的最高位一定是1)。table.length-1
- 如果索引變為
- 我們通過一個例子說明:
- 擴容前,table數組大小為16,key1和key2映射到同一個索引5;
- 擴容後,table數組的大小變成 2*16=32 ,key1的索引不變,key2的索引變成 5+16=21。
- ConcurrentHashMap内部的table數組的大小必須為2的幂次,這是為什麼呢? 💦💦
- ConcurrentHashMap 在擴容時,這樣處理能帶來什麼好處呢? 💦💦💦
- 這種處理方式非常利于擴容時多個線程同時進行的資料遷移操作,因為舊table的各個桶中的結點遷移不會互相影響,是以就可以用 “分治” 的方式,将整個table數組劃分為很多部分,每一部分包含一定區間的桶,每個資料遷移線程處理各自區間中的結點,對多線程同時進行資料遷移非常有利
- 接下來我們進一步分析ConcurrentHashMap 的擴容機制;
CHM的擴容時機
- 通過前面相關介紹,我們知道,當往table[i]中插入結點時,如果連結清單的結點數目超過一定門檻值(8),就會觸發連結清單 -> 紅黑樹的轉換,這樣提高了 查找效率:
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 連結清單 -> 紅黑樹 轉換
- 接下來我們分析這個 連結清單 -> 紅黑樹 的轉換操作,treeifyBin(tab, i):
/*
* 連結清單 -> 紅黑樹 轉換
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n;
if (tab != null) {
// CASE 1: table的容量 < MIN_TREEIFY_CAPACITY時,直接進行table擴容,不進行紅黑樹轉換,MIN_TREEIFY_CAPACITY預設為64
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
// CASE 2: table的容量 ≥ MIN_TREEIFY_CAPACITY時,進行相應桶的連結清單 -> 紅黑樹的轉換
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) { //同步,對相應的桶的對象加鎖
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
//周遊連結清單,建立紅黑樹
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);//結點類型轉換
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 以TreeBin類型包裝,并連結到table[index]中
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
- 通過 treeifyBin(Node<K,V>[] tab, int index) 源碼可以看出,連結清單 -> 紅黑樹這一轉換并不是一定會進行的:
- 當桶的容量 < MIN_TREEIFY_CAPACITY(64),CurrentHashMap 會首先選擇擴容(調用 tryPresize() 把數組長度擴大到原來的兩倍),而非立即轉成紅黑樹;
- 當桶的容量 >= MIN_TREEIFY_CAPACITY(64),則選擇 連結清單 -> 紅黑樹;
- 接下來我們看一下 tryPresize() 如何執行擴容:
/*
* 嘗試對table數組進行擴容
* @param 待擴容的大小
*/
private final void tryPresize(int size) { //jdk16
// 視情況将size調整為2的整數次幂,與0.5 * MAXIMUM_CAPACITY來比較 , tableSizeFor求二次幂
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//CASE 1: table還未初始化,則先進行初始化
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c; //取最大值
if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// CASE2: c <= sc說明已經被擴容過了;n >= MAXIMUM_CAPACITY說明table數組已達到最大容量
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// CASE3: 進行table擴容
else if (tab == table) {
int rs = resizeStamp(n);
// 這個CAS操作可以保證,僅有一個線程會執行擴容
if (U.compareAndSetInt(this , SIZECTL , sc , (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
- 前兩種情況不讨論了,對于CASE3,jdk16相比jdk1.8已經有了優化,去掉了一個if判斷,直接通過CAS和移位運算進行擴容操作,保證了隻有一個線程會執行擴容,jdk中if判斷是否是單線程,分别進行了transfer(),隻不過調用參數不一樣,我們通過一張圖比較: 【注】:ConcurrentHashMap運用CAS操作,将擴容操作的并發性能實作最大化,在擴容過程中,就算有線程調用get查詢方法,也可以安全的查詢資料,若有線程進行put操作,還會協助擴容,利用sizeCtl标記位和各種volatile變量進行CAS操作達到多線程之間的通信、協助,在遷移過程中隻鎖一個Node節點,即保證了線程安全,又提高了并發性能。
CHM的擴容之資料遷移原理💦💦💦
- 通過tryPresize() 我們發現調用了transfer方法,該方法可以被多個線程同時調用,是“資料遷移”的核心操作方法, 接下來我們看一看
/**
* 資料轉移和擴容.
* 每個調用tranfer的線程會對目前舊table中[transferIndex-stride, transferIndex-1]位置的結點進行遷移
*
* @param tab 舊table數組
* @param nextTab 新table數組
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride可了解成“步長”,即“資料遷移”時,每個線程要負責舊table中的多少個桶,根據幾核的CPU決定“步長”,最少16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //MIN_TRANSFER_STRIDE預設16
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // 第二個參數,nextable為null說明第一次擴容
try {
@SuppressWarnings("unchecked")
// 建立新table數組,擴大一倍,32,n還為16
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // 處理記憶體溢出(OOME)的情況
sizeCtl = Integer.MAX_VALUE; //将表示容量的sizeCtl 設定為最大值,然後傳回
return;
}
nextTable = nextTab; //設定nextTable變量為擴容後的數組
transferIndex = n; // [transferIndex-stride, transferIndex-1]:表示目前線程要進行資料遷移的桶區間
}
int nextn = nextTab.length;
// ForwardingNode結點,當舊table的某個桶中的所有結點都遷移完後,用該結點占據這個桶
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 辨別一個桶的遷移工作是否完成,advance == true 表示可以進行下一個位置的遷移
boolean advance = true;
// 最後一個資料遷移的線程将該值置為true,并進行本輪擴容的收尾工作
boolean finishing = false; // to ensure sweep before committing nextTab
// i辨別桶索引, bound辨別邊界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 每一次自旋前的預處理,主要是為了定位本輪處理的桶區間
// 正常情況下,預處理完成後:i == transferIndex-1:右邊界;bound == transferIndex-stride:左邊界
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSetInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// CASE1:目前是處理最後一個tranfer任務的線程或出現擴容沖突
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 所有桶遷移均已完成
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 擴容線程數減1,表示目前線程已完成自己的transfer任務
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判斷目前線程是否是本輪擴容中的最後一個線程,如果不是,則直接退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
/**
* 最後一個資料遷移線程要重新檢查一次舊table中的所有桶,看是否都被正确遷移到新table了:
* ①正常情況下,重新檢查時,舊table的所有桶都應該是ForwardingNode;
* ②特殊情況下,比如擴容沖突(多個線程申請到了同一個transfer任務),此時目前線程領取的任務會廢棄,那麼最後檢查時,
* 還要處理因為廢棄而沒有被遷移的桶,把它們正确遷移到新table中
*/
i = n; // recheck before commit
}
}
// CASE2:舊桶本身為null,不用遷移,直接嘗試放一個ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// CASE3:該舊桶已經遷移完成,直接跳過
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// CASE4:該舊桶未遷移完成,進行資料遷移
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// CASE4.1:桶的hash>0,說明是連結清單遷移
if (fh >= 0) {
/**
* 下面的過程會将舊桶中的連結清單分成兩部分:ln鍊和hn鍊
* ln鍊會插入到新table的槽i中,hn鍊會插入到新table的槽i+n中
*/
int runBit = fh & n; // 由于n是2的幂次,是以runBit要麼是0,要麼高位是1
Node<K,V> lastRun = f; // lastRun指向最後一個相鄰runBit不同的結點
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 以lastRun所指向的結點為分界,将連結清單拆成2個子連結清單ln、hn
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // ln連結清單存入新桶的索引i位置
setTabAt(nextTab, i + n, hn); // hn連結清單存入新桶的索引i+n位置
setTabAt(tab, i, fwd); // 設定ForwardingNode占位
advance = true; // 表示目前舊桶的結點已遷移完畢
}
// CASE4.2:紅黑樹遷移
else if (f instanceof TreeBin) {
/**
* 下面的過程會先以連結清單方式周遊,複制所有結點,然後根據高低位組裝成兩個連結清單;
* 然後看下是否需要進行紅黑樹轉換,最後放到新table對應的桶中
*/
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 判斷是否需要進行 紅黑樹 <-> 連結清單 的轉換
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd); // 設定ForwardingNode占位
advance = true; // 表示目前舊桶的結點已遷移完畢
}
else if (f instanceof ReservationNode) //jdk16特有,1.8沒有
throw new IllegalStateException("Recursive update");
}
}
}
}
}
- 這個方法複雜一些,我們把它一步一步分解開來看,後續我會看别的文章繼續補充:
①、我們先看 tranfer() 的開頭,會計算出一個stride變量的值,這個stride其實就是每個線程處理的桶區間,也就是步長: 💦
// stride可了解成“步長”,即“資料遷移”時,每個線程要負責舊table中的多少個桶,根據CPU決定步長
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //MIN_TRANSFER_STRIDE預設16
stride = MIN_TRANSFER_STRIDE; // subdivide range
- 根據是否是多核的CPU确定步長,不過最小步長為16,因為MIN_TRANSFER_STRIDE的預設值為16;
②、接下來就是進行擴容,當首次擴容時,将table數組的容量變成原來的2倍: 💦
if (nextTab == null) { // 第二個參數,即新的table為null說明第一次擴容
try {
@SuppressWarnings("unchecked")
// 建立新table數組,擴大一倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // 處理記憶體溢出(OOME)的情況
sizeCtl = Integer.MAX_VALUE; //将表示容量的sizeCtl 設定為最大值,然後傳回
return;
}
nextTable = nextTab; //擴容後的數組nextTable
transferIndex = n; // [transferIndex-stride, transferIndex-1]表示目前線程要進行資料遷移的桶區間
}
int nextn = nextTab.length;
- 注意上面的transferIndex變量,這是一個字段,table[transferIndex-stride, transferIndex-1]就是目前線程要進行資料遷移的桶區間,這個桶區間可以用擴容時的下标變量表示:
private transient volatile int transferIndex;
- 整個transfer()幾乎都在一個自旋操作中完成,從右往左 開始進行資料遷移,transfer的退出點是當某個線程處理完最後的table區段——table[0,stride-1]。
- transfer()主要包含4個遷移分類,即對4種不同情況進行處理,上面的源碼已經标注,現在我們按照難易程度來更詳細的解釋下各個分類③---->⑥所做的事情:
③、CASE2 : 桶table[i]為空:
- 當舊table的桶table[i] == null,說明原來這個桶就沒有資料,那就直接嘗試通過CAS操作放置一個ForwardingNode,表示這個桶已經處理完成,CAS設定成功則advance為true,表示該桶已經完成遷移。
// CASE2:舊桶本身為null,不用遷移,直接嘗試放一個ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
【注】:ForwardingNode我們在上一篇提到過,主要做占用,多線程進行資料遷移時,其它線程看到這個桶中是ForwardingNode結點,就知道這一段範圍的桶有線程已經在資料遷移了。另外,當最後一個線程完成遷移任務後,會周遊所有桶,看看是否都是ForwardingNode,如果是,那麼說明整個擴容/資料遷移的過程就完成了。
④、CASE3 : 桶table[i]不為空,但已經遷移完:
- 桶已經被ForwardingNode結點占用了,表示該桶的資料都遷移完了,當然标志位advance設為true:
// CASE3:該舊桶已經遷移完成,直接跳過
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
⑤、CASE4 : 桶table[i]未遷移完成: 💦
-
如果舊桶的資料未遷移完成,就要進行遷移,這裡根據桶中結點的類型分為:連結清單遷移、紅黑樹遷移;
⒈、連結清單遷移: 💦
- 連結清單遷移的過程大緻分為2步:
- 首先會周遊一遍原連結清單,找到最後一個相鄰runBit不同的結點(為了充當連結清單頭),runBit是根據key.hash和舊table長度n進行與運算得到的值,由于table的長度為2的幂次,是以runbit隻可能為0或最高位為1
- 其次會進行第二次連結清單周遊,按照第一次周遊找到的結點為界,将原連結清單分成2個子連結清單,再連結到新table的槽中。可以看到,新table的索引要麼是i,要麼是i+n,這裡就利用了前文中說的ConcurrentHashMap的rehash特點。
- 連結清單遷移的過程大緻分為2步:
// CASE4.1:桶的hash>0,說明是正常結點,則連結清單遷移
if (fh >= 0) {
/**
* 下面的過程會将舊桶中的連結清單分成兩部分:ln鍊和hn鍊
* ln鍊會插入到新table的槽i中,hn鍊會插入到新table的槽i+n中
*/
int runBit = fh & n; // 由于n是2的幂次,是以runBit要麼是0,要麼高位是1,fh是桶上的哈希值
Node<K,V> lastRun = f; // lastRun指向最後一個相鄰runBit不同的結點
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n; //b要麼是0,要麼高位是1
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}else {
hn = lastRun;
ln = null;
}
// 以lastRun所指向的結點為分界,将連結清單拆成2個子連結清單ln、hn,頭插法
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // ln連結清單存入新桶的索引i位置
setTabAt(nextTab, i + n, hn); // hn連結清單存入新桶的索引i+n位置
setTabAt(tab, i, fwd); // 設定ForwardingNode占位
advance = true; // 表示目前舊桶的結點已遷移完畢
}
⒉、紅黑樹遷移: 💦
- 紅黑樹的遷移按照連結清單周遊的方式進行,當連結清單結點超過/小于門檻值時,涉及紅黑樹<->連結清單的互相轉換 :
else if (f instanceof TreeBin) {
/**
* 下面的過程會先以連結清單方式周遊,複制所有結點,然後根據高低位組裝成兩個連結清單;
* 然後看下是否需要進行紅黑樹轉換,最後放到新table對應的桶中
*/
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 判斷是否需要進行 紅黑樹 <-> 連結清單 的轉換
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : //判斷門限,如果大于則繼續用樹,樹沒改變則直接傳回t
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd); // 設定ForwardingNode占位
advance = true; // 表示目前舊桶的結點已遷移完畢
}
⑥、CASE5: 目前是最後一個遷移任務或出現擴容沖突:
- 我們剛才說了,調用transfer() 的線程會自動領用某個區段的桶,進行資料遷移操作,當區段的初始索引i變成負數的時候,說明目前線程處理的其實就是最後剩下的桶,并且處理完了,然後操作如下:
- 首先會更新sizeCtl變量,将擴容線程數減1,然後會做一些收尾工作:
- 其次設定table指向擴容後的新數組;
-
最後周遊一遍舊數組,確定每個桶的資料都遷移完成——被ForwardingNode占用;
[注]:可能在擴容過程中,出現擴容沖突的情況,比如多個線程領用了同一區段的桶,這時任何一個線程都不能進行資料遷移。
if (i < 0 || i >= n || i + n >= nextn) { // CASE1:目前是處理最後一個tranfer任務的線程或出現擴容沖突
int sc;
if (finishing) { // 所有桶遷移均已完成
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); //門檻值為newTable的0.75倍
return;
}
// 擴容線程數減1,表示目前線程已完成自己的transfer任務
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判斷目前線程是否是本輪擴容中的最後一個線程,如果不是,則直接退出,如果是則設定标志位finishing
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
/**💙💙💙💙💙💙
* 最後一個資料遷移線程要重新檢查一次舊table中的所有桶,看是否都被正确遷移到新table了:
* ①正常情況下,重新檢查時,舊table的所有桶都應該是ForwardingNode;
* ②特殊情況下,比如擴容沖突(多個線程申請到了同一個transfer任務),此時目前線程領取的任務會廢棄,那麼最後檢查時,
* 還要處理因為廢棄而沒有被遷移的桶,把它們正确遷移到新table中
*/
i = n; // recheck before commit
}
}