天天看點

Jdk1.6 JUC源碼解析(27)-Exchanger

Jdk1.6 JUC源碼解析(27)-Exchanger

作者:大飛

功能簡介:

  • Exchanger是一種線程間安全交換資料的機制。可以和之前分析過的SynchronousQueue對比一下:線程A通過SynchronousQueue将資料a交給線程B;線程A通過Exchanger和線程B交換資料,線程A把資料a交給線程B,同時線程B把資料b交給線程A。可見,SynchronousQueue是交給一個資料,Exchanger是交換兩個資料。

  源碼分析:

  • 先看下内部結構:
private static final class Node extends AtomicReference<Object> {
        /** 建立這個節點的線程提供的用于交換的資料。 */
        public final Object item;
        /** 等待喚醒的線程 */
        public volatile Thread waiter;
        /**
         * Creates node with given item and empty hole.
         * @param item the item
         */
        public Node(Object item) {
            this.item = item;
        }
    }

    /**
     * 一個Slot就是一對線程交換資料的地方。
     * 這裡對Slot做了緩存行填充,能夠避免僞共享問題。
     * 雖然填充導緻浪費了一些空間,但Slot是按需建立,一般沒什麼問題。
     */
    private static final class Slot extends AtomicReference<Object> {
        // Improve likelihood of isolation on <= 64 byte cache lines
        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
    }

    /**
     * Slot數組,在需要時才進行初始化。
     * 用volatile修飾,因為這樣可以安全的使用雙重鎖檢測方式建構。
     */
    private volatile Slot[] arena = new Slot[CAPACITY];
    /**
     * arena(Slot數組)的容量。設定這個值用來避免競争。
     */
    private static final int CAPACITY = 32;
    /**
     * 正在使用的slot下标的最大值。當一個線程經曆了多次CAS競争後,
     * 這個值會遞增;當一個線程自旋等待逾時後,這個值會遞減。
     */
    private final AtomicInteger max = new AtomicInteger();
           

       内部結構很清晰,首先内部包含一個Slot數組,預設容量是32,用來避免以一些競争,有點類似于ConcurrentHashMap的政策;其次,交換資料的場所就是Slot,它本身進行了cache line填充,避免了僞共享問題;最後,每個要進行資料交換的線程在内部會用一個Node來表示。        僞共享說明:假設一個類的兩個互相獨立的屬性a和b在記憶體位址上是連續的(比如FIFO隊列的頭尾指針),那麼它們通常會被加載到相同的cpu cache line裡面。并發情況下,如果一個線程修改了a,會導緻整個cache line失效(包括b),這時另一個線程來讀b,就需要從記憶體裡再次加載了,這種多線程頻繁修改ab的情況下,雖然a和b看似獨立,但它們會互相幹擾,非常影響性能。  

  • 看完了内部結構,接下來就從Exchanger的交換資料方法exchange入手來分析代碼:
