天天看點

JDK1.7&1.8中ConcurrentHashMap解析

一.總體概述

HashMap在開發中很常用,但HashMap存在一個弊端就是線程不安全,解決辦法就是使用Hashtable代替或使用Collections.synchronizedMap(m);将HashMap轉換為線程安全的,但這兩種方法雖說實作了線程安全,但是并發性能比較差,因為是全表加鎖,那麼并發界的大牛Doug Lea就為我們提供了ConcurrentHashMap解決這個問題,不僅實作線程安全,還保證了并發的,一舉兩得。

建議:建議先看完JDK1.7及JDK1.8的HashMap源碼,再來檢視ConcurrentHashMap源碼,會簡單很多。

  • JDK1.7HashMap:https://blog.csdn.net/qq_36625757/article/details/89577865
  • JDK1.8HashMap:https://blog.csdn.net/qq_36625757/article/details/90038751

二.JDK1.7實作

JDK1.7&1.8中ConcurrentHashMap解析

在ConcurrentHashMap中不同于普通的HashMap,不僅有數組+連結清單的概念,還多了一個Segment數組,一個Segment包含多個HashEntry數組(HashMap中的數組) ,這樣就能通過分段加鎖,解決多線程下的安全性問題,又不至于像Hashtable那樣全表加鎖,導緻性能下降。 

1.構造函數

static final int DEFAULT_INITIAL_CAPACITY = 16;//初始HashEntry[]容量大小
static final float DEFAULT_LOAD_FACTOR = 0.75f;//預設加載因子
static final int DEFAULT_CONCURRENCY_LEVEL = 16;//預設并發級别
static final int MAXIMUM_CAPACITY = 1 << 30;//最大HashEntry[]數組大小
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;//Segment最小容量
static final int MAX_SEGMENTS = 1 << 16; //最大segment容量
static final int RETRIES_BEFORE_LOCK = 2;//鎖之前重試次數
final Segment<K,V>[] segments;//Segment數組

//空參構造
public ConcurrentHashMap() {
	this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
						 float loadFactor, int concurrencyLevel) {
	if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
		throw new IllegalArgumentException();
	//并發等級大于一個segment最大容量,就為一個segment最大容量
	if (concurrencyLevel > MAX_SEGMENTS)
		concurrencyLevel = MAX_SEGMENTS;
	int sshift = 0;
	int ssize = 1;//記錄需要建立的Segment數組大小
	while (ssize < concurrencyLevel) {//concurrencyLevel預設為16
		++sshift;
		ssize <<= 1;//每次循環ssize*=2^1,循環完畢後為16,這裡依舊是在找大于且最接近concurrencyLevel的2的幂次方數
	}
	this.segmentShift = 32 - sshift;
	this.segmentMask = ssize - 1;
	//校驗建立的HashEntry[]長度
	if (initialCapacity > MAXIMUM_CAPACITY)
		initialCapacity = MAXIMUM_CAPACITY;
	//初始時c=16/16=1
	int c = initialCapacity / ssize;
	if (c * ssize < initialCapacity)
		++c;
	//計算每一個Segment中HashEntry[]長度
	int cap = MIN_SEGMENT_TABLE_CAPACITY;
	while (cap < c)
		cap <<= 1;
	// create segments and segments[0]  建構一個Segment,傳入加載因子,門檻值,建立的HashEntry[]數組,作為Segment[]的第一個元素
	Segment<K,V> s0 =
		new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
						 (HashEntry<K,V>[])new HashEntry[cap]);
	//建立Segment[]
	Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
	UNSAFE.putOrderedObject(ss, SBASE, s0);// ordered write of segments[0] 通過原子操作,将s0添加到Segment[0]上
	this.segments = ss;//指派成員變量
}

static final class Segment<K,V> extends ReentrantLock implements Serializable {
	private static final long serialVersionUID = 2249069246763182397L;
	static final int MAX_SCAN_RETRIES =Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    //Segment對應的HashEntry數組,使用volatile保證可見性
	transient volatile HashEntry<K,V>[] table;
	transient int count;//HashEntry[]中存儲的鍵值個數
	transient int modCount;
	transient int threshold;//HashEntry門檻值
	final float loadFactor;//HashEntry加載因子

	Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
		this.loadFactor = lf;
		this.threshold = threshold;
		this.table = tab;
	}

    //.......   
}
           

空參構造調用有參構造,參數為 16,0.75,16,有參構造函數中建立了Segment數組及第一個Segmen對象,并初始化其中的HashEntry[]數組,初始長度為2,concurrencyLevel為并發級别,預設是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,是以理論上,這個時候,最多可以同時支援 16 個線程并發寫,隻要它們的操作分别分布在不同的 Segment 上。這個值可以在初始化的時候設定為其他值,但是一旦初始化以後,它是不可以擴容的。

  • Segment []長度為 16,不可以擴容
  • Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始門檻值為 1.5,也就是之後插入第一個元素不會擴容,第二個就需要擴容了
  • 這裡初始化了 segment[0],其他位置還是 null,至于為什麼要初始化 segment[0],之後會講解。
  • 目前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,可以把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到

2.put()方法

@SuppressWarnings("unchecked")
public V put(K key, V value) {
	Segment<K,V> s;
	if (value == null)
		throw new NullPointerException();
	int hash = hash(key);//計算key的hash值
	//根據key的hash值計算Segment[]數組中的j索引位置
	int j = (hash >>> segmentShift) & segmentMask;//第一次插入資料時segmentShift=28,segmentMask=15
	if ((s = (Segment<K,V>)UNSAFE.getObject
		 (segments, (j << SSHIFT) + SBASE)) == null)//如果Segment中第j位置為null
		s = ensureSegment(j);//初始第j位置的Segment,之前在構造方法中,隻初始化了0位置的Segment
	return s.put(key, hash, value, false);//插入資料
}
           

