Jdk1.6 JUC源碼解析(27)-Exchanger
作者:大飛
功能簡介:
- Exchanger是一種線程間安全交換資料的機制。可以和之前分析過的SynchronousQueue對比一下:線程A通過SynchronousQueue将資料a交給線程B;線程A通過Exchanger和線程B交換資料,線程A把資料a交給線程B,同時線程B把資料b交給線程A。可見,SynchronousQueue是交給一個資料,Exchanger是交換兩個資料。
源碼分析:
- 先看下内部結構:
private static final class Node extends AtomicReference<Object> {
/** 建立這個節點的線程提供的用于交換的資料。 */
public final Object item;
/** 等待喚醒的線程 */
public volatile Thread waiter;
/**
* Creates node with given item and empty hole.
* @param item the item
*/
public Node(Object item) {
this.item = item;
}
}
/**
* 一個Slot就是一對線程交換資料的地方。
* 這裡對Slot做了緩存行填充,能夠避免僞共享問題。
* 雖然填充導緻浪費了一些空間,但Slot是按需建立,一般沒什麼問題。
*/
private static final class Slot extends AtomicReference<Object> {
// Improve likelihood of isolation on <= 64 byte cache lines
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}
/**
* Slot數組,在需要時才進行初始化。
* 用volatile修飾,因為這樣可以安全的使用雙重鎖檢測方式建構。
*/
private volatile Slot[] arena = new Slot[CAPACITY];
/**
* arena(Slot數組)的容量。設定這個值用來避免競争。
*/
private static final int CAPACITY = 32;
/**
* 正在使用的slot下标的最大值。當一個線程經曆了多次CAS競争後,
* 這個值會遞增;當一個線程自旋等待逾時後,這個值會遞減。
*/
private final AtomicInteger max = new AtomicInteger();
内部結構很清晰,首先内部包含一個Slot數組,預設容量是32,用來避免以一些競争,有點類似于ConcurrentHashMap的政策;其次,交換資料的場所就是Slot,它本身進行了cache line填充,避免了僞共享問題;最後,每個要進行資料交換的線程在内部會用一個Node來表示。 僞共享說明:假設一個類的兩個互相獨立的屬性a和b在記憶體位址上是連續的(比如FIFO隊列的頭尾指針),那麼它們通常會被加載到相同的cpu cache line裡面。并發情況下,如果一個線程修改了a,會導緻整個cache line失效(包括b),這時另一個線程來讀b,就需要從記憶體裡再次加載了,這種多線程頻繁修改ab的情況下,雖然a和b看似獨立,但它們會互相幹擾,非常影響性能。
- 看完了内部結構,接下來就從Exchanger的交換資料方法exchange入手來分析代碼:
/**
* 等待其他線程到達交換點,然後與其進行資料交換。
*
* 如果其他線程到來,那麼交換資料,傳回。
*
* 如果其他線程未到來,那麼目前線程等待,知道如下情況發生:
* 1.有其他線程來進行資料交換。
* 2.目前線程被中斷。
*/
public V exchange(V x) throws InterruptedException {
if (!Thread.interrupted()) {//檢測目前線程是否被中斷。
//進行資料交換。
Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
if (v == NULL_ITEM)
return null; //檢測結果是否為null。
if (v != CANCEL) //檢測是否被取消。
return (V)v;
Thread.interrupted(); // 清除中斷标記。
}
throw new InterruptedException();
}
/**
* 等待其他線程到達交換點,然後與其進行資料交換。
*
* 如果其他線程到來,那麼交換資料,傳回。
*
* 如果其他線程未到來,那麼目前線程等待,知道如下情況發生:
* 1.有其他線程來進行資料交換。
* 2.目前線程被中斷。
* 3.逾時。
*/
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (!Thread.interrupted()) {
Object v = doExchange(x == null? NULL_ITEM : x,
true, unit.toNanos(timeout));
if (v == NULL_ITEM)
return null;
if (v != CANCEL)
return (V)v;
if (!Thread.interrupted())
throw new TimeoutException();
}
throw new InterruptedException();
}
上面的方法都調用了doExchange方法,主要邏輯在這個方法裡,分析下這個方法:
/**
* 這個方法會處理不同的情況,使用Object而不是泛型,主要是為了傳回一些
* 哨兵值(比如表示null和取消的對象)。
*
* @param item 用來進行交換的資料。
* @param timed 如果有逾時延遲,設定為true
* @param nanos 具體的逾時時間。
* @return 傳回另一個線程(與目前線程交換資料)的資料,或者CANCEL(表示取消)
*/
private Object doExchange(Object item, boolean timed, long nanos) {
Node me = new Node(item); // 建立目前節點me。
int index = hashIndex(); // 計算出目前slot的下标。
int fails = 0; // 用來儲存CAS失敗的次數。
for (;;) {
Object y; // 用來儲存目前slot中可能存在的Node。
Slot slot = arena[index]; // 按照前面計算出的下标擷取目前slot。
if (slot == null)
createSlot(index); // 如果slot為null,那麼建立一個slot,然後繼續循環。
else if ((y = slot.get()) != null && slot.compareAndSet(y, null)) { // 如果slot不為空,那麼slot可能被另一個Node給占了,如果确實存在這個Node,嘗試将其置空。(表示目前節點要和這個Node交換資料了)
Node you = (Node)y; // 給這個Node轉型,賦給you。
if (you.compareAndSet(null, item)) { // 将item設定給you,注意you本身是一個AtomicReference,這裡相當于把item設定到you的value字段上。
LockSupport.unpark(you.waiter); // 然後喚醒you節點上等待的線程。
return you.item; // 傳回you的item。
} // 競争失敗,放棄,繼續循環。
}
else if (y == null && // 如果slot為空,那麼說明沒有要和目前線程交換資料的線程,
slot.compareAndSet(null, me)) { //那麼目前線程先嘗試把這個slot給占了。
if (index == 0) // 如果slot下标為0,那麼阻塞等待。
return timed? awaitNanos(me, slot, nanos): await(me, slot); // 有逾時的話,會阻塞給定的時間。
Object v = spinWait(me, slot); // 如果slot下标不是0,自旋等待,等待其他線程來和目前線程交換資料,然後傳回交換後的資料。
if (v != CANCEL)
return v;
me = new Node(item); // 如果取消的話,重試,重建一個Node,之前的Node就丢棄了。
int m = max.get(); // 擷取目前slot下标的最大值。
if (m > (index >>>= 1)) // 如果目前允許的最大索引太大。
max.compareAndSet(m, m - 1); // 遞減最大索引
}
else if (++fails > 1) { // 如果1個slot競争失敗超過2次。
int m = max.get();
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) //如果競争失敗超過3次,嘗試遞增最大索引值。
index = m + 1; // 增加索引值。
else if (--index < 0) // 換個index。
index = m; // 繞回邏輯,防止index越界。
}
}
}
這裡形象的了解一下: 其實就是"我"和"你"(可能有多個"我",多個"你")在一個叫Slot的地方做交易(一手交錢,一手交貨),過程分以下步驟: 1.我到交易地點(Slot)的時候,你已經到了,那我就嘗試喊你交易,如果你回應了我,決定和我交易那麼進入第2步;如果别人搶先一步把你喊走了,那我隻能再找别人了,進入第5步。 2.我拿出錢交給你,你可能會接收我的錢,然後把貨給我,交易結束;也可能嫌我掏錢太慢(逾時)或者接個電話(中斷),TM的不賣了,走了,那我隻能再找别人買貨了(從頭開始)。 3.我到交易地點的時候,你不在,那我先嘗試把這個交易點給占了(一屁股做凳子上...),如果我成功搶占了單間(交易點),那就坐這兒等着你拿貨來交易,進入第4步;如果被别人搶座了,那我隻能在找别的地方兒了,進入第5步。 4.你拿着貨來了,喊我交易,然後完成交易;也可能我等了好長時間你都沒來,我不等了,繼續找别人交易去,走的時候我看了一眼,一共沒多少人,弄了這麼多單間(交易地點Slot),太TM浪費了,我喊來交易地點管理者:一共也沒幾個人,搞這麼多單間兒幹毛,給哥撤一個!。然後再找别人買貨(從頭開始);或者我老大給我打了個電話,不讓我買貨了(中斷)。 5.如果之前我嘗試交易了2次都沒成功,那我就想我TM選的這個位置(Slot下标)是不是風水不好啊,換個地兒繼續(從頭開始);如果之前都嘗試交易了4次還沒成功,我怒了,喊過來交易地點的管理者:給哥再開一個單間(Slot),加一個凳子,這麼多人就這麼幾個破凳子夠誰用!
看一下doExchange調用的計算slot下标的方法:
/**
* Returns a hash index for the current thread. Uses a one-step
* FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
* based on the current thread's Thread.getId(). These hash codes
* have more uniform distribution properties with respect to small
* moduli (here 1-31) than do other simple hashing functions.
*
* <p>To return an index between 0 and max, we use a cheap
* approximation to a mod operation, that also corrects for bias
* due to non-power-of-2 remaindering (see {@link
* java.util.Random#nextInt}). Bits of the hashcode are masked
* with "nbits", the ceiling power of two of table size (looked up
* in a table packed into three ints). If too large, this is
* retried after rotating the hash by nbits bits, while forcing new
* top bit to 0, which guarantees eventual termination (although
* with a non-random-bias). This requires an average of less than
* 2 tries for all table sizes, and has a maximum 2% difference
* from perfectly uniform slot probabilities when applied to all
* possible hash codes for sizes less than 32.
*
* @return a per-thread-random index, 0 <= index < max
*/
private final int hashIndex() {
long id = Thread.currentThread().getId();
int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
int m = max.get();
int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1))
((0x000001f8 >>> m) & 2) | // The constants hold
((0xffff00f2 >>> m) & 1)); // a lookup table
int index;
while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on
hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
return index;
}
這裡就是根據目前線程的ID,算一個hash值,然後針對slot最大index值做了一個近似取模的操作來計算slot的下标。
接下來看一下createSlot方法:
private void createSlot(int index) {
// 在同步塊外面建立Slot執行個體,以減小同步塊範圍。
Slot newSlot = new Slot();
Slot[] a = arena;
synchronized (a) {
if (a[index] == null)
a[index] = newSlot;
}
}
再看一下awaitNanos方法:
/**
* 在下标為0的Slot上等待擷取其他線程填充的值。
* 如果在Slot被填充之前逾時或者被中斷,那麼操作失敗。
*/
private Object awaitNanos(Node node, Slot slot, long nanos) {
int spins = TIMED_SPINS;
long lastTime = 0;
Thread w = null;
for (;;) {
Object v = node.get();
if (v != null)
//如果已經被其他線程填充了值,那麼傳回這個值。
return v;
long now = System.nanoTime();
if (w == null)
w = Thread.currentThread();
else
nanos -= now - lastTime;
lastTime = now;
if (nanos > 0) {
if (spins > 0)
--spins; //先自旋幾次。
else if (node.waiter == null)
node.waiter = w; //自旋階段完畢後,将目前線程設定到node的waiter域。
else if (w.isInterrupted())
tryCancel(node, slot); //如果目前線程被中斷,嘗試取消node。
else
LockSupport.parkNanos(node, nanos); //阻塞給定的時間。
}
else if (tryCancel(node, slot) && !w.isInterrupted())
//逾時後,如果目前線程沒有被中斷,那麼從Slot數組的其他位置看看有沒有等待交換資料的節點
return scanOnTimeout(node);
}
}
awaitNanos中的自旋次數為TIMED_SPINS,這裡說明一下自旋次數:
/**
* 單核處理器下這個自旋次數為0
* 多核情況下,這個值設定為大多數系統中上下文切換時間的平均值。
*/
private static final int SPINS = (NCPU == 1) ? 0 : 2000;
/**
* 在有逾時情況下阻塞等待之前自旋的次數。.
* 逾時等待的自旋次數之是以更少,是因為檢測時間也需要耗費時間。
* 這裡的值是一個經驗值。
*/
private static final int TIMED_SPINS = SPINS / 20;
繼續看一下tryCancel方法:
private static boolean tryCancel(Node node, Slot slot) {
if (!node.compareAndSet(null, CANCEL))//嘗試取消node
return false;
if (slot.get() == node) // pre-check to minimize contention
slot.compareAndSet(node, null); //如果還關聯在sot上,斷開關聯。
return true;
}
繼續看awaitNanos方法中最後調用的scanOnTimeout方法,這個方法在要取消的時候調用,找一下其他下标的Slot上有沒有可以交換資料的節點,找到的話就可以成功交換資料,而不取消了:
private Object scanOnTimeout(Node node) {
Object y;
for (int j = arena.length - 1; j >= 0; --j) {
//從Slot數組的後面往前找
Slot slot = arena[j];
if (slot != null) {
//找到了有初始化好的Slot,然後看看裡面有沒有node。
while ((y = slot.get()) != null) {
//發現有node,嘗試和這個node進行資料交換。
if (slot.compareAndSet(y, null)) {
Node you = (Node)y;
//嘗試進行資料交換,
if (you.compareAndSet(null, node.item)) {
//如果交換成功(把目前節點的資料交給you),喚醒you上面等待的線程。
LockSupport.unpark(you.waiter);
//傳回you的資料。
return you.item;
}
}
}
}
}
//沒找到其他等待交換資料的線程,最後取消目前節點node。
return CANCEL;
}
上面看的awaitNanos方法是在下标為0的Slot裡面,有逾時情況下的處理方式。再看下沒有逾時情況的處理方法await:
private static Object await(Node node, Slot slot) {
Thread w = Thread.currentThread();
int spins = SPINS;
for (;;) {
Object v = node.get();
if (v != null)
//如果已經被其他線程填充了值,那麼傳回這個值。
return v;
else if (spins > 0) // 先自旋幾次。
--spins;
else if (node.waiter == null) // 自旋階段完畢後,将目前線程設定到node的waiter域。
node.waiter = w;
else if (w.isInterrupted()) // 如果目前線程被中斷,嘗試取消目前node。
tryCancel(node, slot);
else // 否則阻塞目前線程。
LockSupport.park(node);
}
}
之前看的awaitNanos和await方法都是在下标為0的Slot的情況下采取的有阻塞行為的處理方式,如果下标不為0,采取完全自旋的方式,調用方法spinWait:
private static Object spinWait(Node node, Slot slot) {
int spins = SPINS;
for (;;) {
Object v = node.get();
if (v != null)
return v;
else if (spins > 0)
--spins; //先自旋
else
tryCancel(node, slot); //自旋了指定的次數還沒等到交換的資料,嘗試取消。
}
}
最後看一下arena(Slot數組),預設的容量和實際使用的下标最大值:
private static final int CAPACITY = 32;
/**
* The value of "max" that will hold all threads without
* contention. When this value is less than CAPACITY, some
* otherwise wasted expansion can be avoided.
*/
private static final int FULL =
Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
前面說過arena容量預設為32,目的是為了減少線程的競争,但實際上對arena的使用不會超過FULL這個值(避免一些空間浪費)。這個值取的是32(預設CAPACITY)和CPU核心數量的一半,這兩個數的較小值在減1的數和0的較大值.... 也就是說,如果CPU核很多的情況下,這個值最大也就是31,;如果是單核或者雙核CPU,這個值就是0,也就是說隻能用arena[0]。這也是為什麼前面的hashIndex方法裡面會做的(近似)取模操作比較複雜,因為實際的能使用的Slot數組範圍可能不是2的幂。 Exchanger的代碼解析完畢!