/**
     * 等待其他線程到達交換點,然後與其進行資料交換。
     *
     * 如果其他線程到來,那麼交換資料,傳回。
     *
     * 如果其他線程未到來,那麼目前線程等待,知道如下情況發生:
     *   1.有其他線程來進行資料交換。
     *   2.目前線程被中斷。
     */
    public V exchange(V x) throws InterruptedException {
        if (!Thread.interrupted()) {//檢測目前線程是否被中斷。
            //進行資料交換。
            Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
            if (v == NULL_ITEM)
                return null; //檢測結果是否為null。
            if (v != CANCEL) //檢測是否被取消。
                return (V)v;
            Thread.interrupted(); // 清除中斷标記。
        }
        throw new InterruptedException();
    }
    /**
     * 等待其他線程到達交換點,然後與其進行資料交換。
     * 
     * 如果其他線程到來,那麼交換資料,傳回。
     * 
     * 如果其他線程未到來,那麼目前線程等待,知道如下情況發生:
     *   1.有其他線程來進行資料交換。
     *   2.目前線程被中斷。
     *   3.逾時。
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        if (!Thread.interrupted()) {
            Object v = doExchange(x == null? NULL_ITEM : x,
                                  true, unit.toNanos(timeout));
            if (v == NULL_ITEM)
                return null;
            if (v != CANCEL)
                return (V)v;
            if (!Thread.interrupted())
                throw new TimeoutException();
        }
        throw new InterruptedException();
    }
           

       上面的方法都調用了doExchange方法,主要邏輯在這個方法裡,分析下這個方法: 

/**
     * 這個方法會處理不同的情況,使用Object而不是泛型,主要是為了傳回一些
     * 哨兵值(比如表示null和取消的對象)。
     *
     * @param item 用來進行交換的資料。
     * @param timed 如果有逾時延遲,設定為true
     * @param nanos 具體的逾時時間。
     * @return 傳回另一個線程(與目前線程交換資料)的資料,或者CANCEL(表示取消)
     */
    private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);                 // 建立目前節點me。
        int index = hashIndex();                  // 計算出目前slot的下标。
        int fails = 0;                            // 用來儲存CAS失敗的次數。
        for (;;) {
            Object y;                             // 用來儲存目前slot中可能存在的Node。
            Slot slot = arena[index];             // 按照前面計算出的下标擷取目前slot。
            if (slot == null)                     
                createSlot(index);                // 如果slot為null,那麼建立一個slot,然後繼續循環。
            else if ((y = slot.get()) != null && slot.compareAndSet(y, null)) { // 如果slot不為空,那麼slot可能被另一個Node給占了,如果确實存在這個Node,嘗試将其置空。(表示目前節點要和這個Node交換資料了)
                Node you = (Node)y;               // 給這個Node轉型,賦給you。
                if (you.compareAndSet(null, item)) { // 将item設定給you,注意you本身是一個AtomicReference,這裡相當于把item設定到you的value字段上。
                    LockSupport.unpark(you.waiter); // 然後喚醒you節點上等待的線程。
                    return you.item;                // 傳回you的item。
                }                                 // 競争失敗,放棄,繼續循環。
            }
            else if (y == null &&                 // 如果slot為空,那麼說明沒有要和目前線程交換資料的線程,
                     slot.compareAndSet(null, me)) { //那麼目前線程先嘗試把這個slot給占了。
                if (index == 0)                   // 如果slot下标為0,那麼阻塞等待。
                    return timed? awaitNanos(me, slot, nanos): await(me, slot); // 有逾時的話,會阻塞給定的時間。
                Object v = spinWait(me, slot);    // 如果slot下标不是0,自旋等待,等待其他線程來和目前線程交換資料,然後傳回交換後的資料。
                if (v != CANCEL)
                    return v;
                me = new Node(item);              // 如果取消的話,重試,重建一個Node,之前的Node就丢棄了。
                int m = max.get();                // 擷取目前slot下标的最大值。
                if (m > (index >>>= 1))           // 如果目前允許的最大索引太大。
                    max.compareAndSet(m, m - 1);  // 遞減最大索引
            }
            else if (++fails > 1) {               // 如果1個slot競争失敗超過2次。
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) //如果競争失敗超過3次,嘗試遞增最大索引值。
                    index = m + 1;                // 增加索引值。
                else if (--index < 0)             // 換個index。
                    index = m;                    // 繞回邏輯,防止index越界。
            }
        }
    }
           

       這裡形象的了解一下:               其實就是"我"和"你"(可能有多個"我",多個"你")在一個叫Slot的地方做交易(一手交錢,一手交貨),過程分以下步驟:               1.我到交易地點(Slot)的時候,你已經到了,那我就嘗試喊你交易,如果你回應了我,決定和我交易那麼進入第2步;如果别人搶先一步把你喊走了,那我隻能再找别人了,進入第5步。               2.我拿出錢交給你,你可能會接收我的錢,然後把貨給我,交易結束;也可能嫌我掏錢太慢(逾時)或者接個電話(中斷),TM的不賣了,走了,那我隻能再找别人買貨了(從頭開始)。               3.我到交易地點的時候,你不在,那我先嘗試把這個交易點給占了(一屁股做凳子上...),如果我成功搶占了單間(交易點),那就坐這兒等着你拿貨來交易,進入第4步;如果被别人搶座了,那我隻能在找别的地方兒了,進入第5步。               4.你拿着貨來了,喊我交易,然後完成交易;也可能我等了好長時間你都沒來,我不等了,繼續找别人交易去,走的時候我看了一眼,一共沒多少人,弄了這麼多單間(交易地點Slot),太TM浪費了,我喊來交易地點管理者:一共也沒幾個人,搞這麼多單間兒幹毛,給哥撤一個!。然後再找别人買貨(從頭開始);或者我老大給我打了個電話,不讓我買貨了(中斷)。               5.如果之前我嘗試交易了2次都沒成功,那我就想我TM選的這個位置(Slot下标)是不是風水不好啊,換個地兒繼續(從頭開始);如果之前都嘗試交易了4次還沒成功,我怒了,喊過來交易地點的管理者:給哥再開一個單間(Slot),加一個凳子,這麼多人就這麼幾個破凳子夠誰用!  

       看一下doExchange調用的計算slot下标的方法:

/**
     * Returns a hash index for the current thread.  Uses a one-step
     * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
     * based on the current thread's Thread.getId().  These hash codes
     * have more uniform distribution properties with respect to small
     * moduli (here 1-31) than do other simple hashing functions.
     *
     * <p>To return an index between 0 and max, we use a cheap
     * approximation to a mod operation, that also corrects for bias
     * due to non-power-of-2 remaindering (see {@link
     * java.util.Random#nextInt}).  Bits of the hashcode are masked
     * with "nbits", the ceiling power of two of table size (looked up
     * in a table packed into three ints).  If too large, this is
     * retried after rotating the hash by nbits bits, while forcing new
     * top bit to 0, which guarantees eventual termination (although
     * with a non-random-bias).  This requires an average of less than
     * 2 tries for all table sizes, and has a maximum 2% difference
     * from perfectly uniform slot probabilities when applied to all
     * possible hash codes for sizes less than 32.
     *
     * @return a per-thread-random index, 0 <= index < max
     */
    private final int hashIndex() {
        long id = Thread.currentThread().getId();
        int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
        int m = max.get();
        int nbits = (((0xfffffc00  >> m) & 4) | // Compute ceil(log2(m+1))
                     ((0x000001f8 >>> m) & 2) | // The constants hold
                     ((0xffff00f2 >>> m) & 1)); // a lookup table
        int index;
        while ((index = hash & ((1 << nbits) - 1)) > m)       // May retry on
            hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
        return index;
    }
           

       這裡就是根據目前線程的ID,算一個hash值,然後針對slot最大index值做了一個近似取模的操作來計算slot的下标。  

       接下來看一下createSlot方法:  

private void createSlot(int index) {
        // 在同步塊外面建立Slot執行個體,以減小同步塊範圍。
        Slot newSlot = new Slot();
        Slot[] a = arena;
        synchronized (a) {
            if (a[index] == null)
                a[index] = newSlot;
        }
    }
           

       再看一下awaitNanos方法:

/**
     * 在下标為0的Slot上等待擷取其他線程填充的值。
     * 如果在Slot被填充之前逾時或者被中斷,那麼操作失敗。
     */
    private Object awaitNanos(Node node, Slot slot, long nanos) {
        int spins = TIMED_SPINS;
        long lastTime = 0;
        Thread w = null;
        for (;;) {
            Object v = node.get();
            if (v != null)
                //如果已經被其他線程填充了值,那麼傳回這個值。
                return v;
            long now = System.nanoTime();
            if (w == null)
                w = Thread.currentThread();
            else
                nanos -= now - lastTime;
            lastTime = now;
            if (nanos > 0) {
                if (spins > 0)
                    --spins; //先自旋幾次。
                else if (node.waiter == null)
                    node.waiter = w; //自旋階段完畢後,将目前線程設定到node的waiter域。
                else if (w.isInterrupted())
                    tryCancel(node, slot); //如果目前線程被中斷,嘗試取消node。
                else
                    LockSupport.parkNanos(node, nanos); //阻塞給定的時間。
            }
            else if (tryCancel(node, slot) && !w.isInterrupted())
                //逾時後,如果目前線程沒有被中斷,那麼從Slot數組的其他位置看看有沒有等待交換資料的節點
                return scanOnTimeout(node);
        }
    }
           

       awaitNanos中的自旋次數為TIMED_SPINS,這裡說明一下自旋次數: 