這裡通過ensureSegment()對Segment[j]位置的Segment進行初始化

ensureSegment()方法

//初始化Segment[k]位置的元素
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
	final Segment<K,V>[] ss = this.segments;//擷取Segment[]數組
	long u = (k << SSHIFT) + SBASE; // raw offset
	Segment<K,V> seg;
	if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {//确定要初始化位置的Segment為null
		Segment<K,V> proto = ss[0]; // 獲得在構造方法中已經初始化的Segment[0]元素
		int cap = proto.table.length;//獲得其中HashEntry[]數組長度
		float lf = proto.loadFactor;//擷取其中的加載因子
		int threshold = (int)(cap * lf);//計算門檻值
		HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
		if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
			== null) { // 再次确定要初始化位置的Segment為null
			Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
			while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
				   == null) {
				if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))//使用UNSAFE的CAS,保證Segment[k]初始化成功
					break;
			}
		}
	}
	return seg;
}
           

ensureSegment()中使用CAS原子操作,保證初始化。

put()方法(Segment中)

//Segment中的put()方法,參數:要插入的key,key的hash值,值,false
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
	//分段鎖Segment為了保證插入資料的安全性,再插入之前要擷取對象獨占鎖
	//Segment繼承ReentrantLock,通過tryLock擷取獨占鎖,擷取鎖傳回null
	//沒有擷取到鎖,通過scanAndLockForPut()擷取
	HashEntry<K,V> node = tryLock() ? null :
		scanAndLockForPut(key, hash, value);
	V oldValue;
	try {
		HashEntry<K,V>[] tab = table;//擷取其中的HashEntry[]
		int index = (tab.length - 1) & hash;//通過Key的hash值計算需要存儲在HashEntry的位置
		//擷取HashEntry[index]位置的連結清單的第一個元素
		HashEntry<K,V> first = entryAt(tab, index);
		for (HashEntry<K,V> e = first;;) {
			if (e != null) {//目前循環到的連結清單節點不等于null
				K k;
				//節點中的key與目前要存儲的key相等
				if ((k = e.key) == key ||
					(e.hash == hash && key.equals(k))) {
					oldValue = e.value;
					if (!onlyIfAbsent) {//替換存儲的值
						e.value = value;
						++modCount;
					}
					break;
				}
				e = e.next;//指派e,用于下次循環
			}
			else {//目前循環到的連結清單節點等于null
				if (node != null)// 如果node不為 null,那就直接将它設定為連結清單表頭
					node.setNext(first);
				else
					node = new HashEntry<K,V>(hash, key, value, first);//建立
				int c = count + 1;
				if (c > threshold && tab.length < MAXIMUM_CAPACITY)//如果HashEntry[]中存儲的鍵值個數大于門檻值,并且小于最大值
					rehash(node);//擴容
				else
					setEntryAt(tab, index, node);//将節點加入到index連結清單中,設定為連結清單表頭
				++modCount;
				count = c;
				oldValue = null;
				break;
			}
		}
	} finally {
		unlock();//釋放鎖
	}
	return oldValue;
}
           

這裡在第一步會嘗試擷取鎖,如果沒有擷取到就通過scanAndLockForPut()擷取,scanAndLockForPut()執行完畢肯定擷取到了Segment鎖,是以put方法是線程安全的,scanAndLockForPut()是控制寫鎖的關鍵

scanAndLockForPut()方法,Segment中

//Segment中方法
static final int MAX_SCAN_RETRIES =Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
	HashEntry<K,V> first = entryForHash(this, hash);//通過目前segment對象及key的hash值,擷取連結清單第一個值
	HashEntry<K,V> e = first;
	HashEntry<K,V> node = null;
	int retries = -1; // negative while locating node
	while (!tryLock()) {//如果擷取鎖失敗,循環,直到擷取成功
		HashEntry<K,V> f; // to recheck first below
		if (retries < 0) {
			if (e == null) {//e等于null,表示連結清單為null
				if (node == null) // speculatively create node
					node = new HashEntry<K,V>(hash, key, value, null);
				retries = 0;
			}
			else if (key.equals(e.key))
				retries = 0;
			else
				e = e.next;//記錄下一個節點
		}
		//重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞隊列等待鎖
		else if (++retries > MAX_SCAN_RETRIES) {
			lock();//阻塞方法,擷取鎖成功後傳回
			break;
		}
		else if ((retries & 1) == 0 &&
				 (f = entryForHash(this, hash)) != first) {//當有新的節點進入到連結清單并且為表頭(其他線程操作的)
			e = first = f; // 重新設定e及first
			retries = -1;//重新設定retries,相當于重新做while循環
		}
	}
	return node;
}
           

這裡會通過一個while循環嘗試擷取Segment鎖,并循環計數,如果retries>1/64(單核1,多核64),則通過lock()進入AQS阻塞隊列,直到擷取鎖後,才會傳回,跳出循環。

rehash()擴容方法,Segment中

