來源:saymagic
當當優化最後幾天: 申請到當當一些IT書籍的優惠碼
多線程程式設計一直是老生常談的問題,在Java中,随着JDK的逐漸發展,JDK提供給我們的并發模型也越來越多,本文摘取三例使用不同原理的模型,分析其大緻原理。
COW之CopyOnWriteArrayList
cow是copy-on-write的簡寫,這種模型來源于linux系統fork指令,Java中一種使用cow模型來實作的并發類是CopyOnWriteArrayList。相比于Vector,它的讀操作是無需加鎖的:
public E get(int index) {
return (E) elements[index];
}
之是以有如此神奇功效,其采取的是空間換取時間的方法,檢視其add方法:
public synchronized boolean add(E e) {
Object[] newElements = new Object[elements.length + 1];
System.arraycopy(elements, 0, newElements, 0, elements.length);
newElements[elements.length] = e;
elements = newElements;
return true;
我們注意到,CopyOnWriteArrayList的add方法是需要加鎖的,但其内部并沒有直接對elements數組做操作,而是先copy一份目前的資料到一個新的數組,然後對新的數組進行指派操作。這樣做就讓get操作從同步中解脫出來。因為更改的資料并沒有發生在get所需的數組中。而是放生在新生成的副本中,是以不需要同步。但應該注意的是,盡管如此,get操作還是可能會讀取到髒資料的。
CopyOnWriteArrayList的另一特點是允許多線程周遊,且其它線程更改資料并不會導緻周遊線程抛出ConcurrentModificationException 異常,來看下iterator(),
public Iterator<E> iterator() {
Object[] snapshot = elements;
return new CowIterator<E>(snapshot, 0, snapshot.length);
這個CowIterator 是 ListIterator的子類,這個Iterator的特點是它并不支援對資料的更改操作:
public void add(E object) {
throw new UnsupportedOperationException();
public void remove() {
throw new UnsupportedOperationException();
public void set(E object) {
這樣做的原因也很容易了解,我們可以簡單地的認為CowIterator中的snapshot是不可變數組,因為list中有資料更新都會生成新數組,而不會改變snapshot, 是以此時Iterator沒辦法再将更改的資料寫回list了。同理,list資料有更新也不會反映在CowIterator中。CowIterator隻是保證其疊代過程不會發生異常。
CAS之ConcurrentHashMap(JDK1.8)
CAS是Compare and Swap的簡寫,即比較與替換,CAS造作将比較和替換封裝為一組原子操作,不會被外部打斷。這種原子操作的保證往往由處理器層面提供支援。
在Java中有一個非常神奇的Unsafe類來對CAS提供語言層面的接口。但類如其名,此等神器如果使用不當,會造成武功盡失的,是以Unsafe不對外開放,想使用的話需要通過反射等技巧。這裡不對其做展開。介紹它的原因是因為它是JDK1.8中ConcurrentHashMap的實作基礎。
ConcurrentHashMap與HashMap對資料的存儲有着相似的地方,都采用數組+連結清單+紅黑樹的方式。基本邏輯是内部使用Node來儲存map中的一項key, value結構,對于hash不沖突的key,使用數組來儲存Node資料,而每一項Node都是一個連結清單,用來儲存hash沖突的Node,當連結清單的大小達到一定程度會轉為紅黑樹,這樣會使在沖突資料較多時也會有比較好的查詢效率。
了解了ConcurrentHashMap的存儲結構後,我們來看下在這種結構下,ConcurrentHashMap是如何實作高效的并發操作,這得益于ConcurrentHashMap中的如下三個函數。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putOrderedObject(tab, ((long)i << ASHIFT) + ABASE, v);
其中的U就是我們前文提到的Unsafe的一個執行個體,這三個函數都通過Unsafe的幾個方法保證了是原子性:
- tabAt作用是傳回tab數組第i項
- casTabAt函數是對比tab第i項是否與c相等,相等的話将其設定為v。
- setTabAt将tab的第i項設定為v
有了這三個函數就可以保證ConcurrentHashMap的線程安全嗎?并不是的,ConcurrentHashMap内部也使用比較多的synchronized,不過與HashTable這種對所有操作都使用synchronized不同,ConcurrentHashMap隻在特定的情況下使用synchronized,來較少鎖的定的區域。來看下putVal方法(精簡版):
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to embin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
....
}
}
addCount(1L, binCount);
return null;
整個put流程大緻如下:
- 判斷key與value是否為空,為空抛異常
- 計算kek的hash值,然後進入死循環,一般來講,caw算法與死循環是搭檔。
- 判斷table是否初始化,未初始化進行初始化操作
- Node在table中的目标位置是否為空,為空的話使用caw操作進行指派,當然,這種指派是有可能失敗的,是以前面的死循環發揮了重試的作用。
- 如果目前正在擴容,則嘗試協助其擴容,死循環再次發揮了重試的作用,有趣的是ConcurrentHashMap是可以多線程同時擴容的。這裡說協助的原因在于,對于數組擴容,一般分為兩步:1.建立一個更大的數組;2.将原數組資料copy到新數組中。對于第一步,ConcurrentHashMap通過CAW來控制一個int變量保證建立數組這一步隻會執行一次。對于第二步,ConcurrentHashMap采用CAW + synchronized + 移動後标記 的方式來達到多線程擴容的目的。感興趣可以檢視transfer函數。
- 最後的一個else分支,黑科技的流程已嘗試無效,目标Node已經存在值,隻能鎖住目前Node來進行put操作,當然,這裡省略了很多代碼,包括連結清單轉紅黑樹的操作等等。
相比于put,get的代碼更好了解一下:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
- 檢查表是否為空
- 擷取key的hash h,擷取key在table中對應的Node e
- 判斷Node e的第一項是否與預期的Node相等,相等話, 則傳回e.val
- 如果e.hash < 0, 說明e為紅黑樹,調用e的find接口來進行查找。
- 走到這一步,e為連結清單無疑,且第一項不是需要查詢的資料,一直調用next來進行查找即可。
讀寫分離之LinkedBlockingQueue
還有一種實作線程安全的方式是通過将讀寫進行分離,這種方式的一種實作是LinkedBlockingQueue。LinkedBlockingQueue整體設計的也十分精巧,它的全局變量分為三類:
- final 型
- Atomic 型
- 普通變量
final型變量由于聲明後就不會被修改,是以自然線程安全,Atomic型内部采用了cas模型來保證線程安全。對于普通型變量,LinkedBlockingQueue中隻包含head與last兩個表示隊列的頭與尾。并且私有,外部無法更改,是以,LinkedBlockingQueue隻需要保證head與last的安全即可保證真個隊列的線程安全。并且LinkedBlockingQueue屬于FIFO型隊列,一般情況下,讀寫會在不同元素上工作,是以, LinkedBlockingQueue定義了兩個可重入鎖,巧妙的通過對head與last分别加鎖,實作讀寫分離,來實作良好的安全并發特性:
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
首先看下它的offer 方法:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
if (c == 0)
signalNotEmpty();
return c >= 0;
可見,在對隊列進行添加元素時,隻需要對putLock進行加鎖即可,保證同一時刻隻有一個線程可以對last進行插入。同樣的,在從隊列進行提取元素時,也隻需要擷取takeLock鎖來對head操作即可:
public E poll() {
if (count.get() == 0)
return null;
E x = null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
takeLock.unlock();
if (c == capacity)
signalNotFull();
return x;
LinkedBlockingQueue整體還是比較好了解的,但有幾個點需要特殊注意:
- LinkedBlockingQueue是一個阻塞隊列,當隊列無元素為空時,所有取元素的線程會通過notEmpty 的await()方法進行等待,直到再次有資料enqueue時,notEmpty發出signal信号。對于隊列達到上限時也是同理。