天天看點

JDK多線程基礎(12):并發資料結構(集合)

文章目錄

    • ConcurrentModificationException 異常
    • 并發 List
      • 簡介
      • 詳解
        • SynchronizedList
        • CopyOnWriteArrayList
        • Vector
    • 并發 Set
    • 并發 Map
      • 簡介
      • ConcurrentHashMap (JDK7、8 設計差别較大)
        • JDK7源碼
        • JDK7設計: 使用`Segment`分段鎖,減小鎖粒度
        • JDK8設計
        • JDK8源碼
    • 參考

ConcurrentModificationException 異常

  1. java.util

    包下日常使用的大部分集合都是線程不安全的,如

    ArrayList

    HashMap

  2. 簡單的例子:

    ConcurrentModificationException

    并發修改異常
public class ConcurrentModificationExceptionTest {

    public static void main(String[] args) {
        ArrayList<String> list = new ArrayList<>();
        list.add("張三");
        list.add("李四");
        list.add("王五");
        
        Iterator<String> iterator = list.iterator();
        //疊代集合
        while (iterator.hasNext()) {
            String next = iterator.next();
            if ("張三".equals(next)) {
                // 在集合周遊的時候,不能由其他的線程對其内部元素做修改
//                list.remove(next);
                list.add("趙六");
            } else {
                System.out.println(next);
            }
        }
    }
}

           

并發 List

簡介

  1. Vector

    CopyOnWriteArrayList

    是兩個線程安全的 List 實作,還有集合線程安全包裝類

    SynchronizedList

    進行包裝後的線程安全集合
  2. 建議:
  • SynchronizedList

    Collections.synchronizedList()

    擷取,實作效率比較慢,不建議使用;
  • CopyOnWriteArrayList

    的實作原理是減少鎖競争,讀方法沒有任何鎖操作,是以效率很高(犧牲了寫)。适合讀多寫好的環境。
  • Vector

    寫的效率比

    CopyOnWriteArrayList

    好,寫多讀少的情況可以考慮

詳解

SynchronizedList

  1. 常用方法:
public E get(int index) {
    synchronized (mutex) {return list.get(index);}
}

public E set(int index, E element) {
    synchronized (mutex) {return list.set(index, element);}
}
           
  1. 直接是在利用

    synchronized

    鎖定了代碼塊,簡單粗暴,其實不建議使用

CopyOnWriteArrayList

  1. CopyOnWriteArrayList

    實作機制
  • 進行寫操作時,複制内部容器,寫入拷貝;讀操作,直接傳回結果。讀寫操作的不同的數組容器
  • 利用了對象的不變性,在沒有對對象進行寫操作前,由于對象沒有發生改變,不需要加鎖
  • 試圖改變對象時(寫),總是先擷取對象的一個副本,然後對副本進行修改,最後寫回。讀的是容器對象(數組對象),寫的時候操作拷貝對象。
  • 減少鎖競争,提高讀的性能,犧牲了寫的性能(寫要加鎖,不然沒辦法保證副本線程安全)
  1. 常用方法
  • get

    方法:并沒有任何的鎖,是以效率很高
public E get(int index) {
    return get(getArray(), index);
}

private E get(Object[] a, int index) {
    return (E) a[index];
}
           
  • add

    方法: 每一個的

    add

    方法中,都會有一次數組拷貝,并且申請了鎖
public boolean add(E e) {
    final ReentrantLock lock = this.lock; // 加鎖
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1); // 做了一次數組拷貝
        newElements[len] = e;   // 修改副本
        setArray(newElements);  // 寫回副本
        return true;
    } finally {
        lock.unlock();  // 解鎖
    }
}
           
  1. 問題:
  • 記憶體占用問題。因為

    CopyOnWrite

    的寫時複制機制,是以在進行寫操作的時候,記憶體裡會同時駐紮兩份對象的記憶體。
  • 資料一緻性問題。

    CopyOnWrite

    容器隻能保證資料的最終一緻性,不能保證資料的實時一緻性。是以如果你希望寫入的的資料,馬上能讀到,請不要使用CopyOnWrite容器

Vector

  1. get

    方法:使用

    synchronized

    關鍵字,存在鎖競争,效率沒有

    CopyOnWriteArrayList