//擴容,Segment中,隻對Segment中的HashEntry[]擴容
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
	HashEntry<K,V>[] oldTable = table;//儲存老的HashEntry[]
	int oldCapacity = oldTable.length;
	int newCapacity = oldCapacity << 1;//擴容為原來的2倍
	threshold = (int)(newCapacity * loadFactor);//計算新的門檻值
	HashEntry<K,V>[] newTable =
		(HashEntry<K,V>[]) new HashEntry[newCapacity];//建立新的HashEntry[]
	int sizeMask = newCapacity - 1;
	for (int i = 0; i < oldCapacity ; i++) {//循環移動老的數組中的元素
		HashEntry<K,V> e = oldTable[i];
		if (e != null) {
			HashEntry<K,V> next = e.next;
			int idx = e.hash & sizeMask;//計算在HashEntry[]中存放的位置
			if (next == null)   //目前節點的下一個節點為null,說明目前連結清單就一個節點
				newTable[idx] = e;//直接指派
			else { //存在連結清單
				HashEntry<K,V> lastRun = e;
				int lastIdx = idx;
				//循環找到一個 lastRun 節點,這個節點之後的所有元素是将要放到一起的
				for (HashEntry<K,V> last = next;
					 last != null;
					 last = last.next) {
					int k = last.hash & sizeMask;
					if (k != lastIdx) {
						lastIdx = k;
						lastRun = last;
					}
				}
				newTable[lastIdx] = lastRun;//複制連結清單
				//處理lastRun之前的節點,這些節點可能配置設定在另一個連結清單中,也可能配置設定到上面的那個連結清單中
				for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
					V v = p.value;
					int h = p.hash;
					int k = h & sizeMask;
					HashEntry<K,V> n = newTable[k];
					newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
				}
			}
		}
	}
	 // 将新加的 node 放到新數組中剛剛的兩個連結清單之一的頭部
	int nodeIndex = node.hash & sizeMask;
	node.setNext(newTable[nodeIndex]);
	newTable[nodeIndex] = node;
	table = newTable;
}
           
  • 擴容後的數組長度是原數組的2倍。
  • 擴容的是Segmen中的HashEntry[],不是所有的Segment中的HashEntry[]。
  • 擴容不需要考慮并發,因為到這裡的時候,是持有該 segment 的獨占鎖的。
  • 大for循環中存在兩個下for循環,仔細檢視發現第一個for循環如果不要也是可以的,但是,這個 for 循環下來,如果 lastRun 的後面還有比較多的節點,那麼這次就是值得的。因為我們隻需要克隆 lastRun 前面的節點,後面的一串節點跟着 lastRun 走就是了,不需要做任何操作。

至此,ConcurrentHashMap的put就分析完成,其保證擷取segment的獨占鎖主要在scanAndLockForPut()方法中,而Segment繼承ReentrantLock,隻需要調用tryLock()/lock()就可以擷取獨占鎖。

3.get()方法

public V get(Object key) {
	Segment<K,V> s;
	HashEntry<K,V>[] tab;
	int h = hash(key);//計算key的hash值
	long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
	//通過key的Hash值找到Segment及其中的HashEntry[]
	if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
		(tab = s.table) != null) {
		//通過key找到連結清單并循環
		for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
				 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
			 e != null; e = e.next) {
			K k;
			//如果key相等就傳回value
			if ((k = e.key) == key || (e.hash == h && key.equals(k)))
				return e.value;
		}
	}
	return null;
}
           
  • 計算 hash 值,找到 segment 數組中的具體位置,并且找到HashEntry[]
  • 在HashEntry[]中,根據 hash 找到數組中具體的位置
  • 循環連結清單找到對應key傳回value

4.remove()方法

public V remove(Object key) {
	int hash = hash(key);//計算Hash值
	Segment<K,V> s = segmentForHash(hash);//通過key擷取Segment
	return s == null ? null : s.remove(key, hash, null);
}
           

這裡比較簡單,主要用于擷取Key所在Segment,最後調用segment的remove()方法

remove()方法,Segment中

//Segmen中的remove方法,參數為:key,key的Hash,null
final V remove(Object key, int hash, Object value) {
	if (!tryLock())//先嘗試擷取鎖
		scanAndLock(key, hash);//位擷取到,通過scanAndLock()擷取
	V oldValue = null;
	try {
		HashEntry<K,V>[] tab = table;
		int index = (tab.length - 1) & hash;//計算key在HashEtry[]中的存儲位置
		HashEntry<K,V> e = entryAt(tab, index);//擷取這個位置上的連結清單第一個節點
		HashEntry<K,V> pred = null;
		while (e != null) {//循環
			K k;
			HashEntry<K,V> next = e.next;//擷取下一個節點
			//在連結清單中找到了key所在的節點
			if ((k = e.key) == key ||
				(e.hash == hash && key.equals(k))) {
				V v = e.value;
				if (value == null || value == v || value.equals(v)) {
					if (pred == null)//pred==null說明key所在節點為連結清單第一個節點
						setEntryAt(tab, index, next);//直接将下一個節點設定為頭節點
					else
						pred.setNext(next);//否則将目前節點的上一個節點的next設定為下一個節點,這樣key所在節點就移除了
					++modCount;
					--count;
					oldValue = v;
				}
				break;
			}
			pred = e;//記錄目前節點,用于下次循環還是表示上一個節點
			e = next;//修改下次循環的節點
		}
	} finally {
		unlock();//釋放鎖
	}
	return oldValue;
}
           

這裡上來又嘗試擷取一次鎖,如果沒有擷取到就調用scanAndLock()擷取,scanAndLock()會直到擷取到鎖後才會傳回,是以就能保證remove()線程安全,移除節點又分為移除節點是不是連結清單頭節點,是就直接将下一個節點設定為頭結點,否則就将目前節點上一個節點的next設定為下一個節點

scanAndLock()方法,類似于上面scanAndLockForPut()方法,Segment中

private void scanAndLock(Object key, int hash) {
	HashEntry<K,V> first = entryForHash(this, hash);//擷取頭節點
	HashEntry<K,V> e = first;
	int retries = -1;
	while (!tryLock()) {//嘗試擷取鎖,失敗循環
		HashEntry<K,V> f;
		if (retries < 0) {
			if (e == null || key.equals(e.key))
				retries = 0;
			else
				e = e.next;
		}
		else if (++retries > MAX_SCAN_RETRIES) {//循環次數>1/64(單核1多核64)
			lock();//進入AQS的阻塞隊列中,隻有擷取鎖,才會傳回
			break;
		}
		//如果發現頭節點被其他線程改變了,重新while循環
		else if ((retries & 1) == 0 &&
				 (f = entryForHash(this, hash)) != first) {
			e = first = f;
			retries = -1;
		}
	}
}
           

