JDK8的版本,與JDK6的版本有很大的差異。實作線程安全的思想也已經完全變了,它摒棄了Segment(鎖段)的概念,而是啟用了一種全新的方式實作,利用CAS算法。它沿用了與它同時期的HashMap版本的思想,底層依然由“數組”+連結清單+紅黑樹的方式思想,但是為了做到并發,又增加了很多輔助的類,例如TreeBin,Traverser等對象内部類。CAS算法實作無鎖化的修改值的操作,他可以大大降低鎖代理的性能消耗。這個算法的基本思想就是不斷地去比較目前記憶體中的變量值與你指定的一個變量值是否相等,如果相等,則接受你指定的修改的值,否則拒絕你的操作。因為目前線程中的值已經不是最新的值,你的修改很可能會覆寫掉其他線程修改的結果。這一點與樂觀鎖,SVN的思想是比較類似的。
ConcurrentHashMap是conccurrent家族中的一個類,由于它可以高效地支援并發操作,以及被廣泛使用,經典的開源架構Spring的底層資料結構就是使用ConcurrentHashMap實作的。與同是線程安全的老大哥HashTable相比,它已經更勝一籌,是以它的鎖更加細化,而不是像HashTable一樣為幾乎每個方法都添加了synchronized鎖,這樣的鎖無疑會影響到性能。
本文的分析的源碼是JDK8的版本,與JDK6的版本有很大的差異。實作線程安全的思想也已經完全變了,它摒棄了Segment(鎖段)的概念,而是啟用了一種全新的方式實作,利用CAS算法。它沿用了與它同時期的HashMap版本的思想,底層依然由“數組”+連結清單+紅黑樹的方式思想,但是為了做到并發,又增加了很多輔助的類,例如TreeBin,Traverser等對象内部類。
首先來看幾個重要的屬性,與HashMap相同的就不再介紹了,這裡重點解釋一下sizeCtl這個屬性。可以說它是ConcurrentHashMap中出鏡率很高的一個屬性,因為它是一個控制辨別符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含義。
負數代表正在進行初始化或擴容操作
-1代表正在初始化
-N 表示有N-1個線程正在進行擴容操作
正數或0代表hash表還沒有被初始化,這個數值表示初始化或下一次進行擴容的大小,這一點類似于擴容門檻值的概念。還後面可以看到,它的值始終是目前ConcurrentHashMap容量的0.75倍,這與loadfactor是對應的。
/**
* 盛裝Node元素的數組 它的大小是2的整數次幂
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
hash表初始化或擴容時的一個控制位辨別量。
負數代表正在進行初始化或擴容操作
-1代表正在初始化
-N 表示有N-1個線程正在進行擴容操作
正數或0代表hash表還沒有被初始化,這個數值表示初始化或下一次進行擴容的大小
private transient volatile int sizeCtl;
// 以下兩個是用來控制擴容的時候 單線程進入的變量
/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
private static int RESIZE_STAMP_BITS = 16;
* The bit shift for recording size stamp in sizeCtl.
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* Encodings for Node hash fields. See above for explanation.
static final int MOVED = -1; // hash值是-1,表示這是一個forwardNode節點
static final int TREEBIN = -2; // hash值是-2 表示這時一個TreeBin節點
Node是最核心的内部類,它包裝了key-value鍵值對,所有插入ConcurrentHashMap的資料都包裝在這裡面。它與HashMap中的定義很相似,但是但是有一些差别它對value和next屬性設定了volatile同步鎖,它不允許調用setValue方法直接改變Node的value域,它增加了find方法輔助map.get()方法。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;//帶有同步鎖的value
volatile Node<K,V> next;//帶有同步鎖的next指針
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
//不允許直接改變value的值
public final V setValue(V value) {
throw new UnsupportedOperationException();
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
這個Node内部類與HashMap中定義的Node類很相似,但是有一些差别
它對value和next屬性設定了volatile同步鎖
它不允許調用setValue方法直接改變Node的value域
它增加了find方法輔助map.get()方法
樹節點類,另外一個核心的資料結構。當連結清單長度過長的時候,會轉換為TreeNode。但是與HashMap不相同的是,它并不是直接轉換為紅黑樹,而是把這些結點包裝成TreeNode放在TreeBin對象中,由TreeBin完成對紅黑樹的包裝。而且TreeNode在ConcurrentHashMap內建自Node類,而并非HashMap中的內建自LinkedHashMap.Entry<K,V>類,也就是說TreeNode帶有next指針,這樣做的目的是友善基于TreeBin的通路。
這個類并不負責包裝使用者的key、value資訊,而是包裝的很多TreeNode節點。它代替了TreeNode的根節點,也就是說在實際的ConcurrentHashMap“數組”中,存放的是TreeBin對象,而不是TreeNode對象,這是與HashMap的差別。另外這個類還帶有了讀寫鎖。
這裡僅貼出它的構造方法。可以看到在構造TreeBin節點時,僅僅指定了它的hash值為TREEBIN常量,這也就是個辨別為。同時也看到我們熟悉的紅黑樹構造方法
* Creates bin with initial set of nodes headed by b.
*/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
this.root = r;
assert checkInvariants(root);
}
一個用于連接配接兩個table的節點類。它包含一個nextTable指針,用于指向下一張表。而且這個節點的key value next指針全部為null,它的hash值為-1. 這裡面定義的find的方法是從nextTable裡進行查詢節點,而不是以自身為頭節點進行查找
* A node inserted at head of bins during transfer operations.
*/
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
else
return e.find(h, k);
if ((e = e.next) == null)
return null;
}
在ConcurrentHashMap中,随處可以看到U, 大量使用了U.compareAndSwapXXX的方法,這個方法是利用一個CAS算法實作無鎖化的修改值的操作,他可以大大降低鎖代理的性能消耗。這個算法的基本思想就是不斷地去比較目前記憶體中的變量值與你指定的一個變量值是否相等,如果相等,則接受你指定的修改的值,否則拒絕你的操作。因為目前線程中的值已經不是最新的值,你的修改很可能會覆寫掉其他線程修改的結果。這一點與樂觀鎖,SVN的思想是比較類似的。
unsafe代碼塊控制了一些屬性的修改工作,比如最常用的SIZECTL 。 在這一版本的concurrentHashMap中,大量應用來的CAS方法進行變量、屬性的修改工作。 利用CAS進行無鎖操作,可以大大提高性能。
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
ConcurrentHashMap定義了三個原子操作,用于對指定位置的節點進行操作。正是這些原子操作保證了ConcurrentHashMap的線程安全。
@SuppressWarnings("unchecked")
//獲得在i位置上的Node節點
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
//利用CAS算法設定i位置上的Node節點。之是以能實作并發是因為他指定了原來這個節點的值是多少
//在CAS算法中,會比較記憶體中的值與你指定的這個值是否相等,如果相等才接受你的修改,否則拒絕你的修改
//是以目前線程中的值并不是最新的值,這種修改可能會覆寫掉其他線程的修改結果 有點類似于SVN
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
//利用volatile方法設定節點位置的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
對于ConcurrentHashMap來說,調用它的構造方法僅僅是設定了一些參數而已。而整個table的初始化是在向ConcurrentHashMap中插入元素的時候發生的。如調用put、computeIfAbsent、compute、merge等方法的時候,調用時機是檢查table==null。
初始化方法主要應用了關鍵屬性sizeCtl 如果這個值〈0,表示其他線程正在進行初始化,就放棄這個操作。在這也可以看出ConcurrentHashMap的初始化隻能由一個線程完成。如果獲得了初始化權限,就用CAS方法将sizeCtl置為-1,防止其他線程進入。初始化數組後,将sizeCtl的值改為0.75*n
* Initializes table, using the size recorded in sizeCtl.
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl表示有其他線程正在進行初始化操作,把線程挂起。對于table的初始化工作,隻能有一個線程在進行。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置為-1 表示本線程正在進行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//相當于0.75*n 設定一個擴容的門檻值
} finally {
sizeCtl = sc;
break;
return tab;
當ConcurrentHashMap容量不足的時候,需要對table進行擴容。這個方法的基本思想跟HashMap是很像的,但是由于它是支援并發擴容的,是以要複雜的多。原因是它支援多線程進行擴容操作,而并沒有加鎖。我想這樣做的目的不僅僅是為了滿足concurrent的要求,而是希望利用并發處理去減少擴容帶來的時間影響。因為在擴容的時候,總是會涉及到從一個“數組”到另一個“數組”拷貝的操作,如果這個操作能夠并發進行,那真真是極好的了。
整個擴容操作分為兩個部分
第一部分是建構一個nextTable,它的容量是原來的兩倍,這個操作是單線程完成的。這個單線程的保證是通過RESIZE_STAMP_SHIFT這個常量經過一次運算來保證的,這個地方在後面會有提到;
第二個部分就是将原來table中的元素複制到nextTable中,這裡允許多線程進行操作。
先來看一下單線程是如何完成的:
它的大體思想就是周遊、複制的過程。首先根據運算得到需要周遊的次數i,然後利用tabAt方法獲得i位置的元素:
如果這個位置為空,就在原table中的i位置放入forwardNode節點,這個也是觸發并發擴容的關鍵點;
如果這個位置是Node節點(fh>=0),如果它是一個連結清單的頭節點,就構造一個反序連結清單,把他們分别放在nextTable的i和i+n的位置上
如果這個位置是TreeBin節點(fh<0),也做一個反序處理,并且判斷是否需要untreefi,把處理的結果分别放在nextTable的i和i+n的位置上
周遊過所有的節點以後就完成了複制工作,這時讓nextTable作為新的table,并且更新sizeCtl為新容量的0.75倍 ,完成擴容。
再看一下多線程是如何完成的:
在代碼的69行有一個判斷,如果周遊到的節點是forward節點,就向後繼續周遊,再加上給節點上鎖的機制,就完成了多線程的控制。多線程周遊節點,處理了一個節點,就把對應點的值set為forward,另一個線程看到forward,就向後周遊。這樣交叉就完成了複制工作。而且還很好的解決了線程安全的問題。 這個方法的設計實在是讓我膜拜。

/**
* 一個過渡的table表 隻有在擴容的時候才會使用
*/
private transient volatile Node<K,V>[] nextTable;
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//構造一個nextTable對象 它的容量是原來的兩倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
nextTable = nextTab;
transferIndex = n;
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//構造一個連節點指針 用于标志位
boolean advance = true;//并發擴容的關鍵屬性 如果等于true 說明這個節點已經處理過
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//這個while循環體的作用就是在控制i-- 通過i--可以依次周遊原hash表中的節點
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的節點都已經完成複制工作 就把nextTable指派給table 清空臨時對象nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//擴容門檻值設定為原來容量的1.5倍 依然相當于現在容量的0.75倍
return;
//利用CAS方法更新這個擴容門檻值,在這裡面sizectl值減一,說明新加入一個線程參與到擴容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
//如果周遊到的節點為空 則放入ForwardingNode指針
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果周遊到ForwardingNode節點 說明這個點已經被處理過了 直接跳過 這裡是控制并發擴容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//節點上鎖
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//如果fh>=0 證明這是一個Node節點
if (fh >= 0) {
int runBit = fh & n;
//以下的部分在完成的工作是構造兩個連結清單 一個是原連結清單 另一個是原連結清單的反序排列
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
else {
hn = lastRun;
ln = null;
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
//在nextTable的i位置上插入一個連結清單
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一個連結清單
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode節點 表示已經處理過該節點
setTabAt(tab, i, fwd);
//設定advance為true 傳回到上面的while循環中 就可以執行i--操作
advance = true;
}
//對TreeBin對象進行處理 與上面的過程類似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//構造正序和反序兩個連結清單
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
else {
if ((p.prev = hiTail) == null)
hi = p;
hiTail.next = p;
hiTail = p;
++hc;
//如果擴容後已經不再需要tree的結構 反向轉換為連結清單結構
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//在nextTable的i位置上插入一個連結清單
//在table的i位置上插入forwardNode節點 表示已經處理過該節點
}
前面的所有的介紹其實都為這個方法做鋪墊。ConcurrentHashMap最常用的就是put和get兩個方法。現在來介紹put方法,這個put方法依然沿用HashMap的put方法的思想,根據hash值計算這個新插入的點在table中的位置i,如果i位置是空的,直接放進去,否則進行判斷,如果i位置是樹節點,按照樹的方式插入新的節點,否則把i插入到連結清單的末尾。ConcurrentHashMap中依然沿用這個思想,有一個最重要的不同點就是ConcurrentHashMap不允許key或value為null值。另外由于涉及到多線程,put方法就要複雜一點。在多線程中可能有以下兩個情況
如果一個或多個線程正在對ConcurrentHashMap進行擴容操作,目前線程也要進入擴容的操作中。這個擴容的操作之是以能被檢測到,是因為transfer方法中在空結點上插入forward節點,如果檢測到需要插入的位置被forward節點占有,就幫助進行擴容;
如果檢測到要插入的節點是非空且不是forward節點,就對這個節點加鎖,這樣就保證了線程安全。盡管這個有一些影響效率,但是還是會比hashTable的synchronized要好得多。
整體流程就是首先定義不允許key或value為null的情況放入 對于每一個放入的值,首先利用spread方法對key的hashcode進行一次hash計算,由此來确定這個值在table中的位置。
如果這個位置是空的,那麼直接放入,而且不需要加鎖操作。
如果這個位置存在結點,說明發生了hash碰撞,首先判斷這個節點的類型。如果是連結清單節點(fh>0),則得到的結點就是hash值相同的節點組成的連結清單的頭節點。需要依次向後周遊确定這個新加入的值所在位置。如果遇到hash值與key值都與新加入節點是一緻的情況,則隻需要更新value值即可。否則依次向後周遊,直到連結清單尾插入這個結點。 如果加入這個節點以後連結清單長度大于8,就把這個連結清單轉換成紅黑樹。如果這個節點的類型已經是樹節點的話,直接調用樹節點的插入方法進行插入新的值。
public V put(K key, V value) {
return putVal(key, value, false);
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允許 key或value為null
if (key == null || value == null) throw new NullPointerException();
//計算hash值
int hash = spread(key.hashCode());
int binCount = 0;
//死循環 何時插入成功 何時跳出
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table為空的話,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//根據hash值計算出在table裡面的位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果這個位置沒有值 ,直接放進去,不需要加鎖
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
//當遇到表連接配接點時,需要進行整合表的操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//結點上鎖 這裡的結點可以了解為hash值相同組成的連結清單的頭結點
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh〉0 說明這個節點是一個連結清單的節點 不是樹的節點
if (fh >= 0) {
binCount = 1;
//在這裡周遊連結清單所有的結點
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果hash值和key值相同 則修改對應結點的value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果周遊到了最後一個結點,那麼就證明新的節點需要插入 就把它插入在連結清單尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
}
//如果這個節點是樹節點,就按照樹的方式插入值
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
if (binCount != 0) {
//如果連結清單長度已經達到臨界值8 就需要把連結清單轉換為樹結構
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
//将目前ConcurrentHashMap的元素數量+1
addCount(1L, binCount);
return null;
這是一個協助擴容的方法。這個方法被調用的時候,目前ConcurrentHashMap一定已經有了nextTable對象,首先拿到這個nextTable對象,調用transfer方法。回看上面的transfer方法可以看到,當本線程進入擴容方法的時候會直接進入複制階段。
* Helps transfer if a resize is in progress.
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);//計算一個操作校驗碼
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
return nextTab;
return table;
這個方法用于将過長的連結清單轉換為TreeBin對象。但是他并不是直接轉換,而是進行一次容量判斷,如果容量沒有達到轉換的要求,直接進行擴容操作并傳回;如果滿足條件才連結清單的結構抓換為TreeBin ,這與HashMap不同的是,它并沒有把TreeNode直接放入紅黑樹,而是利用了TreeBin這個小容器來封裝所有的TreeNode.
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//如果table.length<64 就擴大一倍 傳回
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
//構造了一個TreeBin對象 把所有Node節點包裝成TreeNode放進去
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);//這裡隻是利用了TreeNode封裝 而沒有利用TreeNode的next域和parent域
if ((p.prev = tl) == null)
hd = p;
tl.next = p;
tl = p;
//在原來index的位置 用TreeBin替換掉原來的Node對象
setTabAt(tab, index, new TreeBin<K,V>(hd));
get方法比較簡單,給定一個key來确定value的時候,必須滿足兩個條件 key相同 hash值相同,對于節點可能在連結清單或樹上的情況,需要分别去查找.
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
//根據hash值确定節點位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果搜尋到的節點key與傳入的key相同且不為null,直接傳回這個節點
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
//如果eh<0 說明這個節點在樹上 直接尋找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//否則周遊連結清單 找到對應的值并傳回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
對于ConcurrentHashMap來說,這個table裡到底裝了多少東西其實是個不确定的數量,因為不可能在調用size()方法的時候像GC的“stop the world”一樣讓其他線程都停下來讓你去統計,是以隻能說這個數量是個估計值。對于這個估計值,ConcurrentHashMap也是大費周章才計算出來的。
為了統計元素個數,ConcurrentHashMap定義了一些變量和一個内部類
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
/******************************************/
/**
* 實際上儲存的是hashmap中的元素個數 利用CAS鎖進行更新
但它并不用傳回目前hashmap的元素個數
private transient volatile long baseCount;
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
private transient volatile int cellsBusy;
* Table of counter cells. When non-null, size is a power of 2.
private transient volatile CounterCell[] counterCells;
mappingCount與size方法的類似 從Java工程師給出的注釋來看,應該使用mappingCount代替size方法 兩個方法都沒有直接傳回basecount 而是統計一次這個值,而這個值其實也是一個大概的數值,是以可能在統計的時候有其他線程正在執行插入或删除操作。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
public long mappingCount() {
return (n < 0L) ? 0L : n; // ignore transient negative values
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;//所有counter的值求和
return sum;
在put方法結尾處調用了addCount方法,把目前ConcurrentHashMap的元素個數+1這個方法一共做了兩件事,更新baseCount的值,檢測是否進行擴容。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
if (check <= 1)
s = sumCount();
//如果check值大于等于0 則需要檢驗是否需要進行擴容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果已經有其他線程在執行擴容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
//目前線程是唯一的或是第一個發起擴容的線程 此時nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
CAS
CAS:Compare and Swap, 翻譯成比較并交換。
java.util.concurrent包中借助CAS實作了差別于synchronouse同步鎖的一種樂觀鎖。
本文先從CAS的應用說起,再深入原了解析。
CAS應用
CAS有3個操作數,記憶體值V,舊的預期值A,要修改的新值B。當且僅當預期值A和記憶體值V相同時,将記憶體值V修改為B,否則什麼都不做。
非阻塞算法 (nonblocking algorithms)
一個線程的失敗或者挂起不應該影響其他線程的失敗或挂起的算法。
現代的CPU提供了特殊的指令,可以自動更新共享資料,而且能夠檢測到其他線程的幹擾,而 compareAndSet() 就用這些代替了鎖定。
拿出AtomicInteger來研究在沒有鎖的情況下是如何做到資料正确性的。
private volatile int value;
首先毫無以為,在沒有鎖的機制下可能需要借助volatile原語,保證線程間的資料是可見的(共享的)。
這樣才擷取變量的值的時候才能直接讀取。
public final int get() { return value; }
然後來看看++i是怎麼做到的。
public final int incrementAndGet() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return next; }
在這裡采用了CAS操作,每次從記憶體中讀取資料然後将此資料和+1後的結果進行CAS操作,如果成功就傳回結果,否則重試直到成功為止。
而compareAndSet利用JNI來完成CPU指令的操作。
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
整體的過程就是這樣子的,利用CPU的CAS指令,同時借助JNI來完成Java的非阻塞算法。其它原子操作都是利用類似的特性完成的。
其中
unsafe.compareAndSwapInt(this, valueOffset, expect, update);
類似:
if (this == expect) {
this = update
return true;
} else {
return false;
那麼問題就來了,成功過程中需要2個步驟:比較this == expect,替換this = update,compareAndSwapInt如何這兩個步驟的原子性呢? 參考CAS的原理。
CAS原理
CAS通過調用JNI的代碼實作的。JNI:Java Native Interface為JAVA本地調用,允許java調用其他語言。
而compareAndSwapInt就是借助C來調用CPU底層指令實作的。
下面從分析比較常用的CPU(intel x86)來解釋CAS的實作原理。
下面是sun.misc.Unsafe類的compareAndSwapInt()方法的源代碼:
可以看到這是個本地方法調用。這個本地方法在openjdk中依次調用的c++代碼為:unsafe.cpp,atomic.cpp和atomicwindowsx86.inline.hpp。這個本地方法的最終實作在openjdk的如下位置:openjdk-7-fcs-src-b147-27jun2011\openjdk\hotspot\src\oscpu\windowsx86\vm\
atomicwindowsx86.inline.hpp(對應于windows作業系統,X86處理器)。下面是對應于intel x86處理器的源代碼的片段:
如上面源代碼所示,程式會根據目前處理器的類型來決定是否為cmpxchg指令添加lock字首。如果程式是在多處理器上運作,就為cmpxchg指令加上lock字首(lock cmpxchg)。反之,如果程式是在單處理器上運作,就省略lock字首(單處理器自身會維護單處理器内的順序一緻性,不需要lock字首提供的記憶體屏障效果)。
intel的手冊對lock字首的說明如下:
確定對記憶體的讀-改-寫操作原子執行。在Pentium及Pentium之前的處理器中,帶有lock字首的指令在執行期間會鎖住總線,使得其他處理器暫時無法通過總線通路記憶體。很顯然,這會帶來昂貴的開銷。從Pentium 4,Intel Xeon及P6處理器開始,intel在原有總線鎖的基礎上做了一個很有意義的優化:如果要通路的記憶體區域(area of memory)在lock字首指令執行期間已經在處理器内部的緩存中被鎖定(即包含該記憶體區域的緩存行目前處于獨占或以修改狀态),并且該記憶體區域被完全包含在單個緩存行(cache
line)中,那麼處理器将直接執行該指令。由于在指令執行期間該緩存行會一直被鎖定,其它處理器無法讀/寫該指令要通路的記憶體區域,是以能保證指令執行的原子性。這個操作過程叫做緩存鎖定(cache locking),緩存鎖定将大大降低lock字首指令的執行開銷,但是當多處理器之間的競争程度很高或者指令通路的記憶體位址未對齊時,仍然會鎖住總線。
禁止該指令與之前和之後的讀和寫指令重排序。
把寫緩沖區中的所有資料重新整理到記憶體中。
備注知識:
關于CPU的鎖有如下3種:
3.1 處理器自動保證基本記憶體操作的原子性
首先處理器會自動保證基本的記憶體操作的原子性。處理器保證從系統記憶體當中讀取或者寫入一個位元組是原子的,意思是當一個處理器讀取一個位元組時,其他處理器不能通路這個位元組的記憶體位址。奔騰6和最新的處理器能自動保證單處理器對同一個緩存行裡進行16/32/64位的操作是原子的,但是複雜的記憶體操作處理器不能自動保證其原子性,比如跨總線寬度,跨多個緩存行,跨頁表的通路。但是處理器提供總線鎖定和緩存鎖定兩個機制來保證複雜記憶體操作的原子性。
3.2 使用總線鎖保證原子性
第一個機制是通過總線鎖保證原子性。如果多個處理器同時對共享變量進行讀改寫(i++就是經典的讀改寫操作)操作,那麼共享變量就會被多個處理器同時進行操作,這樣讀改寫操作就不是原子的,操作完之後共享變量的值會和期望的不一緻,舉個例子:如果i=1,我們進行兩次i++操作,我們期望的結果是3,但是有可能結果是2。如下圖
原因是有可能多個處理器同時從各自的緩存中讀取變量i,分别進行加一操作,然後分别寫入系統記憶體當中。那麼想要保證讀改寫共享變量的操作是原子的,就必須保證CPU1讀改寫共享變量的時候,CPU2不能操作緩存了該共享變量記憶體位址的緩存。
處理器使用總線鎖就是來解決這個問題的。所謂總線鎖就是使用處理器提供的一個LOCK#信号,當一個處理器在總線上輸出此信号時,其他處理器的請求将被阻塞住,那麼該處理器可以獨占使用共享記憶體。
3.3 使用緩存鎖保證原子性
第二個機制是通過緩存鎖定保證原子性。在同一時刻我們隻需保證對某個記憶體位址的操作是原子性即可,但總線鎖定把CPU和記憶體之間通信鎖住了,這使得鎖定期間,其他處理器不能操作其他記憶體位址的資料,是以總線鎖定的開銷比較大,最近的處理器在某些場合下使用緩存鎖定代替總線鎖定來進行優化。
頻繁使用的記憶體會緩存在處理器的L1,L2和L3高速緩存裡,那麼原子操作就可以直接在處理器内部緩存中進行,并不需要聲明總線鎖,在奔騰6和最近的處理器中可以使用“緩存鎖定”的方式來實作複雜的原子性。所謂“緩存鎖定”就是如果緩存在處理器緩存行中記憶體區域在LOCK操作期間被鎖定,當它執行鎖操作回寫記憶體時,處理器不在總線上聲言LOCK#信号,而是修改内部的記憶體位址,并允許它的緩存一緻性機制來保證操作的原子性,因為緩存一緻性機制會阻止同時修改被兩個以上處理器緩存的記憶體區域資料,當其他處理器回寫已被鎖定的緩存行的資料時會起緩存行無效,在例1中,當CPU1修改緩存行中的i時使用緩存鎖定,那麼CPU2就不能同時緩存了i的緩存行。
但是有兩種情況下處理器不會使用緩存鎖定。第一種情況是:當操作的資料不能被緩存在處理器内部,或操作的資料跨多個緩存行(cache line),則處理器會調用總線鎖定。第二種情況是:有些處理器不支援緩存鎖定。對于Inter486和奔騰處理器,就算鎖定的記憶體區域在處理器的緩存行中也會調用總線鎖定。
以上兩個機制我們可以通過Inter處理器提供了很多LOCK字首的指令來實作。比如位測試和修改指令BTS,BTR,BTC,交換指令XADD,CMPXCHG和其他一些操作數和邏輯指令,比如ADD(加),OR(或)等,被這些指令操作的記憶體區域就會加鎖,導緻其他處理器不能同時通路它。
CAS缺點
CAS雖然很高效的解決原子操作,但是CAS仍然存在三大問題。ABA問題,循環時間長開銷大和隻能保證一個共享變量的原子操作
1. ABA問題。因為CAS需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA問題的解決思路就是使用版本号。在變量前面追加上版本号,每次變量更新的時候把版本号加一,那麼A-B-A
就會變成1A-2B-3A。
從Java1.5開始JDK的atomic包裡提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法作用是首先檢查目前引用是否等于預期引用,并且目前标志是否等于預期标志,如果全部相等,則以原子方式将該引用和該标志的值設定為給定的更新值。
關于ABA問題參考文檔: http://blog.hesey.net/2011/09/resolve-aba-by-atomicstampedreference.html
2. 循環時間長開銷大。自旋CAS如果長時間不成功,會給CPU帶來非常大的執行開銷。如果JVM能支援處理器提供的pause指令那麼效率會有一定的提升,pause指令有兩個作用,第一它可以延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決于具體實作的版本,在一些處理器上延遲時間是零。第二它可以避免在退出循環的時候因記憶體順序沖突(memory
order violation)而引起CPU流水線被清空(CPU pipeline flush),進而提高CPU的執行效率。
3. 隻能保證一個共享變量的原子操作。當對一個共享變量執行操作時,我們可以使用循環CAS的方式來保證原子操作,但是對多個共享變量操作時,循環CAS就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變量合并成一個共享變量來操作。比如有兩個共享變量i=2,j=a,合并一下ij=2a,然後用CAS來操作ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,你可以把多個變量放在一個對象裡來進行CAS操作。
由于java的CAS同時具有 volatile 讀和volatile寫的記憶體語義,是以Java線程之間的通信現在有了下面四種方式:
A線程寫volatile變量,随後B線程讀這個volatile變量。
A線程寫volatile變量,随後B線程用CAS更新這個volatile變量。
A線程用CAS更新一個volatile變量,随後B線程用CAS更新這個volatile變量。
A線程用CAS更新一個volatile變量,随後B線程讀這個volatile變量。
Java的CAS會使用現代處理器上提供的高效機器級别原子指令,這些原子指令以原子方式對記憶體執行讀-改-寫操作,這是在多處理器中實作同步的關鍵(從本質上來說,能夠支援原子性讀-改-寫指令的計算機器,是順序計算圖靈機的異步等價機器,是以任何現代的多處理器都會去支援某種能對記憶體執行原子性讀-改-寫操作的原子指令)。同時,volatile變量的讀/寫和CAS可以實作線程之間的通信。把這些特性整合在一起,就形成了整個concurrent包得以實作的基石。如果我們仔細分析concurrent包的源代碼實作,會發現一個通用化的實作模式:
首先,聲明共享變量為volatile;
然後,使用CAS的原子條件更新來實作線程之間的同步;
同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的記憶體語義來實作線程之間的通信。
AQS,非阻塞資料結構和原子變量類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實作的,而concurrent包中的高層類又是依賴于這些基礎類來實作的。從整體來看,concurrent包的實作示意圖如下:
鎖是用來做并發最簡單的方式,當然其代價也是最高的。核心态的鎖的時候需要作業系統進行一次上下文切換,加鎖、釋放鎖會導緻比較多的上下文切換和排程延時,等待鎖的線程會被挂起直至鎖釋放。在上下文切換的時候,cpu之前緩存的指令和資料都将失效,對性能有很大的損失。作業系統對多線程的鎖進行判斷就像兩姐妹在為一個玩具在争吵,然後作業系統就是能決定他們誰能拿到玩具的父母,這是很慢的。使用者态的鎖雖然避免了這些問題,但是其實它們隻是在沒有真實的競争時才有效。
Java在JDK1.5之前都是靠synchronized關鍵字保證同步的,這種通過使用一緻的鎖定協定來協調對共享狀态的通路,可以確定無論哪個線程持有守護變量的鎖,都采用獨占的方式來通路這些變量,如果出現多個線程同時通路鎖,那第一些線線程将被挂起,當線程恢複執行時,必須等待其它線程執行完他們的時間片以後才能被排程執行,在挂起和恢複執行過程中存在着很大的開銷。鎖還存在着其它一些缺點,當一個線程正在等待鎖時,它不能做任何事。如果一個線程在持有鎖的情況下被延遲執行,那麼所有需要這個鎖的線程都無法執行下去。如果被阻塞的線程優先級高,而持有鎖的線程優先級低,将會導緻優先級反轉(Priority
Inversion)。
獨占鎖是一種悲觀鎖,synchronized就是一種獨占鎖,它假設最壞的情況,并且隻有在確定其它線程不會造成幹擾的情況下執行,會導緻其它所有需要鎖的線程挂起,等待持有鎖的線程釋放鎖。而另一個更加有效的鎖就是樂觀鎖。所謂樂觀鎖就是,每次不加鎖而是假設沒有沖突而去完成某項操作,如果因為沖突失敗就重試,直到成功為止。
原子操作指的是在一步之内就完成而且不能被中斷。原子操作在多線程環境中是線程安全的,無需考慮同步的問題。在java中,下列操作是原子操作:
all assignments of primitive types except for long and double
all assignments of references
all operations of java.concurrent.Atomic* classes
all assignments to volatile longs and doubles
問題來了,為什麼long型指派不是原子操作呢?例如:
1
<code>long</code> <code>foo = 65465498L;</code>
實時上java會分兩步寫入這個long變量,先寫32位,再寫後32位。這樣就線程不安全了。如果改成下面的就線程安全了:
<code>private</code> <code>volatile</code> <code>long</code> <code>foo;</code>
因為volatile内部已經做了synchronized.
CPU指令,在大多數處理器架構,包括IA32、Space中采用的都是CAS指令,CAS的語義是“我認為V的值應該為A,如果是,那麼将V的值更新為B,否則不修改并告訴V的值實際為多少”,CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,隻有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程并不會被挂起,而是被告知這次競争中失敗,并可以再次嘗試。CAS有3個操作數,記憶體值V,舊的預期值A,要修改的新值B。當且僅當預期值A和記憶體值V相同時,将記憶體值V修改為B,否則什麼都不做。CAS無鎖算法的C實作如下:
2
3
4
5
6
7
8
9
<code>int</code> <code>compare_and_swap (</code><code>int</code><code>* reg,</code><code>int</code> <code>oldval,</code><code>int</code> <code>newval)</code>
<code>{</code>
<code> </code><code>ATOMIC();</code>
<code> </code><code>int</code> <code>old_reg_val = *reg;</code>
<code> </code><code>if</code> <code>(old_reg_val == oldval)</code>
<code> </code><code>*reg = newval;</code>
<code> </code><code>END_ATOMIC();</code>
<code> </code><code>return</code> <code>old_reg_val;</code>
<code>}</code>
CAS比較與交換的僞代碼可以表示為:
do{
備份舊資料;
基于舊資料構造新資料;
}while(!CAS( 記憶體位址,備份的舊資料,新資料 ))
(上圖的解釋:CPU去更新一個值,但如果想改的值不再是原來的值,操作就失敗,因為很明顯,有其它操作先改變了這個值。)
就是指當兩者進行比較時,如果相等,則證明共享資料沒有被修改,替換成新值,然後繼續往下運作;如果不相等,說明共享資料已經被修改,放棄已經所做的操作,然後重新執行剛才的操作。容易看出 CAS 操作是基于共享資料不會被修改的假設,采用了類似于資料庫的 commit-retry 的模式。當同步沖突出現的機會很少時,這種假設能帶來較大的性能提升。
前面說過了,CAS(比較并交換)是CPU指令級的操作,隻有一步原子操作,是以非常快。而且CAS避免了請求作業系統來裁定鎖的問題,不用麻煩作業系統,直接在CPU内部就搞定了。但CAS就沒有開銷了嗎?不!有cache miss的情況。這個問題比較複雜,首先需要了解CPU的硬體體系結構:
上圖可以看到一個8核CPU計算機系統,每個CPU有cache(CPU内部的高速緩存,寄存器),管芯内還帶有一個互聯子產品,使管芯内的兩個核可以互相通信。在圖中央的系統互聯子產品可以讓四個管芯互相通信,并且将管芯與主存連接配接起來。資料以“緩存線”為機關在系統中傳輸,“緩存線”對應于記憶體中一個 2 的幂大小的位元組塊,大小通常為 32 到 256 位元組之間。當 CPU 從記憶體中讀取一個變量到它的寄存器中時,必須首先将包含了該變量的緩存線讀取到
CPU 高速緩存。同樣地,CPU 将寄存器中的一個值存儲到記憶體時,不僅必須将包含了該值的緩存線讀到 CPU 高速緩存,還必須確定沒有其他 CPU 擁有該緩存線的拷貝。
比如,如果 CPU0 在對一個變量執行“比較并交換”(CAS)操作,而該變量所在的緩存線在 CPU7 的高速緩存中,就會發生以下經過簡化的事件序列:
CPU0 檢查本地高速緩存,沒有找到緩存線。
請求被轉發到 CPU0 和 CPU1 的互聯子產品,檢查 CPU1 的本地高速緩存,沒有找到緩存線。
請求被轉發到系統互聯子產品,檢查其他三個管芯,得知緩存線被 CPU6和 CPU7 所在的管芯持有。
請求被轉發到 CPU6 和 CPU7 的互聯子產品,檢查這兩個 CPU 的高速緩存,在 CPU7 的高速緩存中找到緩存線。
CPU7 将緩存線發送給所屬的互聯子產品,并且重新整理自己高速緩存中的緩存線。
CPU6 和 CPU7 的互聯子產品将緩存線發送給系統互聯子產品。
系統互聯子產品将緩存線發送給 CPU0 和 CPU1 的互聯子產品。
CPU0 和 CPU1 的互聯子產品将緩存線發送給 CPU0 的高速緩存。
CPU0 現在可以對高速緩存中的變量執行 CAS 操作了
以上是重新整理不同CPU緩存的開銷。最好情況下的 CAS 操作消耗大概 40 納秒,超過 60 個時鐘周期。這裡的“最好情況”是指對某一個變量執行 CAS 操作的 CPU 正好是最後一個操作該變量的CPU,是以對應的緩存線已經在 CPU 的高速緩存中了,類似地,最好情況下的鎖操作(一個“round trip 對”包括擷取鎖和随後的釋放鎖)消耗超過 60 納秒,超過 100 個時鐘周期。這裡的“最好情況”意味着用于表示鎖的資料結構已經在擷取和釋放鎖的
CPU 所屬的高速緩存中了。鎖操作比 CAS 操作更加耗時,是因深入了解并行程式設計
為鎖操作的資料結構中需要兩個原子操作。緩存未命中消耗大概 140 納秒,超過 200 個時鐘周期。需要在存儲新值時查詢變量的舊值的 CAS 操作,消耗大概 300 納秒,超過 500 個時鐘周期。想想這個,在執行一次 CAS 操作的時間裡,CPU 可以執行 500 條普通指令。這表明了細粒度鎖的局限性。
以下是cache miss cas 和lock的性能對比:
在JDK1.5之前,如果不編寫明确的代碼就無法執行CAS操作,在JDK1.5中引入了底層的支援,在int、long和對象的引用等類型上都公開了CAS的操作,并且JVM把它們編譯為底層硬體提供的最有效的方法,在運作CAS的平台上,運作時把它們編譯為相應的機器指令,如果處理器/CPU不支援CAS指令,那麼JVM将使用自旋鎖。是以,值得注意的是,CAS解決方案與平台/編譯器緊密相關(比如x86架構下其對應的彙編指令是lock
cmpxchg,如果想要64Bit的交換,則應使用lock cmpxchg8b。在.NET中我們可以使用Interlocked.CompareExchange函數)。
在原子類變量中,如java.util.concurrent.atomic中的AtomicXXX,都使用了這些底層的JVM支援為數字類型的引用類型提供一種高效的CAS操作,而在java.util.concurrent中的大多數類在實作時都直接或間接的使用了這些原子變量類。
Java 1.6中AtomicLong.incrementAndGet()的實作源碼為:
<a target="_blank" href="http://www.cnblogs.com/Mainz/p/3546347.html?utm_source=tuicool&utm_medium=referral#">+ View Code</a>
CAS算法,用CPU指令來實作無鎖自增。是以,AtomicLong.incrementAndGet的自增比用synchronized的鎖效率倍增。
10
11
12
<code>public</code> <code>final</code> <code>int</code> <code>getAndIncrement() { </code>
<code> </code><code>for</code> <code>(;;) { </code>
<code> </code><code>int</code> <code>current = get(); </code>
<code> </code><code>int</code> <code>next = current +</code><code>1</code><code>; </code>
<code> </code><code>if</code> <code>(compareAndSet(current, next)) </code>
<code> </code><code>return</code> <code>current; </code>
<code> </code><code>} </code>
<code>} </code>
<code> </code>
<code>public</code> <code>final</code> <code>boolean</code> <code>compareAndSet(</code><code>int</code> <code>expect,</code><code>int</code> <code>update) { </code>
<code> </code><code>return</code> <code>unsafe.compareAndSwapInt(</code><code>this</code><code>, valueOffset, expect, update); </code>
下面是測試代碼:可以看到用AtomicLong.incrementAndGet的性能比用synchronized高出幾倍。
下面是比非阻塞自增稍微複雜一點的CAS的例子:非阻塞堆棧/<code>ConcurrentStack</code> 。<code>ConcurrentStack</code> 中的 <code>push()</code> 和<code>pop()</code> 操作在結構上與<code>NonblockingCounter</code> 上相似,隻是做的工作有些冒險,希望在 “送出”
工作的時候,底層假設沒有失效。<code>push()</code> 方法觀察目前最頂的節點,建構一個新節點放在堆棧上,然後,如果最頂端的節點在初始觀察之後沒有變化,那麼就安裝新節點。如果 CAS 失敗,意味着另一個線程已經修改了堆棧,那麼過程就會重新開始。
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<code>public</code> <code>class</code> <code>ConcurrentStack<E> {</code>
<code> </code><code>AtomicReference<Node<E>> head =</code><code>new</code> <code>AtomicReference<Node<E>>();</code>
<code> </code><code>public</code> <code>void</code> <code>push(E item) {</code>
<code> </code><code>Node<E> newHead =</code><code>new</code> <code>Node<E>(item);</code>
<code> </code><code>Node<E> oldHead;</code>
<code> </code><code>do</code> <code>{</code>
<code> </code><code>oldHead = head.get();</code>
<code> </code><code>newHead.next = oldHead;</code>
<code> </code><code>}</code><code>while</code> <code>(!head.compareAndSet(oldHead, newHead));</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>E pop() {</code>
<code> </code><code>Node<E> newHead;</code>
<code> </code><code>if</code> <code>(oldHead ==</code><code>null</code><code>)</code>
<code> </code><code>return</code> <code>null</code><code>;</code>
<code> </code><code>newHead = oldHead.next;</code>
<code> </code><code>}</code><code>while</code> <code>(!head.compareAndSet(oldHead,newHead));</code>
<code> </code><code>return</code> <code>oldHead.item;</code>
<code> </code><code>static</code> <code>class</code> <code>Node<E> {</code>
<code> </code><code>final</code> <code>E item;</code>
<code> </code><code>Node<E> next;</code>
<code> </code><code>public</code> <code>Node(E item) {</code><code>this</code><code>.item = item; }</code>
在輕度到中度的争用情況下,非阻塞算法的性能會超越阻塞算法,因為 CAS 的多數時間都在第一次嘗試時就成功,而發生争用時的開銷也不涉及線程挂起和上下文切換,隻多了幾個循環疊代。沒有争用的 CAS 要比沒有争用的鎖便宜得多(這句話肯定是真的,因為沒有争用的鎖涉及 CAS 加上額外的處理),而争用的 CAS 比争用的鎖擷取涉及更短的延遲。
在高度争用的情況下(即有多個線程不斷争用一個記憶體位置的時候),基于鎖的算法開始提供比非阻塞算法更好的吞吐率,因為當線程阻塞時,它就會停止争用,耐心地等候輪到自己,進而避免了進一步争用。但是,這麼高的争用程度并不常見,因為多數時候,線程會把線程本地的計算與争用共享資料的操作分開,進而給其他線程使用共享資料的機會。
以上的示例(自增計數器和堆棧)都是非常簡單的非阻塞算法,一旦掌握了在循環中使用 CAS,就可以容易地模仿它們。對于更複雜的資料結構,非阻塞算法要比這些簡單示例複雜得多,因為修改連結清單、樹或哈希表可能涉及對多個指針的更新。CAS 支援對單一指針的原子性條件更新,但是不支援兩個以上的指針。是以,要建構一個非阻塞的連結清單、樹或哈希表,需要找到一種方式,可以用 CAS 更新多個指針,同時不會讓資料結構處于不一緻的狀态。
在連結清單的尾部插入元素,通常涉及對兩個指針的更新:“尾” 指針總是指向清單中的最後一個元素,“下一個” 指針從過去的最後一個元素指向新插入的元素。因為需要更新兩個指針,是以需要兩個 CAS。在獨立的 CAS 中更新兩個指針帶來了兩個需要考慮的潛在問題:如果第一個 CAS 成功,而第二個 CAS 失敗,會發生什麼?如果其他線程在第一個和第二個 CAS 之間企圖通路連結清單,會發生什麼?
對于非複雜資料結構,建構非阻塞算法的 “技巧” 是確定資料結構總處于一緻的狀态(甚至包括線上程開始修改資料結構和它完成修改之間),還要確定其他線程不僅能夠判斷出第一個線程已經完成了更新還是處在更新的中途,還能夠判斷出如果第一個線程走向 AWOL,完成更新還需要什麼操作。如果線程發現了處在更新中途的資料結構,它就可以 “幫助” 正在執行更新的線程完成更新,然後再進行自己的操作。當第一個線程回來試圖完成自己的更新時,會發現不再需要了,傳回即可,因為
CAS 會檢測到幫助線程的幹預(在這種情況下,是建設性的幹預)。
這種 “幫助鄰居” 的要求,對于讓資料結構免受單個線程失敗的影響,是必需的。如果線程發現資料結構正處在被其他線程更新的中途,然後就等候其他線程完成更新,那麼如果其他線程在操作中途失敗,這個線程就可能永遠等候下去。即使不出現故障,這種方式也會提供糟糕的性能,因為新到達的線程必須放棄處理器,導緻上下文切換,或者等到自己的時間片過期(而這更糟)。
28
29
30
<code>public</code> <code>class</code> <code>LinkedQueue <E> {</code>
<code> </code><code>private</code> <code>static</code> <code>class</code> <code>Node <E> {</code>
<code> </code><code>final</code> <code>AtomicReference<Node<E>> next;</code>
<code> </code><code>Node(E item, Node<E> next) {</code>
<code> </code><code>this</code><code>.item = item;</code>
<code> </code><code>this</code><code>.next =</code><code>new</code> <code>AtomicReference<Node<E>>(next);</code>
<code> </code><code>}</code>
<code> </code><code>private</code> <code>AtomicReference<Node<E>> head</code>
<code> </code><code>=</code><code>new</code> <code>AtomicReference<Node<E>>(</code><code>new</code> <code>Node<E>(</code><code>null</code><code>,</code><code>null</code><code>));</code>
<code> </code><code>private</code> <code>AtomicReference<Node<E>> tail = head;</code>
<code> </code><code>public</code> <code>boolean</code> <code>put(E item) {</code>
<code> </code><code>Node<E> newNode =</code><code>new</code> <code>Node<E>(item,</code><code>null</code><code>);</code>
<code> </code><code>while</code> <code>(</code><code>true</code><code>) {</code>
<code> </code><code>Node<E> curTail = tail.get();</code>
<code> </code><code>Node<E> residue = curTail.next.get();</code>
<code> </code><code>if</code> <code>(curTail == tail.get()) {</code>
<code> </code><code>if</code> <code>(residue ==</code><code>null</code><code>)</code><code>/* A */</code> <code>{</code>
<code> </code><code>if</code> <code>(curTail.next.compareAndSet(</code><code>null</code><code>, newNode))</code><code>/* C */</code> <code>{</code>
<code> </code><code>tail.compareAndSet(curTail, newNode)</code><code>/* D */</code> <code>;</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>}</code><code>else</code> <code>{</code>
<code> </code><code>tail.compareAndSet(curTail, residue)</code><code>/* B */</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
Java5中的ConcurrentHashMap,線程安全,設計巧妙,用桶粒度的鎖,避免了put和get中對整個map的鎖定,尤其在get中,隻對一個HashEntry做鎖定操作,性能提升是顯而易見的。
服務端程式設計的3大性能殺手:1、大量線程導緻的線程切換開銷。2、鎖。3、非必要的記憶體拷貝。在高并發下,對于純記憶體操作來說,單線程是要比多線程快的, 可以比較一下多線程程式在壓力測試下cpu的sy和ni百分比。高并發環境下要實作高吞吐量和線程安全,兩個思路:一個是用優化的鎖實作,一個是lock-free的無鎖結構。但非阻塞算法要比基于鎖的算法複雜得多。開發非阻塞算法是相當專業的訓練,而且要證明算法的正确也極為困難,不僅和具體的目标機器平台和編譯器相關,而且需要複雜的技巧和嚴格的測試。雖然Lock-Free程式設計非常困難,但是它通常可以帶來比基于鎖程式設計更高的吞吐量。是以Lock-Free程式設計是大有前途的技術。它線上程中止、優先級倒置以及信号安全等方面都有着良好的表現。
io)
如果深入 JVM 和作業系統,會發現非阻塞算法無處不在。垃圾收集器使用非阻塞算法加快并發和平行的垃圾搜集;排程器使用非阻塞算法有效地排程線程和程序,實作内在鎖。在 Mustang(Java 6.0)中,基于鎖的<code>SynchronousQueue</code> 算法被新的非阻塞版本代替。很少有開發人員會直接使用 <code>SynchronousQueue</code>,但是通過<code>Executors.newCachedThreadPool()</code> 工廠建構的線程池用它作為工作隊列。比較緩存線程池性能的對比測試顯示,新的非阻塞同步隊列實作提供了幾乎是目前實作
3 倍的速度。在 Mustang 的後續版本(代碼名稱為 Dolphin)中,已經規劃了進一步的改進。