public synchronized E get(int index) {
    if (index >= elementCount)
        throw new ArrayIndexOutOfBoundsException(index);

    return elementData(index);
}
           
  1. add

    方法:同樣的實作方式,沒有副本拷貝,效率比

    CopyOnWriteArrayList

public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}
           

并發 Set

  1. CopyOnWriteArraySet

    内部依賴

    CopyOnWriteArrayList

    。特性一緻,适合讀多寫少的并發場景
  2. CopyOnWriteArrayList

    CopyOnWriteArraySet

    這種

    CopyOnWrite

    類型的容器,更加重要的是要了解其設計實作:
  • 讀寫分離
  • 最終一緻性
  • 使用另外開辟空間的思路,來解決并發沖突
  1. 簡單源碼
public class CopyOnWriteArraySet<E> extends AbstractSet<E> implements java.io.Serializable {
    
    private final CopyOnWriteArrayList<E> al;
    
    public boolean add(E e) {
        return al.addIfAbsent(e);
    }
}
           

并發 Map

簡介

  1. 包裝類:

    Collections.synchronizedList(list);

  2. 并發類:

    ConcurrentHashMap

    ,性能更好,讀是無鎖的,寫操作的鎖粒度是比較小的(經典的減小鎖粒度設計),整體性能要較好

ConcurrentHashMap (JDK7、8 設計差别較大)

JDK7源碼

  1. put

    方法:

    Segment

    分段加鎖
@Override
public V put(K key, V value) {
    if (value == null) {
        throw new NullPointerException();
    }
    int hash = hashOf(key); // hash 分段
    return segmentFor(hash).put(key, hash, value, false);
}

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock(); // Segment 分段加鎖
    try {
        int c = count;
        if (c ++ > threshold) { // ensure capacity
            int reduced = rehash();
            if (reduced > 0) {
                count = (c -= reduced) - 1; // write-volatile
            }
        }

        HashEntry<K, V>[] tab = table;
        int index = hash & tab.length - 1;
        HashEntry<K, V> first = tab[index];
        HashEntry<K, V> e = first;
        while (e != null && (e.hash != hash || !keyEq(key, e.key()))) {
            e = e.next;
        }

        V oldValue;
        if (e != null) {
            oldValue = e.value();
            if (!onlyIfAbsent) {
                e.setValue(value);
            }
        } else {
            oldValue = null;
            ++ modCount;
            tab[index] = newHashEntry(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}

           
  1. get

    方法:
public V get(Object key) {
    int hash = hashOf(key);
    return segmentFor(hash).get(key, hash);
}

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K, V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && keyEq(key, e.key())) {
                V opaque = e.value();
                if (opaque != null) {
                    return opaque;
                }

                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}
           

JDK7設計: 使用

Segment

分段鎖,減小鎖粒度

JDK多線程基礎(12):并發資料結構(集合)
  1. 一個典型的

    HashMap

    ,如果

    get

    add

    進行同步,如果鎖對象為整個

    HashMap

    ,那麼沒有兩個線程是可以真正的并發
  2. 使用

    Segment

    分段鎖。

    ConcurrentHashMap

    ,采用拆分鎖對象的方式提高吞吐量。其将整個

    HashMap

    拆分成若幹個段

    segment

    ,每段都是一個子的

    HashMap

  3. 如果需要在

    ConcurrentHashMap

    中增加一個新的資料,并不是将整個

    hashmap

    加鎖,而是先根據

    hashcode

    得到該資料項應該被存到哪個段裡面,然後對該段加鎖。多個線程同時進行

    put

    操作,隻要被加入的資料項不是同一個段

    segment

    中,可以真正的并行
  4. 缺點問題:系統需要全局鎖時,其消耗的資源比較大。如 ConcurrentHashMap 的 size 方法,有可能需要每段加鎖加以統計(因為在統計的時候,有可能會有寫入)。JDK7的實作思路:先采用不加鎖的方式,連續計算元素的個數,最多計算3次
  • 如果前後兩次計算結果相同,則說明計算出來的元素個數是準确的
  • 如果前後兩次計算結果都不同,則給每個Segment進行加鎖,再計算一次元素的個數
public int size() {
        final Segment<K, V>[] segments = this.segments;
        long sum = 0;
        long check = 0;
        int[] mc = new int[segments.length];
        // Try a few times to get accurate count. On failure due to continuous
        // async changes in table, resort to locking.
        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++ k) { // 先不加鎖算
            check = 0;
            sum = 0;
            int mcsum = 0;
            for (int i = 0; i < segments.length; ++ i) {
                sum += segments[i].count;
                mcsum += mc[i] = segments[i].modCount;
            }
            if (mcsum != 0) {
                for (int i = 0; i < segments.length; ++ i) {
                    check += segments[i].count;
                    if (mc[i] != segments[i].modCount) {
                        check = -1; // force retry
                        break;
                    }
                }
            }
            if (check == sum) {
                break;
            }
        }
        if (check != sum) { // Resort to locking all segments 前後不一緻,就加鎖算
            sum = 0;
            for (Segment<K, V> segment: segments) {
                segment.lock(); // 每個分段都需要加鎖
            }
            for (Segment<K, V> segment: segments) {
                sum += segment.count;
            }
            for (Segment<K, V> segment: segments) {
                segment.unlock();
            }
        }
        if (sum > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        } else {
            return (int) sum;
        }
    }
           