三:JDK1.8實作

JDK1.7&amp;1.8中ConcurrentHashMap解析

JDK1.8中HashMap實作當連結清單長度大于等于7且數組長度大于64時,會自動将連結清單轉換為紅黑樹存儲,這樣的目的是盡量提高查詢效率,而在JDK1.8中ConcurrentHashMap相對于JDK1.7的ConcurrentHashMap也做了優化,JDK1.7中ConcurrentHashMap使用分段鎖的形式,比較細粒度的控制線程安全性問題,不至于像Hashteble那樣,使用全表加鎖,限制了并發性,但看過源碼後發現,每次在get或put的時候,都會先查詢到連結清單的第一個元素,換個思路想想,如果能就在第一個記錄上加鎖,這樣不也可以解決線程安全性問題,是以JDK1.8就不存在Segmrnt[]了,而是使用CAS+synchronized的形式解決線程安全性問題,這樣就比JDK1.7更加細粒度的加鎖,并發性能更好。

1.構造方法

private static final int MAXIMUM_CAPACITY = 1 << 30;// node數組最大容量:2^30=1073741824
private static final int DEFAULT_CAPACITY = 16;// 預設初始值,必須是2的幂次方
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;//數組可能最大值,需要與toArray()相關方法關聯
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;//并發級别,遺留下來的,為相容以前的版本
private static final float LOAD_FACTOR = 0.75f;//加載因子
static final int TREEIFY_THRESHOLD = 8;//連結清單轉紅黑樹最大節點
static final int UNTREEIFY_THRESHOLD = 6;//紅黑樹轉連結清單最小節點
static final int MIN_TREEIFY_CAPACITY = 64;//連結清單轉紅黑樹數組最大長度
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;// 2^15-1,help resize的最大線程數
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;// 32-16=16,sizeCtl中記錄size大小的偏移量
static final int MOVED     = -1; // forwarding nodes的hash值
static final int TREEBIN   = -2; // 樹根節點的hash值
static final int RESERVED  = -3; // ReservationNode的hash值
static final int NCPU = Runtime.getRuntime().availableProcessors();// 可用處理器數量
transient volatile Node<K,V>[] table;//存放node的數組
/*控制辨別符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
 *當為負數時:-1代表正在初始化,-N代表有N-1個線程正在 進行擴容
 *當為0時:代表當時的table還沒有被初始化
 *當為正數時:表示初始化或者下一次進行擴容的大小*/
private transient volatile int sizeCtl;
private transient volatile int transferIndex;//用于記錄遷移資料時已經遷移到的索引位置


//空參構造
public ConcurrentHashMap() {}

//指定初始數組大小
public ConcurrentHashMap(int initialCapacity) {
	if (initialCapacity < 0)
		throw new IllegalArgumentException();
	int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
			   MAXIMUM_CAPACITY :
			   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
	this.sizeCtl = cap;
}

//連結清單儲存key,value及key的hash值的資料結構。
static class Node<K,V> implements Map.Entry<K,V> {
	final int hash;
	final K key;
	volatile V val;
	volatile Node<K,V> next;

	Node(int hash, K key, V val, Node<K,V> next) {
		this.hash = hash;
		this.key = key;
		this.val = val;
		this.next = next;
	}
	//......
}

//紅黑樹節點
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  //父節點
    TreeNode<K,V> left;//左子樹
    TreeNode<K,V> right;//右子樹
    TreeNode<K,V> prev;  
    boolean red; //标志紅黑樹的紅節點
    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
	//.......
}

//一個特殊的Node節點,hash值為-1,其中存儲nextTable的引用。
final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
	//.......
}

//存儲樹形結構的容器,它提供轉換黑紅樹的一些條件和鎖的控制
static final class TreeBin<K,V> extends Node<K,V> {
    //指向TreeNode清單和根節點
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // 讀寫鎖狀态
    static final int WRITER = 1; // 擷取寫鎖的狀态
    static final int WAITER = 2; // 等待寫鎖的狀态
	static final int READER = 4; // 增加資料時讀鎖的狀态
	//.......
}
           

2.put()方法

public V put(K key, V value) {
        return putVal(key, value, false);
    }
