天天看點

Java程式設計拾遺『并發容器——CopyOnWrite』

從本片文章開始我們來介紹并發容器類,之前的文章介紹過普通容器類,比如ArrayList、LinkedList等。但是這些容器都有個問題,就是非線程安全,多線程并發寫,會存線上程安全問題,是以Java提供了并發容器類來解決普通容器類的線程安全問題。本篇文章我們先來看一種解決容器類線程安全的思想——CopyOnWrite(寫時複制)。

1. 同步容器

在講CopyOnWrite并發容器之前,先來介紹一下Java中提供的另一種用來解決容器線程安全問題的機制——同步容器。所謂同步容器,就是java.util.Collections類提供的基于普通容器類的包裝類,它将普通容器類的方法都使用synchronized關鍵字重新實作了一遍,進而提供線程安全的容器類。Collections類提供一下擷取同步容器的方法:

public static <T> Collection<T> synchronizedCollection(Collection<T> c)
public static <T> List<T> synchronizedList(List<T> list)
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
           

下面展示一下SynchronizedCollection的部分實作:

static class SynchronizedCollection<E> implements Collection<E> {
    final Collection<E> c;  // Backing Collection
    final Object mutex;     // Object on which to synchronize

    SynchronizedCollection(Collection<E> c) {
        if (c==null)
            throw new NullPointerException();
        this.c = c;
        mutex = this;
    }
    public int size() {
        synchronized (mutex) {return c.size();}
    }
    public boolean add(E e) {
        synchronized (mutex) {return c.add(e);}
    }
    public boolean remove(Object o) {
        synchronized (mutex) {return c.remove(o);}
    }
    //....
}
           

這裡線程安全針對的是容器對象,指的是當多個線程并發通路同一個容器對象時,不需要額外的同步操作,也不會出現錯誤的結果。

加了synchronized,所有方法調用變成了原子操作,用戶端在調用時,是不是就絕對安全了呢?不是的,至少有以下情況需要注意:

  • 複合操作,比如先檢查再更新
  • 僞同步
  • 疊代

1.1 複合操作

public class EnhancedMap <K, V> {
    Map<K, V> map;
    
    public EnhancedMap(Map<K,V> map){
        this.map = Collections.synchronizedMap(map);
    }
    
    public V putIfAbsent(K key, V value){
         V old = map.get(key);
         if(old!=null){
             return old;
         }
         map.put(key, value);
         return null;
     }
    
    public void put(K key, V value){
        map.put(key, value);
    }
    
    //... 其他代碼
}
           

EnhancedMap是一個裝飾類,接受一個Map對象,調用synchronizedMap轉換為了同步容器對象map,增加了一個方法putIfAbsent,該方法隻有在原Map中沒有對應鍵的時候才添加。

map的每個方法都是安全的,但這個複合方法putIfAbsent是安全的嗎?顯然是否定的,這是一個檢查然後再更新的複合操作,在多線程的情況下,可能有多個線程都執行完了檢查這一步,都發現Map中沒有對應的鍵,然後就會都調用put,而這就破壞了putIfAbsent方法期望保持的語義。

1.2 僞同步

那給該方法加上synchronized就能實作安全嗎?如下所示:

public synchronized V putIfAbsent(K key, V value){
    V old = map.get(key);
    if(old!=null){
        return old;
    }
    map.put(key, value);
    return null;
}
           

答案是否定的!為什麼呢?同步錯對象了。putIfAbsent同步使用的是EnhancedMap對象,而其他方法(如代碼中的put方法)使用的是Collections.synchronizedMap傳回的對象map,兩者是不同的對象。要解決這個問題,所有方法必須使用相同的鎖,可以使用EnhancedMap的對象鎖,也可以使用map。使用EnhancedMap對象作為鎖,則EnhancedMap中的所有方法都需要加上synchronized。使用map作為鎖,putIfAbsent方法可以改為:

public V putIfAbsent(K key, V value){
    synchronized(map){
        V old = map.get(key);
         if(old!=null){
             return old;
         }
         map.put(key, value);
         return null;    
    }
}
           

1.3 疊代

對于同步容器對象,雖然單個操作是安全的,但疊代并不是。我們看個例子,建立一個同步List對象,一個線程修改List,另一個周遊,看看會發生什麼,如下:

private static void startModifyThread(final List<String> list) {
    Thread modifyThread = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                list.add("item " + i);
                try {
                    Thread.sleep((int) (Math.random() * 10));
                } catch (InterruptedException e) {
                }
            }
        }
    });
    modifyThread.start();
}

private static void startIteratorThread(final List<String> list) {
    Thread iteratorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                for (String str : list) {
                }
            }
        }
    });
    iteratorThread.start();
}

public static void main(String[] args) {
    final List<String> list = Collections
            .synchronizedList(new ArrayList<String>());
    startIteratorThread(list);
    startModifyThread(list);
}
           

運作該程式,程式抛出ConcurrentModificationException。

之前介紹容器類的時候,介紹過過這個異常,如果在周遊的同時容器發生了結構性變化,就會抛出該異常,同步容器并沒有解決這個問題。如果要避免這個異常,需要在周遊的時候給整個容器對象加鎖,比如,上面的代碼,startIteratorThread可以改為:

private static void startIteratorThread(final List<String> list) {
    Thread iteratorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                synchronized(list){
                    for (String str : list) {
                    }    
                }
            }
        }
    });
    iteratorThread.start();
}
           

除了以上這些注意事項,同步容器的性能也是比較低的,當并發通路量比較大的時候性能很差。是以Java中提供了很多并發容器類,比如CopyOnWriteArrayList、ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListSet等。本文就先來介紹一下CopyOnWrite(寫時複制)并發容器。

2. CopyOnWriteArrayList

CopyOnWriteArrayList實作了List接口,它的用法與其他List如ArrayList基本是一樣的,差別如下:

  • CopyOnWriteArrayList是線程安全的,可以被多個線程并發通路
  • CopyOnWriteArrayList的疊代器不支援修改操作,但也不會抛出ConcurrentModificationException,而是抛出UnsupportedOperationException
  • CopyOnWriteArrayList以原子方式支援一些複合操作

比如使用CopyOnWriteArrayList就可以解決上面同步容器一邊寫一邊周遊時抛ConcurrentModificationException異常的問題,如下:

public static void main(String[] args) {
    final List<String> list = new CopyOnWriteArrayList<>();
    startIteratorThread(list);
    startModifyThread(list);
}
           

因為CopyOnWriteArrayList實作了List接口,是以可以像使用ArrayList/LinkedList那樣使用CopyOnWriteArrayList,除了List接口提供的方法之外,CopyOnWriteArrayList還提供了一些其它的方法,我們重點關注一下這部分特殊的方法,List接口的方法在這裡就不單獨介紹了。

2.1 addIfAbsent

上面講到同步容器不能以線程安全的方式處理複合操作,CopyOnWriteArrayList提供了線程安全的複合操作方法addIfAbsent。

public boolean addIfAbsent(E e)
           

如果CopyOnWriteArrayList中存在元素e,傳回false。如果CopyOnWriteArrayList中不存在元素e,添加成功後傳回true。

2.2 addAllAbsent

public int addAllAbsent(Collection<? extends E> c)
           

批量添加c中的非重複元素,不存在才添加,傳回實際添加的個數。

2.3 疊代器不支援修改操作

CopyOnWriteArrayList的疊代器支援不可變數組實作,是以不支援修改操作。

public void remove() {
    throw new UnsupportedOperationException();
}

public void set(E e) {
    throw new UnsupportedOperationException();
}

public void add(E e) {
    throw new UnsupportedOperationException();
}
           