JDK8設計

JDK多線程基礎(12):并發資料結構(集合)
  1. 取消了

    Segment

    分段鎖,數組+連結清單(紅黑樹)的結構,對于鎖的粒度,調整為對每個數組元素加鎖(Node)
  2. 定位節點的

    hash

    算法被簡化了,這樣帶來的弊端是Hash沖突會加劇。是以在連結清單節點數量大于8時,會将連結清單轉化為紅黑樹進行存儲。這樣一來,查詢的時間複雜度就會由原先的O(n)變為O(logN)
  3. 使用

    synchronized

    關鍵字,這一點也說明

    synchronized

    在JDK8中優化的程度和

    ReentrantLock

    差不多了

JDK8源碼

  1. get

    方法:沒有加鎖,是以在多線程操作的過程中,并不能完全的保證一緻性
  • 首先定位到table[]中的i
  • 若table[i]存在,則繼續查找
  • 首先比較連結清單頭部,如果是則傳回
  • 然後如果為紅黑樹,查找樹
  • 最後再循環連結清單查找
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());// 定位到table[]中的i
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {// 若table[i]存在
        if ((eh = e.hash) == h) {// 比較連結清單頭部
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)// 若為紅黑樹,查找樹
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {// 循環連結清單查找
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;// 未找到
}

           
  1. put

    方法:
  • 參數校驗
  • 若table[]未建立,則初始化
  • 當table[i]後面無節點時,直接建立Node(無鎖操作)
  • 如果目前正在擴容,則幫助擴容并傳回最新table[]
  • 然後在連結清單或者紅黑樹中追加節點
  • 最後還回去判斷是否到達閥值,如到達變為紅黑樹結構
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();  // 若table[]未建立,則初始化
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    // table[i]後面無節點時,直接建立Node(無鎖操作)
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)    // 如果目前正在擴容,則幫助擴容并傳回最新table[]
            tab = helpTransfer(tab, f);
        else {  // 在連結清單或者紅黑樹中追加節點
            V oldVal = null;
            synchronized (f) {  // 這裡并沒有使用ReentrantLock,說明synchronized已經足夠優化了
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {  // 如果為連結清單結構
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) { // 找到key,替換value
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { // 在尾部插入Node
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {    // 如果為紅黑樹
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)  // 到達閥值,變為紅黑樹結構
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
           
  1. size

    方法:
  • 從上面代碼可以看出來,JDK1.8中新增了一個 mappingCount()的API。這個API與size()不同的就是傳回值是Long類型,這樣就不受Integer.MAX_VALUE的大小限制了
  • 兩個方法都同時調用了

    sumCount()

    方法。對于每個table[i]都有一個CounterCell與之對應,上面方法做了求和之後就傳回了。進而可以看出,size() 和 mappingCount() 傳回的都是一個估計值(沒有加鎖,強一緻)
  • 與JDK1.7裡面的實作不同,1.7裡面使用了加鎖的方式實作。這裡面也可以看出 JDK1.8 犧牲了精度,來換取更高的效率
// 1.2時加入
    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    // 1.8加入的API
    public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }
 
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

           

參考

  1. 源碼位址
  2. ConcurrentHashMap的JDK1.8實作
  3. 談談ConcurrentHashMap1.7和1.8的不同實作
JDK多線程基礎(12):并發資料結構(集合)

繼續閱讀