final V putVal(K key, V value, boolean onlyIfAbsent) {
	if (key == null || value == null) throw new NullPointerException();
	int hash = spread(key.hashCode());//計算key的Hash值
	int binCount = 0;//記錄連結清單的長度,用與後邊判斷是否需要轉為紅黑樹
	for (Node<K,V>[] tab = table;;) {//循環數組
		Node<K,V> f; int n, i, fh;
		if (tab == null || (n = tab.length) == 0)
			tab = initTable();//初始化數組
		//擷取數組上key對應位置的頭節點,如果為null表示該位置不存在連結清單
		else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
			//直接建立一個節點,通過CAS插入到對應位置
			if (casTabAt(tab, i, null,
						 new Node<K,V>(hash, key, value, null)))
				break;                   // no lock when adding to empty bin
		}
		//頭節點的hash==-1,表示目前有線程在擴容數組,節點為ForwardingNode類型的,
        //hash值為-1,數組在擴容時,會将正在遷移資料原數組的連結清單或紅黑樹的頭結點設定為        
        //ForwardingNode類型,表示目前連結清單或紅黑樹已經前已完成,也就是說目前正在進行資料遷移
        //,如果遷移完畢,會将新數組覆寫掉老數組,也就不會出現ForwardingNode類型的節點
		else if ((fh = f.hash) == MOVED)
			//目前線程幫着移動資料
			tab = helpTransfer(tab, f);
		else {
			V oldVal = null;
			synchronized (f) {//擷取頭節點的對象鎖
				if (tabAt(tab, i) == f) {
					if (fh >= 0) {
						binCount = 1;
						for (Node<K,V> e = f;; ++binCount) {//循環連結清單
							K ek;
							if (e.hash == hash &&
								((ek = e.key) == key ||
								 (ek != null && key.equals(ek)))) {//目前循環到的節點與要插入的key相等
								//替換其中的value
								oldVal = e.val;
								if (!onlyIfAbsent)
									e.val = value;
								break;
							}
							Node<K,V> pred = e;
							if ((e = e.next) == null) {//如果目前節點的下一個節點為null,連結清單中不存在目前要插入的key
								//建立一個節點,作為目前節點的下一個節點
								pred.next = new Node<K,V>(hash, key,
														  value, null);
								break;
							}
						}
					}
					else if (f instanceof TreeBin) {//如果頭結點是紅黑樹節點
						Node<K,V> p;
						binCount = 2;
						//将鍵值加入到紅黑樹中,傳回p如果過!=null,說明key在紅黑樹中重複了
						if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
													   value)) != null) {
							//替換value
							oldVal = p.val;
							if (!onlyIfAbsent)
								p.val = value;
						}
					}
				}
			}
			if (binCount != 0) {
				if (binCount >= TREEIFY_THRESHOLD)//如果連結清單長度>=8
					treeifyBin(tab, i);//樹化連結清單
				if (oldVal != null)
					return oldVal;
				break;
			}
		}
	}
	addCount(1L, binCount);
	return null;
}
           

第一次插入資料時,如果數組不存在就低用initTable()初始化數組

initTable()方法,初始化數組

//初始化數組
private final Node<K,V>[] initTable() {
	Node<K,V>[] tab; int sc;
	while ((tab = table) == null || tab.length == 0) {
		//當構造方法指定數組大小時,sizeCtl為數組大小,不指定預設為0
		if ((sc = sizeCtl) < 0)
			Thread.yield(); //目前線程讓出CPU執行時間片,讓其它線程(優先級高于目前線程)或線程取競争CPU時間片
		//這裡通過CAS操作,傳入目前ConcurrentHashMap對象,期望值,實際值,需要改變為的值
		//如果期望值與實際值相等,則将sizeCtl改為-1,否則CAS操作失敗傳回false
		//CAS操作成功後,上一個if判斷就成立了
		else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
			try {
				if ((tab = table) == null || tab.length == 0) {
					//sc如果大于0,初始化的數組大小即為sizeCtl,否則為16
					int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
					//初始化數組
					@SuppressWarnings("unchecked")
					Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
					table = tab = nt;
					sc = n - (n >>> 2);
				}
			} finally {
				sizeCtl = sc;
			}
			break;
		}
	}
	return tab;
}
           

通過CAS保證隻有一個線程初始化數組成功。

casTabAt()方法,CAS操作主要用來設定連結清單表頭

//設定節點到連結清單中,參數:數組,需要插入的索引,null,建立節點
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
	//通過CAS操作,将建立的節點插入到連結清單表頭
	return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
           

helpTransfer()方法,其他線程正在擴容,目前線程幫忙遷移資料

//擴容時,幫忙移動資料
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
	Node<K,V>[] nextTab; int sc;
	if (tab != null && (f instanceof ForwardingNode) &&
		(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
		int rs = resizeStamp(tab.length);
		while (nextTab == nextTable && table == tab &&
			   (sc = sizeCtl) < 0) {
			if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
				sc == rs + MAX_RESIZERS || transferIndex <= 0)
				break;
			if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
				//這裡也在遷移資料,說明當多線程擴容數組時,這個線程是在幫忙遷移資料,這樣遷移效率會更快
				transfer(tab, nextTab);
				break;
			}
		}
		return nextTab;
	}
	return table;
}
           

JDK1.8的擴容是比較麻煩的,當一個線程發起擴容後,其他線程也發現數組需要擴容時,這個線程就會去幫忙遷移資料,而不是擴容,這樣效率會更高。

treeifyBin()方法,将連結清單轉換為紅黑樹

//樹化連結清單,将連結清單轉為紅黑樹
private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
	if (tab != null) {
		//如果數組長度<64,就隻擴容
		if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
			tryPresize(n << 1);//擴容,傳入的值為數組長度的2倍
		else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
			synchronized (b) {//依舊是擷取連結清單頭節點的鎖
				if (tabAt(tab, index) == b) {
					TreeNode<K,V> hd = null, tl = null;
					for (Node<K,V> e = b; e != null; e = e.next) {//循環連結清單
						//建構新的樹節點
						TreeNode<K,V> p =
							new TreeNode<K,V>(e.hash, e.key, e.val,
											  null, null);
						if ((p.prev = tl) == null)//第一次循環是,第一個節點為頭結點
							hd = p;
						else
							tl.next = p;//上個節點的next=目前節點
						tl = p;
					}
					//将紅黑樹加入到數組的對應索引位置
					setTabAt(tab, index, new TreeBin<K,V>(hd));
				}
			}
		}
	}
}
           

tryPresize()方法,擴容