上述remove、set、add方法是CopyOnWriteArrayList内部類COWIterator對ListIterator接口的實作,是以CopyOnWriteArrayList疊代器不支援修改操作。是以CopyOnWriteArrayList在一些基于疊代器修改操作的方法中會抛異常,比如Java7及以前的Collections.sort方法:

public static void sort(){
    CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
    list.add("c");
    list.add("a");
    list.add("b");
    Collections.sort(list);
}
           

上述代碼在Java7及以前的版本會抛UnsupportedOperationException,因為Java7中Collections.sort方法的實作為:

public static <T extends Comparable<? super T>> void sort(List<T> list) {
    Object[] a = list.toArray();
    Arrays.sort(a);
    ListIterator<T> i = list.listIterator();
    for (int j=0; j<a.length; j++) {
        i.next();
        i.set((T)a[j]);
    }
}
           

調用了疊代器的set方法,是以在Java7中CopyOnWriteArrayList對象使用Collections.srot方法排序會抛UnsupportedOperationException。但是在Java8及之後的版本就不會報錯了,在Java8中Collections.srot方法實作如下:

public static <T extends Comparable<? super T>> void sort(List<T> list) {
    list.sort(null);
}
           

在Java8中,List接口中新增了default sort方法,實作跟上述Java7中Collections.sort方法實作一緻,但是在List接口的具體實作類如CopyOnWriteArrayList中,重寫了該方法,重寫的排序方法實作不依賴于疊代器:

public void sort(Comparator<? super E> c) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        Object[] newElements = Arrays.copyOf(elements, elements.length);
        @SuppressWarnings("unchecked") E[] es = (E[])newElements;
        Arrays.sort(es, c);
        setArray(newElements);
    } finally {
        lock.unlock();
    }
}
           

2.4 實作原理

CopyOnWriteArrayList的内部是一個數組,跟ArrayList内部的數組不同的是,這個數組是以原子方式被整體更新的。每次修改操作,都會建立一個數組,複制原數組的内容到新數組,在新數組上進行需要的修改,然後以原子方式設定内部的數組引用,這就是寫時拷貝。

所有的讀操作,都是先拿到目前引用的數組,然後直接通路該數組,在讀的過程中,可能内部的數組引用已經被修改了,但不會影響讀操作,它依舊通路原數組内容。

換句話說,數組内容是隻讀的,寫操作都是通過建立數組,然後原子性的修改數組引用來實作的。CopyOnWriteArrayList聲明如下:

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

    /*保護寫操作線程安全的鎖*/
    final transient ReentrantLock lock = new ReentrantLock();

    /*實作并發容器存儲元素的數組*/
    private transient volatile Object[] array;

    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    final Object[] getArray() {
        return array;
    }

    /**
     * Sets the array.
     */
    final void setArray(Object[] a) {
        array = a;
    }

    //構造函數

    //methods
 }
           

這裡有幾點需要注意:

  • 内部數組聲明為了volatile,這是必需的,保證記憶體可見性,寫操作更改了之後,讀操作能看到。
  • 隻有兩個方法用來通路/設定該數組,分别位getArray和setArray,像add、set等操作,都是先複制一個數組出來,然後操作新複制的數組,再通過setArray方法将複制的數組指派給array。
  • 讀不需要鎖,可以并行,讀和寫也可以并行,但多個線程不能同時寫,每個寫操作都需要先擷取鎖,CopyOnWriteArrayList内部使用ReentrantLock。

2.4.1 add(E e)

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}
           

add方法是修改操作,整個過程需要被鎖保護,先拿到目前數組elements,然後複制了個長度加1的新數組newElements,在新數組中添加元素,最後調用setArray原子性的修改内部數組引用,這就是“寫時複制”的概念。

2.4.2 indexOf(Object o)

public int indexOf(Object o) {
    Object[] elements = getArray();
    return indexOf(o, elements, 0, elements.length);
}
           
