從本片文章開始我們來介紹并發容器類,之前的文章介紹過普通容器類,比如ArrayList、LinkedList等。但是這些容器都有個問題,就是非線程安全,多線程并發寫,會存線上程安全問題,是以Java提供了并發容器類來解決普通容器類的線程安全問題。本篇文章我們先來看一種解決容器類線程安全的思想——CopyOnWrite(寫時複制)。
1. 同步容器
在講CopyOnWrite并發容器之前,先來介紹一下Java中提供的另一種用來解決容器線程安全問題的機制——同步容器。所謂同步容器,就是java.util.Collections類提供的基于普通容器類的包裝類,它将普通容器類的方法都使用synchronized關鍵字重新實作了一遍,進而提供線程安全的容器類。Collections類提供一下擷取同步容器的方法:
public static <T> Collection<T> synchronizedCollection(Collection<T> c)
public static <T> List<T> synchronizedList(List<T> list)
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
下面展示一下SynchronizedCollection的部分實作:
static class SynchronizedCollection<E> implements Collection<E> {
final Collection<E> c; // Backing Collection
final Object mutex; // Object on which to synchronize
SynchronizedCollection(Collection<E> c) {
if (c==null)
throw new NullPointerException();
this.c = c;
mutex = this;
}
public int size() {
synchronized (mutex) {return c.size();}
}
public boolean add(E e) {
synchronized (mutex) {return c.add(e);}
}
public boolean remove(Object o) {
synchronized (mutex) {return c.remove(o);}
}
//....
}
這裡線程安全針對的是容器對象,指的是當多個線程并發通路同一個容器對象時,不需要額外的同步操作,也不會出現錯誤的結果。
加了synchronized,所有方法調用變成了原子操作,用戶端在調用時,是不是就絕對安全了呢?不是的,至少有以下情況需要注意:
- 複合操作,比如先檢查再更新
- 僞同步
- 疊代
1.1 複合操作
public class EnhancedMap <K, V> {
Map<K, V> map;
public EnhancedMap(Map<K,V> map){
this.map = Collections.synchronizedMap(map);
}
public V putIfAbsent(K key, V value){
V old = map.get(key);
if(old!=null){
return old;
}
map.put(key, value);
return null;
}
public void put(K key, V value){
map.put(key, value);
}
//... 其他代碼
}
EnhancedMap是一個裝飾類,接受一個Map對象,調用synchronizedMap轉換為了同步容器對象map,增加了一個方法putIfAbsent,該方法隻有在原Map中沒有對應鍵的時候才添加。
map的每個方法都是安全的,但這個複合方法putIfAbsent是安全的嗎?顯然是否定的,這是一個檢查然後再更新的複合操作,在多線程的情況下,可能有多個線程都執行完了檢查這一步,都發現Map中沒有對應的鍵,然後就會都調用put,而這就破壞了putIfAbsent方法期望保持的語義。
1.2 僞同步
那給該方法加上synchronized就能實作安全嗎?如下所示:
public synchronized V putIfAbsent(K key, V value){
V old = map.get(key);
if(old!=null){
return old;
}
map.put(key, value);
return null;
}
答案是否定的!為什麼呢?同步錯對象了。putIfAbsent同步使用的是EnhancedMap對象,而其他方法(如代碼中的put方法)使用的是Collections.synchronizedMap傳回的對象map,兩者是不同的對象。要解決這個問題,所有方法必須使用相同的鎖,可以使用EnhancedMap的對象鎖,也可以使用map。使用EnhancedMap對象作為鎖,則EnhancedMap中的所有方法都需要加上synchronized。使用map作為鎖,putIfAbsent方法可以改為:
public V putIfAbsent(K key, V value){
synchronized(map){
V old = map.get(key);
if(old!=null){
return old;
}
map.put(key, value);
return null;
}
}
1.3 疊代
對于同步容器對象,雖然單個操作是安全的,但疊代并不是。我們看個例子,建立一個同步List對象,一個線程修改List,另一個周遊,看看會發生什麼,如下:
private static void startModifyThread(final List<String> list) {
Thread modifyThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
list.add("item " + i);
try {
Thread.sleep((int) (Math.random() * 10));
} catch (InterruptedException e) {
}
}
}
});
modifyThread.start();
}
private static void startIteratorThread(final List<String> list) {
Thread iteratorThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
for (String str : list) {
}
}
}
});
iteratorThread.start();
}
public static void main(String[] args) {
final List<String> list = Collections
.synchronizedList(new ArrayList<String>());
startIteratorThread(list);
startModifyThread(list);
}
運作該程式,程式抛出ConcurrentModificationException。
之前介紹容器類的時候,介紹過過這個異常,如果在周遊的同時容器發生了結構性變化,就會抛出該異常,同步容器并沒有解決這個問題。如果要避免這個異常,需要在周遊的時候給整個容器對象加鎖,比如,上面的代碼,startIteratorThread可以改為:
private static void startIteratorThread(final List<String> list) {
Thread iteratorThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
synchronized(list){
for (String str : list) {
}
}
}
}
});
iteratorThread.start();
}
除了以上這些注意事項,同步容器的性能也是比較低的,當并發通路量比較大的時候性能很差。是以Java中提供了很多并發容器類,比如CopyOnWriteArrayList、ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListSet等。本文就先來介紹一下CopyOnWrite(寫時複制)并發容器。
2. CopyOnWriteArrayList
CopyOnWriteArrayList實作了List接口,它的用法與其他List如ArrayList基本是一樣的,差別如下:
- CopyOnWriteArrayList是線程安全的,可以被多個線程并發通路
- CopyOnWriteArrayList的疊代器不支援修改操作,但也不會抛出ConcurrentModificationException,而是抛出UnsupportedOperationException
- CopyOnWriteArrayList以原子方式支援一些複合操作
比如使用CopyOnWriteArrayList就可以解決上面同步容器一邊寫一邊周遊時抛ConcurrentModificationException異常的問題,如下:
public static void main(String[] args) {
final List<String> list = new CopyOnWriteArrayList<>();
startIteratorThread(list);
startModifyThread(list);
}
因為CopyOnWriteArrayList實作了List接口,是以可以像使用ArrayList/LinkedList那樣使用CopyOnWriteArrayList,除了List接口提供的方法之外,CopyOnWriteArrayList還提供了一些其它的方法,我們重點關注一下這部分特殊的方法,List接口的方法在這裡就不單獨介紹了。
2.1 addIfAbsent
上面講到同步容器不能以線程安全的方式處理複合操作,CopyOnWriteArrayList提供了線程安全的複合操作方法addIfAbsent。
public boolean addIfAbsent(E e)
如果CopyOnWriteArrayList中存在元素e,傳回false。如果CopyOnWriteArrayList中不存在元素e,添加成功後傳回true。
2.2 addAllAbsent
public int addAllAbsent(Collection<? extends E> c)
批量添加c中的非重複元素,不存在才添加,傳回實際添加的個數。
2.3 疊代器不支援修改操作
CopyOnWriteArrayList的疊代器支援不可變數組實作,是以不支援修改操作。
public void remove() {
throw new UnsupportedOperationException();
}
public void set(E e) {
throw new UnsupportedOperationException();
}
public void add(E e) {
throw new UnsupportedOperationException();
}
上述remove、set、add方法是CopyOnWriteArrayList内部類COWIterator對ListIterator接口的實作,是以CopyOnWriteArrayList疊代器不支援修改操作。是以CopyOnWriteArrayList在一些基于疊代器修改操作的方法中會抛異常,比如Java7及以前的Collections.sort方法:
public static void sort(){
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("c");
list.add("a");
list.add("b");
Collections.sort(list);
}
上述代碼在Java7及以前的版本會抛UnsupportedOperationException,因為Java7中Collections.sort方法的實作為:
public static <T extends Comparable<? super T>> void sort(List<T> list) {
Object[] a = list.toArray();
Arrays.sort(a);
ListIterator<T> i = list.listIterator();
for (int j=0; j<a.length; j++) {
i.next();
i.set((T)a[j]);
}
}
調用了疊代器的set方法,是以在Java7中CopyOnWriteArrayList對象使用Collections.srot方法排序會抛UnsupportedOperationException。但是在Java8及之後的版本就不會報錯了,在Java8中Collections.srot方法實作如下:
public static <T extends Comparable<? super T>> void sort(List<T> list) {
list.sort(null);
}
在Java8中,List接口中新增了default sort方法,實作跟上述Java7中Collections.sort方法實作一緻,但是在List接口的具體實作類如CopyOnWriteArrayList中,重寫了該方法,重寫的排序方法實作不依賴于疊代器:
public void sort(Comparator<? super E> c) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
Object[] newElements = Arrays.copyOf(elements, elements.length);
@SuppressWarnings("unchecked") E[] es = (E[])newElements;
Arrays.sort(es, c);
setArray(newElements);
} finally {
lock.unlock();
}
}
2.4 實作原理
CopyOnWriteArrayList的内部是一個數組,跟ArrayList内部的數組不同的是,這個數組是以原子方式被整體更新的。每次修改操作,都會建立一個數組,複制原數組的内容到新數組,在新數組上進行需要的修改,然後以原子方式設定内部的數組引用,這就是寫時拷貝。
所有的讀操作,都是先拿到目前引用的數組,然後直接通路該數組,在讀的過程中,可能内部的數組引用已經被修改了,但不會影響讀操作,它依舊通路原數組内容。
換句話說,數組内容是隻讀的,寫操作都是通過建立數組,然後原子性的修改數組引用來實作的。CopyOnWriteArrayList聲明如下:
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
/*保護寫操作線程安全的鎖*/
final transient ReentrantLock lock = new ReentrantLock();
/*實作并發容器存儲元素的數組*/
private transient volatile Object[] array;
/**
* Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.
*/
final Object[] getArray() {
return array;
}
/**
* Sets the array.
*/
final void setArray(Object[] a) {
array = a;
}
//構造函數
//methods
}
這裡有幾點需要注意:
- 内部數組聲明為了volatile,這是必需的,保證記憶體可見性,寫操作更改了之後,讀操作能看到。
- 隻有兩個方法用來通路/設定該數組,分别位getArray和setArray,像add、set等操作,都是先複制一個數組出來,然後操作新複制的數組,再通過setArray方法将複制的數組指派給array。
- 讀不需要鎖,可以并行,讀和寫也可以并行,但多個線程不能同時寫,每個寫操作都需要先擷取鎖,CopyOnWriteArrayList内部使用ReentrantLock。
2.4.1 add(E e)
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
add方法是修改操作,整個過程需要被鎖保護,先拿到目前數組elements,然後複制了個長度加1的新數組newElements,在新數組中添加元素,最後調用setArray原子性的修改内部數組引用,這就是“寫時複制”的概念。
2.4.2 indexOf(Object o)
public int indexOf(Object o) {
Object[] elements = getArray();
return indexOf(o, elements, 0, elements.length);
}
private static int indexOf(Object o, Object[] elements,
int index, int fence) {
if (o == null) {
for (int i = index; i < fence; i++)
if (elements[i] == null)
return i;
} else {
for (int i = index; i < fence; i++)
if (o.equals(elements[i]))
return i;
}
return -1;
}
indexOf方法通路的所有資料都是通過參數傳遞進來的,數組内容也不會被修改,不存在并發問題。
2.4.3 疊代器
CopyOnWriteArrayList疊代器是通過内部類COWIterator實作的,如下:
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
COWIterator是内部類,傳遞給它的是不變的數組,不支援修改,是以在COWIterator内部的remove、set及add方法實作都是直接抛UnsupportedOperationException異常。其它讀方法跟ArrayList内部疊代器實作一緻,都是通過數組索引實作的。
每次修改都建立一個新數組,然後複制所有内容,這聽上去是一個難以令人接受的方案,如果數組比較大,修改操作又比較頻繁,可以想象,CopyOnWriteArrayList的性能是很低的。事實确實如此,CopyOnWriteArrayList不适用于數組很大,且修改頻繁的場景。它是以優化讀操作為目标的,讀不需要同步,性能很高,但在優化讀的同時就犧牲了寫的性能。
之前我們介紹了保證線程安全的兩種思路,一種是鎖,使用synchronized或ReentrantLock,另外一種是循環CAS,寫時拷貝展現了保證線程安全的另一種思路。對于絕大部分通路都是讀,且有大量并發線程要求讀,隻有個别線程進行寫,且隻是偶爾寫的場合,這種寫時拷貝就是一種很好的解決方案。
3. CopyOnWriteArraySet
CopyOnWriteArraySet跟HashSet的實作思想是不同的,HashSet依賴于HashMap實作,而CopyOnWriteArraySet并沒有使用這一思想,而是依賴CopyOnWriteArrayList實作,如下:
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
implements java.io.Serializable {
private final CopyOnWriteArrayList<E> al;
//構造函數
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
public CopyOnWriteArraySet(Collection<? extends E> c) {
if (c.getClass() == CopyOnWriteArraySet.class) {
@SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc =
(CopyOnWriteArraySet<E>)c;
al = new CopyOnWriteArrayList<E>(cc.al);
}
else {
al = new CopyOnWriteArrayList<E>();
al.addAllAbsent(c);
}
}
//methods
}
CopyOnWriteArraySet實作了Set接口,持有一個CopyOnWriteArrayList的成員變量,依賴CopyOnWriteArrayList提供的addIfAbsent和addAllAbsent實作Set中元素的唯一性。
3.1 方法說明
3.1.1 add(E e)
public boolean add(E e) {
return al.addIfAbsent(e);
}
調用了CopyOnWriteArrayList的addIfAbsent方法,保證線程安全和Set集合中元素的唯一性。
3.1.2 contains(Object o)
public boolean contains(Object o) {
return al.contains(o);
}
調用了CopyOnWriteArrayList的contains方法。
由于CopyOnWriteArraySet是基于CopyOnWriteArrayList實作的,是以與之前介紹過的Set的實作類如HashSet/TreeSet相比,它的性能比較低,不适用于元素個數特别多的集合。如果元素個數比較多,可以考慮ConcurrentHashMap或ConcurrentSkipListSet,這兩個類,會在之後的文章介紹。
ConcurrentHashMap與HashMap類似,适用于不要求排序的場景,ConcurrentSkipListSet與TreeSet類似,适用于要求排序的場景。Java并發包中沒有與HashSet對應的并發容器,但可以很容易的基于ConcurrentHashMap建構一個,利用Collections.newSetFromMap方法即可。
參考連結:
1. 《Java程式設計的邏輯》
2. Java API