//擴容,size為數組長度的2倍
private final void tryPresize(int size) {
	//size>=2^30,c=2^30,否則c為大于size+size/2+1且最接近的2^n數
	int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
		tableSizeFor(size + (size >>> 1) + 1);
	int sc;
	while ((sc = sizeCtl) >= 0) {
		Node<K,V>[] tab = table; int n;
		if (tab == null || (n = tab.length) == 0) {//如果數組為null,這初始化,其中的邏輯與initTable()中類似
			n = (sc > c) ? sc : c;
			if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
				try {
					if (table == tab) {
						@SuppressWarnings("unchecked")
						Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
						table = nt;
						sc = n - (n >>> 2);
					}
				} finally {
					sizeCtl = sc;
				}
			}
		}
		else if (c <= sc || n >= MAXIMUM_CAPACITY)//如果新數組的長度<0或>=2^30跳出循環
			break;
		else if (tab == table) {
			int rs = resizeStamp(n);
			if (sc < 0) {
				Node<K,V>[] nt;
				if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
					sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
					transferIndex <= 0)//如果transferIndex<=0說明前已完畢,目前while循環跳出
					break;
				//用CAS将sizeCtl加1
				if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
					transfer(tab, nt);//遷移資料,nt不等于null,說明此線程不是第一個發起遷移資料的線程
			}
			//将sizeCtl設定為 (rs << RESIZE_STAMP_SHIFT) + 2)
			else if (U.compareAndSwapInt(this, SIZECTL, sc,
										 (rs << RESIZE_STAMP_SHIFT) + 2))
				//第一次發起遷移資料的線程是傳入null,當第二個線程再來遷移資料時,就會執行上面的遷移方法,不會傳入null
				transfer(tab, null);
		}
	}
}
           

要說JDK1.8的ConcurrentHashMap原理比較難,就是在數組的擴容及資料的遷移上比較難懂,下面簡單描述一下資料遷移的流程。

遷移資料機制:原數組長度為 n,是以我們有 n 個遷移任務,讓每個線程每次負責一個小任務是最簡單的,每做完一個任務再檢測是否有其他沒做完的任務,幫助遷移就可以了,而 Doug Lea 使用了一個 stride,簡單了解就是步長,每個線程每次負責遷移其中的一部分,如每次遷移 16 個小任務。是以,我們就需要一個全局的排程者來安排哪個線程執行哪幾個任務,這個就是屬性 transferIndex 的作用。第一個發起資料遷移的線程會将 transferIndex 指向原數組最後的位置,然後從後往前的 stride 個任務屬于第一個線程,然後将 transferIndex 指向新的位置,再往前的 stride 個任務屬于第二個線程,依此類推。當然,這裡說的第二個線程不是真的一定指代了第二個線程,也可以是同一個線程。其實就是将一個大的遷移任務分為了一個個任務包。

transfer()方法,資料的遷移