private static int indexOf(Object o, Object[] elements,
                           int index, int fence) {
    if (o == null) {
        for (int i = index; i < fence; i++)
            if (elements[i] == null)
                return i;
    } else {
        for (int i = index; i < fence; i++)
            if (o.equals(elements[i]))
                return i;
    }
    return -1;
}
           

indexOf方法通路的所有資料都是通過參數傳遞進來的,數組内容也不會被修改,不存在并發問題。

2.4.3 疊代器

CopyOnWriteArrayList疊代器是通過内部類COWIterator實作的,如下:

public Iterator<E> iterator() {
    return new COWIterator<E>(getArray(), 0);
}
           

COWIterator是内部類,傳遞給它的是不變的數組,不支援修改,是以在COWIterator内部的remove、set及add方法實作都是直接抛UnsupportedOperationException異常。其它讀方法跟ArrayList内部疊代器實作一緻,都是通過數組索引實作的。

每次修改都建立一個新數組,然後複制所有内容,這聽上去是一個難以令人接受的方案,如果數組比較大,修改操作又比較頻繁,可以想象,CopyOnWriteArrayList的性能是很低的。事實确實如此,CopyOnWriteArrayList不适用于數組很大,且修改頻繁的場景。它是以優化讀操作為目标的,讀不需要同步,性能很高,但在優化讀的同時就犧牲了寫的性能。

之前我們介紹了保證線程安全的兩種思路,一種是鎖,使用synchronized或ReentrantLock,另外一種是循環CAS,寫時拷貝展現了保證線程安全的另一種思路。對于絕大部分通路都是讀,且有大量并發線程要求讀,隻有個别線程進行寫,且隻是偶爾寫的場合,這種寫時拷貝就是一種很好的解決方案。

3. CopyOnWriteArraySet

CopyOnWriteArraySet跟HashSet的實作思想是不同的,HashSet依賴于HashMap實作,而CopyOnWriteArraySet并沒有使用這一思想,而是依賴CopyOnWriteArrayList實作,如下:

public class CopyOnWriteArraySet<E> extends AbstractSet<E>
        implements java.io.Serializable {

    private final CopyOnWriteArrayList<E> al;

    //構造函數
    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }

    public CopyOnWriteArraySet(Collection<? extends E> c) {
        if (c.getClass() == CopyOnWriteArraySet.class) {
            @SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc =
                (CopyOnWriteArraySet<E>)c;
            al = new CopyOnWriteArrayList<E>(cc.al);
        }
        else {
            al = new CopyOnWriteArrayList<E>();
            al.addAllAbsent(c);
        }
    }

    //methods

}
           

CopyOnWriteArraySet實作了Set接口,持有一個CopyOnWriteArrayList的成員變量,依賴CopyOnWriteArrayList提供的addIfAbsent和addAllAbsent實作Set中元素的唯一性。

3.1 方法說明

3.1.1 add(E e)

public boolean add(E e) {
    return al.addIfAbsent(e);
}
           

調用了CopyOnWriteArrayList的addIfAbsent方法,保證線程安全和Set集合中元素的唯一性。

3.1.2 contains(Object o)

public boolean contains(Object o) {
    return al.contains(o);
}
           

調用了CopyOnWriteArrayList的contains方法。

由于CopyOnWriteArraySet是基于CopyOnWriteArrayList實作的,是以與之前介紹過的Set的實作類如HashSet/TreeSet相比,它的性能比較低,不适用于元素個數特别多的集合。如果元素個數比較多,可以考慮ConcurrentHashMap或ConcurrentSkipListSet,這兩個類,會在之後的文章介紹。

ConcurrentHashMap與HashMap類似,适用于不要求排序的場景,ConcurrentSkipListSet與TreeSet類似,适用于要求排序的場景。Java并發包中沒有與HashSet對應的并發容器,但可以很容易的基于ConcurrentHashMap建構一個,利用Collections.newSetFromMap方法即可。

參考連結:

1. 《Java程式設計的邏輯》

2. Java API