/**
     * 單核處理器下這個自旋次數為0
     * 多核情況下,這個值設定為大多數系統中上下文切換時間的平均值。
     */
    private static final int SPINS = (NCPU == 1) ? 0 : 2000;
    /**
     * 在有逾時情況下阻塞等待之前自旋的次數。.
     * 逾時等待的自旋次數之是以更少,是因為檢測時間也需要耗費時間。
     * 這裡的值是一個經驗值。
     */
    private static final int TIMED_SPINS = SPINS / 20;
           

         繼續看一下tryCancel方法:

private static boolean tryCancel(Node node, Slot slot) {
        if (!node.compareAndSet(null, CANCEL))//嘗試取消node
            return false;
        if (slot.get() == node) // pre-check to minimize contention
            slot.compareAndSet(node, null); //如果還關聯在sot上,斷開關聯。
        return true;
    }
           

         繼續看awaitNanos方法中最後調用的scanOnTimeout方法,這個方法在要取消的時候調用,找一下其他下标的Slot上有沒有可以交換資料的節點,找到的話就可以成功交換資料,而不取消了:

private Object scanOnTimeout(Node node) {
        Object y;
        for (int j = arena.length - 1; j >= 0; --j) {
            //從Slot數組的後面往前找
            Slot slot = arena[j];
            if (slot != null) {
                //找到了有初始化好的Slot,然後看看裡面有沒有node。
                while ((y = slot.get()) != null) {
                    //發現有node,嘗試和這個node進行資料交換。
                    if (slot.compareAndSet(y, null)) {
                        Node you = (Node)y;
                        //嘗試進行資料交換,
                        if (you.compareAndSet(null, node.item)) {
                            //如果交換成功(把目前節點的資料交給you),喚醒you上面等待的線程。
                            LockSupport.unpark(you.waiter);
                            //傳回you的資料。
                            return you.item;
                        }
                    }
                }
            }
        }
        //沒找到其他等待交換資料的線程,最後取消目前節點node。
        return CANCEL;
    }
           

       上面看的awaitNanos方法是在下标為0的Slot裡面,有逾時情況下的處理方式。再看下沒有逾時情況的處理方法await:

private static Object await(Node node, Slot slot) {
        Thread w = Thread.currentThread();
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                //如果已經被其他線程填充了值,那麼傳回這個值。
                return v;
            else if (spins > 0)                 // 先自旋幾次。
                --spins;
            else if (node.waiter == null)       // 自旋階段完畢後,将目前線程設定到node的waiter域。
                node.waiter = w;
            else if (w.isInterrupted())         // 如果目前線程被中斷,嘗試取消目前node。
                tryCancel(node, slot);
            else                                // 否則阻塞目前線程。
                LockSupport.park(node);
        }
    }
           

      之前看的awaitNanos和await方法都是在下标為0的Slot的情況下采取的有阻塞行為的處理方式,如果下标不為0,采取完全自旋的方式,調用方法spinWait:

private static Object spinWait(Node node, Slot slot) {
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)
                --spins; //先自旋
            else
                tryCancel(node, slot); //自旋了指定的次數還沒等到交換的資料,嘗試取消。
        }
    }
           

      最後看一下arena(Slot數組),預設的容量和實際使用的下标最大值:

private static final int CAPACITY = 32;
    /**
     * The value of "max" that will hold all threads without
     * contention.  When this value is less than CAPACITY, some
     * otherwise wasted expansion can be avoided.
     */
    private static final int FULL =
        Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
           

        前面說過arena容量預設為32,目的是為了減少線程的競争,但實際上對arena的使用不會超過FULL這個值(避免一些空間浪費)。這個值取的是32(預設CAPACITY)和CPU核心數量的一半,這兩個數的較小值在減1的數和0的較大值.... 也就是說,如果CPU核很多的情況下,這個值最大也就是31,;如果是單核或者雙核CPU,這個值就是0,也就是說隻能用arena[0]。這也是為什麼前面的hashIndex方法裡面會做的(近似)取模操作比較複雜,因為實際的能使用的Slot數組範圍可能不是2的幂。          Exchanger的代碼解析完畢!