文章目錄
-
- ConcurrentModificationException 異常
- 并發 List
-
- 簡介
- 詳解
-
- SynchronizedList
- CopyOnWriteArrayList
- Vector
- 并發 Set
- 并發 Map
-
- 簡介
- ConcurrentHashMap (JDK7、8 設計差别較大)
-
- JDK7源碼
- JDK7設計: 使用`Segment`分段鎖,減小鎖粒度
- JDK8設計
- JDK8源碼
- 參考
ConcurrentModificationException 異常
-
包下日常使用的大部分集合都是線程不安全的,如java.util
、ArrayList
等HashMap
- 簡單的例子:
并發修改異常ConcurrentModificationException
public class ConcurrentModificationExceptionTest {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
list.add("張三");
list.add("李四");
list.add("王五");
Iterator<String> iterator = list.iterator();
//疊代集合
while (iterator.hasNext()) {
String next = iterator.next();
if ("張三".equals(next)) {
// 在集合周遊的時候,不能由其他的線程對其内部元素做修改
// list.remove(next);
list.add("趙六");
} else {
System.out.println(next);
}
}
}
}
并發 List
簡介
-
和Vector
是兩個線程安全的 List 實作,還有集合線程安全包裝類CopyOnWriteArrayList
進行包裝後的線程安全集合SynchronizedList
- 建議:
-
SynchronizedList
擷取,實作效率比較慢,不建議使用;Collections.synchronizedList()
-
的實作原理是減少鎖競争,讀方法沒有任何鎖操作,是以效率很高(犧牲了寫)。适合讀多寫好的環境。CopyOnWriteArrayList
-
寫的效率比Vector
好,寫多讀少的情況可以考慮CopyOnWriteArrayList
詳解
SynchronizedList
- 常用方法:
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
public E set(int index, E element) {
synchronized (mutex) {return list.set(index, element);}
}
- 直接是在利用
鎖定了代碼塊,簡單粗暴,其實不建議使用synchronized
CopyOnWriteArrayList
-
實作機制CopyOnWriteArrayList
- 進行寫操作時,複制内部容器,寫入拷貝;讀操作,直接傳回結果。讀寫操作的不同的數組容器
- 利用了對象的不變性,在沒有對對象進行寫操作前,由于對象沒有發生改變,不需要加鎖
- 試圖改變對象時(寫),總是先擷取對象的一個副本,然後對副本進行修改,最後寫回。讀的是容器對象(數組對象),寫的時候操作拷貝對象。
- 減少鎖競争,提高讀的性能,犧牲了寫的性能(寫要加鎖,不然沒辦法保證副本線程安全)
- 常用方法
-
方法:并沒有任何的鎖,是以效率很高get
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}
-
方法: 每一個的add
方法中,都會有一次數組拷貝,并且申請了鎖add
public boolean add(E e) {
final ReentrantLock lock = this.lock; // 加鎖
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 做了一次數組拷貝
newElements[len] = e; // 修改副本
setArray(newElements); // 寫回副本
return true;
} finally {
lock.unlock(); // 解鎖
}
}
- 問題:
- 記憶體占用問題。因為
的寫時複制機制,是以在進行寫操作的時候,記憶體裡會同時駐紮兩份對象的記憶體。CopyOnWrite
- 資料一緻性問題。
容器隻能保證資料的最終一緻性,不能保證資料的實時一緻性。是以如果你希望寫入的的資料,馬上能讀到,請不要使用CopyOnWrite容器CopyOnWrite
Vector
-
方法:使用get
關鍵字,存在鎖競争,效率沒有synchronized
好CopyOnWriteArrayList
public synchronized E get(int index) {
if (index >= elementCount)
throw new ArrayIndexOutOfBoundsException(index);
return elementData(index);
}
-
方法:同樣的實作方式,沒有副本拷貝,效率比add
好CopyOnWriteArrayList
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
并發 Set
-
内部依賴CopyOnWriteArraySet
。特性一緻,适合讀多寫少的并發場景CopyOnWriteArrayList
-
和CopyOnWriteArrayList
這種CopyOnWriteArraySet
類型的容器,更加重要的是要了解其設計實作:CopyOnWrite
- 讀寫分離
- 最終一緻性
- 使用另外開辟空間的思路,來解決并發沖突
- 簡單源碼
public class CopyOnWriteArraySet<E> extends AbstractSet<E> implements java.io.Serializable {
private final CopyOnWriteArrayList<E> al;
public boolean add(E e) {
return al.addIfAbsent(e);
}
}
并發 Map
簡介
- 包裝類:
Collections.synchronizedList(list);
- 并發類:
,性能更好,讀是無鎖的,寫操作的鎖粒度是比較小的(經典的減小鎖粒度設計),整體性能要較好ConcurrentHashMap
ConcurrentHashMap (JDK7、8 設計差别較大)
JDK7源碼
-
方法:put
分段加鎖Segment
@Override
public V put(K key, V value) {
if (value == null) {
throw new NullPointerException();
}
int hash = hashOf(key); // hash 分段
return segmentFor(hash).put(key, hash, value, false);
}
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); // Segment 分段加鎖
try {
int c = count;
if (c ++ > threshold) { // ensure capacity
int reduced = rehash();
if (reduced > 0) {
count = (c -= reduced) - 1; // write-volatile
}
}
HashEntry<K, V>[] tab = table;
int index = hash & tab.length - 1;
HashEntry<K, V> first = tab[index];
HashEntry<K, V> e = first;
while (e != null && (e.hash != hash || !keyEq(key, e.key()))) {
e = e.next;
}
V oldValue;
if (e != null) {
oldValue = e.value();
if (!onlyIfAbsent) {
e.setValue(value);
}
} else {
oldValue = null;
++ modCount;
tab[index] = newHashEntry(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
-
方法:get
public V get(Object key) {
int hash = hashOf(key);
return segmentFor(hash).get(key, hash);
}
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K, V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && keyEq(key, e.key())) {
V opaque = e.value();
if (opaque != null) {
return opaque;
}
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
JDK7設計: 使用 Segment
分段鎖,減小鎖粒度
Segment
- 一個典型的
,如果HashMap
和get
進行同步,如果鎖對象為整個add
,那麼沒有兩個線程是可以真正的并發HashMap
- 使用
分段鎖。Segment
,采用拆分鎖對象的方式提高吞吐量。其将整個ConcurrentHashMap
拆分成若幹個段HashMap
,每段都是一個子的segment
HashMap
- 如果需要在
中增加一個新的資料,并不是将整個ConcurrentHashMap
加鎖,而是先根據hashmap
得到該資料項應該被存到哪個段裡面,然後對該段加鎖。多個線程同時進行hashcode
操作,隻要被加入的資料項不是同一個段put
中,可以真正的并行segment
- 缺點問題:系統需要全局鎖時,其消耗的資源比較大。如 ConcurrentHashMap 的 size 方法,有可能需要每段加鎖加以統計(因為在統計的時候,有可能會有寫入)。JDK7的實作思路:先采用不加鎖的方式,連續計算元素的個數,最多計算3次
- 如果前後兩次計算結果相同,則說明計算出來的元素個數是準确的
- 如果前後兩次計算結果都不同,則給每個Segment進行加鎖,再計算一次元素的個數
public int size() {
final Segment<K, V>[] segments = this.segments;
long sum = 0;
long check = 0;
int[] mc = new int[segments.length];
// Try a few times to get accurate count. On failure due to continuous
// async changes in table, resort to locking.
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++ k) { // 先不加鎖算
check = 0;
sum = 0;
int mcsum = 0;
for (int i = 0; i < segments.length; ++ i) {
sum += segments[i].count;
mcsum += mc[i] = segments[i].modCount;
}
if (mcsum != 0) {
for (int i = 0; i < segments.length; ++ i) {
check += segments[i].count;
if (mc[i] != segments[i].modCount) {
check = -1; // force retry
break;
}
}
}
if (check == sum) {
break;
}
}
if (check != sum) { // Resort to locking all segments 前後不一緻,就加鎖算
sum = 0;
for (Segment<K, V> segment: segments) {
segment.lock(); // 每個分段都需要加鎖
}
for (Segment<K, V> segment: segments) {
sum += segment.count;
}
for (Segment<K, V> segment: segments) {
segment.unlock();
}
}
if (sum > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
} else {
return (int) sum;
}
}
JDK8設計
- 取消了
分段鎖,數組+連結清單(紅黑樹)的結構,對于鎖的粒度,調整為對每個數組元素加鎖(Node)Segment
- 定位節點的
算法被簡化了,這樣帶來的弊端是Hash沖突會加劇。是以在連結清單節點數量大于8時,會将連結清單轉化為紅黑樹進行存儲。這樣一來,查詢的時間複雜度就會由原先的O(n)變為O(logN)hash
- 使用
關鍵字,這一點也說明synchronized
在JDK8中優化的程度和synchronized
差不多了ReentrantLock
JDK8源碼
-
方法:沒有加鎖,是以在多線程操作的過程中,并不能完全的保證一緻性get
- 首先定位到table[]中的i
- 若table[i]存在,則繼續查找
- 首先比較連結清單頭部,如果是則傳回
- 然後如果為紅黑樹,查找樹
- 最後再循環連結清單查找
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());// 定位到table[]中的i
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {// 若table[i]存在
if ((eh = e.hash) == h) {// 比較連結清單頭部
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)// 若為紅黑樹,查找樹
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {// 循環連結清單查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;// 未找到
}
-
方法:put
- 參數校驗
- 若table[]未建立,則初始化
- 當table[i]後面無節點時,直接建立Node(無鎖操作)
- 如果目前正在擴容,則幫助擴容并傳回最新table[]
- 然後在連結清單或者紅黑樹中追加節點
- 最後還回去判斷是否到達閥值,如到達變為紅黑樹結構
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 若table[]未建立,則初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // table[i]後面無節點時,直接建立Node(無鎖操作)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 如果目前正在擴容,則幫助擴容并傳回最新table[]
tab = helpTransfer(tab, f);
else { // 在連結清單或者紅黑樹中追加節點
V oldVal = null;
synchronized (f) { // 這裡并沒有使用ReentrantLock,說明synchronized已經足夠優化了
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 如果為連結清單結構
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 找到key,替換value
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 在尾部插入Node
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 如果為紅黑樹
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 到達閥值,變為紅黑樹結構
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
-
方法:size
- 從上面代碼可以看出來,JDK1.8中新增了一個 mappingCount()的API。這個API與size()不同的就是傳回值是Long類型,這樣就不受Integer.MAX_VALUE的大小限制了
- 兩個方法都同時調用了
方法。對于每個table[i]都有一個CounterCell與之對應,上面方法做了求和之後就傳回了。進而可以看出,size() 和 mappingCount() 傳回的都是一個估計值(沒有加鎖,強一緻)sumCount()
- 與JDK1.7裡面的實作不同,1.7裡面使用了加鎖的方式實作。這裡面也可以看出 JDK1.8 犧牲了精度,來換取更高的效率
// 1.2時加入
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
// 1.8加入的API
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
參考
- 源碼位址
- ConcurrentHashMap的JDK1.8實作
- 談談ConcurrentHashMap1.7和1.8的不同實作