//移動資料,從數組的後面向前遷移
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
	int n = tab.length, stride;
	//stride 在單核下直接等于n,多核模式下為 (n>>>3)/NCPU,最小值是 16
    //stride 可以了解為"步長",有n個位置是需要進行遷移的,
    //将這n個任務分為多個任務包,每個任務包有stride個任務
	if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
		stride = MIN_TRANSFER_STRIDE;
	
	//nextTab為擴容後的新數組
	//如果 nextTab 為 null,先進行一次初始化
    //第一個發起遷移的線程調用此方法時,參數 nextTab 為 null
    //之後參與遷移的線程調用此方法時,nextTab 不會為 null
	if (nextTab == null) {
		try {
			@SuppressWarnings("unchecked")
			//建立一個新的Node[],長度為現有數組長度的2倍,這裡就在擴容
			Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
			nextTab = nt;
		} catch (Throwable ex) {      // try to cope with OOME
			sizeCtl = Integer.MAX_VALUE;
			return;
		}
		nextTable = nextTab;//指派給ConcurrentHashMap的成員變量nextTable
		//transferIndex是ConcurrentHashMap成員變量,用于控制遷移的位置
		transferIndex = n;
	}
	int nextn = nextTab.length;
	
	//ForwardingNode 翻譯過來就是正在被遷移的 Node
    //這個構造方法會生成一個Node,key、value 和 next 都為 null,關鍵是 hash 為 MOVED(-1)
    //後面我們會看到,原數組中位置 i 處的節點完成遷移工作後,
    //就會将位置 i 處設定為這個 ForwardingNode,用來告訴其他線程該位置已經處理過了
    //是以它其實相當于是一個标志。
	ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
	
	//advance指的是做完了一個位置的遷移工作,可以準備做下一個位置的了
	boolean advance = true;
	boolean finishing = false;
	
	//i為索引,bound為邊界
	for (int i = 0, bound = 0;;) {
		Node<K,V> f; int fh;
		
		//advance=true表示可以進行下一個位置的遷移
		while (advance) {
			int nextIndex, nextBound;
			if (--i >= bound || finishing)
				advance = false;
            //這裡transferIndex一旦小于等于0,說明原數組的所有位置都有相應的線程去處理了
			else if ((nextIndex = transferIndex) <= 0) {
				i = -1;
				advance = false;
			}
			
			//通過CAS設定transferIndex的值
			//nextBound = (nextIndex > stride ? nextIndex - stride : 0),計算下一次遷移的邊界
			//未遷移的數組長度是否大于步長,大于則下一次遷移的邊界等其內插補點,否則這次就能遷移完畢,下一次遷移的邊界為0
			//transferIndex=0是表示遷移完畢,tryPresize()中的while就會跳出循環
			else if (U.compareAndSwapInt
					 (this, TRANSFERINDEX, nextIndex,
					  nextBound = (nextIndex > stride ?
								   nextIndex - stride : 0))) {
				//設定邊界
				bound = nextBound;
				//設定下次遷移的起始索引
				i = nextIndex - 1;
				//設定fals,表示這次遷移完畢
				advance = false;
			}
		}
		
		if (i < 0 || i >= n || i + n >= nextn) {
			int sc;
			if (finishing) {
				//所有的遷移工作已經完後
				nextTable = null;
				//設定新數組
				table = nextTab;
				//重新計算sizeCtl:n是原數組長度,是以sizeCtl得出的值将是新數組長度的0.75倍
				sizeCtl = (n << 1) - (n >>> 1);
				return;
			}
			//通過CAS将sizeCtl-1,表示做完自己的任務了
			if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
				if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
					return;//任務結束退出
				//如果(sc - 2)== resizeStamp(n) << RESIZE_STAMP_SHIFT
				//表述所有遷移任務都已經完成,finishing=true,進入上面的if
				finishing = advance = true;
				i = n; //索引等于原數組長度
			}
		}
		//如果位置i處是空的,沒有任何節點,那麼放入剛剛初始化的 ForwardingNode空節點
		else if ((f = tabAt(tab, i)) == null)
			advance = casTabAt(tab, i, null, fwd);
		//該位置處是一個 ForwardingNode(ForwardingNode節點的Hash值等于MOVED(-1)),代表該位置已經遷移過了
		else if ((fh = f.hash) == MOVED)
			advance = true;
		else {
			synchronized (f) {//獲得數組i位置的頭節點的對象鎖,開始處理其遷移工作
				if (tabAt(tab, i) == f) {
					Node<K,V> ln, hn;
					if (fh >= 0) {//Hash值大于0表示是連結清單
						int runBit = fh & n;//先計算這個節點在數組中存儲的位置
						Node<K,V> lastRun = f;
						//循環找到連結清單中lastRun節點
						for (Node<K,V> p = f.next; p != null; p = p.next) {
							int b = p.hash & n;
							if (b != runBit) {
								runBit = b;
								lastRun = p;
							}
						}
						if (runBit == 0) {
							ln = lastRun;
							hn = null;
						}
						else {
							hn = lastRun;
							ln = null;
						}
						for (Node<K,V> p = f; p != lastRun; p = p.next) {
							int ph = p.hash; K pk = p.key; V pv = p.val;
							//通過以下if判斷将連結清單分為兩個連結清單
							if ((ph & n) == 0)
								ln = new Node<K,V>(ph, pk, pv, ln);
							else
								hn = new Node<K,V>(ph, pk, pv, hn);
						}
						//将兩個連結清單分别放在新連結清單中,對應位置為  原位置  和  原位置索引+原數組長度位置
						setTabAt(nextTab, i, ln);
						setTabAt(nextTab, i + n, hn);
						//将原數組的該位置設定為ForwardingNode,表示已經遷移完畢
						setTabAt(tab, i, fwd);
						advance = true;
					}
					else if (f instanceof TreeBin) {//如果目前節點是紅黑樹
						TreeBin<K,V> t = (TreeBin<K,V>)f;
						TreeNode<K,V> lo = null, loTail = null;
						TreeNode<K,V> hi = null, hiTail = null;
						int lc = 0, hc = 0;
						for (Node<K,V> e = t.first; e != null; e = e.next) {//循環所有節點
							int h = e.hash;
							TreeNode<K,V> p = new TreeNode<K,V>
								(h, e.key, e.val, null, null);
							//以下的if-else還是将紅黑樹一分為二
							if ((h & n) == 0) {
								if ((p.prev = loTail) == null)
									lo = p;
								else
									loTail.next = p;
								loTail = p;
								++lc;
							}
							else {
								if ((p.prev = hiTail) == null)
									hi = p;
								else
									hiTail.next = p;
								hiTail = p;
								++hc;
							}
						}
						// 如果一分為二後,節點數少于 8,那麼将紅黑樹轉換回連結清單
						ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
							(hc != 0) ? new TreeBin<K,V>(lo) : t;
						hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
							(lc != 0) ? new TreeBin<K,V>(hi) : t;
						//将樹/連結清單發到新數組中,對應位置為  原位置  和  原位置索引+原數組長度位置
						setTabAt(nextTab, i, ln);
						setTabAt(nextTab, i + n, hn);
						//将原數組的該位置設定為ForwardingNode,表示已經遷移完畢
						setTabAt(tab, i, fwd);
						advance = true;
					}
				}
			}
		}
	}
}
           

transfer()方法遷移資料,依次隻能遷移stride個長度的資料,是以transfer()一次不能遷移完所有資料,需要由tryPresize()方法的控制,tryPresize()方法中當transferIndex <= 0時,表示遷移完畢,其中while就會跳出循環,整個資料的遷移工作也就完成了。

3.get()方法

public V get(Object key) {
	Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
	int h = spread(key.hashCode());//計算key的Hash值	
	if ((tab = table) != null && (n = tab.length) > 0 &&
		(e = tabAt(tab, (n - 1) & h)) != null) {
		//如果第一節點的key等于目前要查詢的key,就返value
		if ((eh = e.hash) == h) {
			if ((ek = e.key) == key || (ek != null && key.equals(ek)))
				return e.val;
		}
		//頭結點的hash值<0,表示正在擴容或者其節點紅黑樹
		//參考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k)
		//ForwardingNode中儲存了nextTable為擴容的新數組,其中的find方法會在新數組中查詢對應的節點
		else if (eh < 0)
			return (p = e.find(h, key)) != null ? p.val : null;
		while ((e = e.next) != null) {//循環連結清單直到找到key對應節點傳回value
			if (e.hash == h &&
				((ek = e.key) == key || (ek != null && key.equals(ek))))
				return e.val;
		}
	}
	return null;
}
           

ForwardingNode.find()方法

Node<K,V> find(int h, Object k) {
	//nextTable為新數組
	outer: for (Node<K,V>[] tab = nextTable;;) {
		Node<K,V> e; int n;
		if (k == null || tab == null || (n = tab.length) == 0 ||
			(e = tabAt(tab, (n - 1) & h)) == null)
			return null;
		for (;;) {//死循環,保證直到擴容完畢
			int eh; K ek;
			//找到key對應的節點就傳回
			if ((eh = e.hash) == h &&
				((ek = e.key) == k || (ek != null && k.equals(ek))))
				return e;
			if (eh < 0) {
				//目前節點是ForwardingNode類型的,重新擷取一下新數組,相當于循環重新開始
				//防止在查找的過程中,遷移資料導緻的問題
				if (e instanceof ForwardingNode) {
					tab = ((ForwardingNode<K,V>)e).nextTable;
					continue outer;
				}
				else//是紅黑樹節點,通過TreeBin的find方法查找
					return e.find(h, k);
			}
			//如果查找完畢還是沒找到就傳回為null
			if ((e = e.next) == null)
				return null;
		}
	}
}
           

TreeBin.find()方法

final Node<K,V> find(int h, Object k) {
	if (k != null) {
		//first為紅黑樹中的第一個節點
		for (Node<K,V> e = first; e != null; ) {
			int s; K ek;
			if (((s = lockState) & (WAITER|WRITER)) != 0) {
				//如果節點key相等就傳回目前節點
				if (e.hash == h &&
					((ek = e.key) == k || (ek != null && k.equals(ek))))
					return e;
				e = e.next;
			}
			else if (U.compareAndSwapInt(this, LOCKSTATE, s,
										 s + READER)) {
				TreeNode<K,V> r, p;
				try {
					p = ((r = root) == null ? null :
						 r.findTreeNode(h, k, null));
				} finally {
					Thread w;
					if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
						(READER|WAITER) && (w = waiter) != null)
						LockSupport.unpark(w);
				}
				return p;
			}
		}
	}
	return null;
}
           

4.remove()方法

public V remove(Object key) {
	return replaceNode(key, null, null);
}

final V replaceNode(Object key, V value, Object cv) {
	int hash = spread(key.hashCode());//計算hash值
	for (Node<K,V>[] tab = table;;) {
		Node<K,V> f; int n, i, fh;
		if (tab == null || (n = tab.length) == 0 ||
			(f = tabAt(tab, i = (n - 1) & hash)) == null)
			break;
		else if ((fh = f.hash) == MOVED)//如果正在進行數組的遷移資料,就幫忙遷移資料
			tab = helpTransfer(tab, f);
		else {
			V oldVal = null;
			boolean validated = false;
			synchronized (f) {//加鎖
				if (tabAt(tab, i) == f) {
					if (fh >= 0) {//是連結清單
						validated = true;
						for (Node<K,V> e = f, pred = null;;) {
							K ek;
							//key的hash值與目前節點相等
							if (e.hash == hash &&
								((ek = e.key) == key ||
								 (ek != null && key.equals(ek)))) {
								V ev = e.val;
								//cv參數傳入就為null,會進入if
								if (cv == null || cv == ev ||
									(ev != null && cv.equals(ev))) {
									oldVal = ev;
									if (value != null)
										e.val = value;
									else if (pred != null)
										pred.next = e.next;
									else
										//将數組的i位置替換為目前節點的下一個節點,意思是移除目前節點
										setTabAt(tab, i, e.next);
								}
								break;
							}
							pred = e;
							if ((e = e.next) == null)//循環到最後了就直接退出
								break;
						}
					}
					else if (f instanceof TreeBin) {//紅黑樹
						validated = true;
						TreeBin<K,V> t = (TreeBin<K,V>)f;
						TreeNode<K,V> r, p;
						//樹的根節點不等于null,并且目前要移除的key也有對應的節點
						if ((r = t.root) != null &&
							(p = r.findTreeNode(hash, key, null)) != null) {
							V pv = p.val;
							if (cv == null || cv == pv ||
								(pv != null && cv.equals(pv))) {
								oldVal = pv;
								if (value != null)
									p.val = value;
								else if (t.removeTreeNode(p))//從紅黑樹中移除p節點
									setTabAt(tab, i, untreeify(t.first));//如果需要,将紅黑樹轉為連結清單設定到數組中
							}
						}
					}
				}
			}
			if (validated) {
				if (oldVal != null) {
					if (value == null)
						addCount(-1L, -1);
					return oldVal;
				}
				break;
			}
		}
	}
	return null;
}
           

四:總結

  • JDK1.8的實作降低鎖的粒度,JDK1.7版本鎖的粒度是基于Segment的,包含多個HashEntry,而JDK1.8鎖的粒度就是HashEntry(首節點)。
  • JDK1.8版本的資料結構變得更加簡單,使得操作也更加清晰流暢,因為已經使用synchronized來進行同步,是以不需要分段鎖的概念,也就不需要Segment這種資料結構了,由于粒度的降低,實作的複雜度也增加了。
  • JDK1.8使用紅黑樹來優化連結清單,基于長度很長的連結清單的周遊是一個很漫長的過程,而紅黑樹的周遊效率是很快的,代替一定門檻值的連結清單。

JDK1.8為什麼使用内置鎖synchronized來代替重入鎖ReentrantLock?

  • 因為粒度降低了,在相對而言的低粒度加鎖方式,synchronized并不比ReentrantLock差,在粗粒度加鎖中ReentrantLock可能通過Condition來控制各個低粒度的邊界,更加的靈活,而在低粒度中,Condition的優勢就沒有了
  • JVM的開發團隊從來都沒有放棄synchronized,而且基于JVM的synchronized優化空間更大,使用内嵌的關鍵字比使用API更加自然
  • 在大量的資料操作下,對于JVM的記憶體壓力,基于API的ReentrantLock會開銷更多的記憶體,雖然不是瓶頸,但是也是一個選擇依據