天天看點

SynchronousQueue源碼閱讀1.資料結構2.方法說明3.總結

文章目錄

  • 1.資料結構
    • 1.等待隊列:waitQueue
      • 1.公平隊列:FifoWaitQueue
      • 2.非公平隊列:LifoWaitQueue
    • 2.資料傳輸方式:Transferer
      • 1.TransferStack
      • 2.TransferQueue
  • 2.方法說明
    • 1.轉換(重點):transfer
      • 1.區分操作模式:mode
      • 2.頭節點和目前是相同模式
      • 3.頭結點和目前目前不是相同模式
      • 4.目前節點是輔助模式(可能從上一步來)
      • 5.圖解
    • 2. 擷取一個元素:take
    • 3.加入隊列:put
  • 3.總結

1.資料結構

1.等待隊列:waitQueue

        waitQueue是一個接口,在這個隊列當中會聲明兩個等待的隊列,分别是生産者和消費者

//生産者隊列
    private WaitQueue waitingProducers;
    //消費者隊列
    private WaitQueue waitingConsumers;
           

1.公平隊列:FifoWaitQueue

        公平隊列對應的是先進先出,對應的資料結構的queue

static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
           

2.非公平隊列:LifoWaitQueue

        非公平隊列對應的是後進先出,對應的資料結構是stack

static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
           

2.資料傳輸方式:Transferer

        資料傳輸方式,這邊定義了一個抽象方法transfer,

Performs a put or take.

,是用來處理拿出和放入操作的.

/**
     * Shared internal API for dual stacks and queues.
     */
    abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }
           

        這邊分别對應着兩種不同的傳輸方式:TransferStack、TransferQueue

1.TransferStack

        TransferStack包含一個Snode節點,這邊不直接看方法,看到隊列操作之後再根據具體詳細分析

static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
       }
           

2.TransferQueue

        TransferQueue包含一個Qnode節點,這邊不直接看方法,看到隊列操作之後再根據具體詳細分析

TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;
        }
           

2.方法說明

1.轉換(重點):transfer

        在這個類型隊列的所有操作中基本都用到了transfer方法,主要的差別是是否計時,後面會講到。這邊隻檢視stack的實作:

SynchronousQueue源碼閱讀1.資料結構2.方法說明3.總結

        這一段代碼比較長,這邊我們分成四個部分來說,下面是代碼的原貌。後面我們一個一個部分來了解.

E transfer(E e, boolean timed, long nanos) {
            // constructed/reused as needed
            SNode s = null;
            //當e==null的時候說明是消費者進來等待了(REQUEST),否則表示生産者(DATA)生産了等待消費
            int mode = (e == null) ? REQUEST : DATA;

            for (; ; ) {
                //拿到頭結點
                SNode h = head;
                // empty or same-mode
                //如果頭結點為空,或者也是相同的模式(比如都是消費者)
                if (h == null || h.mode == mode) {
                    // can't wait
                    //時間到了
                    if (timed && nanos <= 0) {
                        // 如果頭節點不為空且是取消狀态
                        if (h != null && h.isCancelled()) {
                            // pop cancelled node
                            //把頭結點彈出
                            casHead(h, h.next);
                        } else {
                            return null;
                        }
                    }
                    //cas會進行入棧,這邊是入棧成功了
                    else if (casHead(h, s = snode(s, e, h, mode))) {
                        //睡眠,如果是逾時或者是中斷,會把match替換成this(自己)嘗試取消
                        SNode m = awaitFulfill(s, timed, nanos);
                        //傳回節點的match是自己,取消
                        if (m == s) {
                            // wait was cancelled
                            //清除目前節點
                            clean(s);
                            return null;
                        }
                        //比對到元素:因為從awaitFulfill()裡面出來要不被取消了要不就比對到了
                        //如果頭結點不為空,并且s是頭結點下一個節點,彈出head和s.
                        if ((h = head) != null && h.next == s) {
                            // help s's fulfiller
                            casHead(h, s.next);
                        }
                        //根據模式傳回生産者還是消費者的值
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                    // try to fulfill
                }
                //如果h.mode不是FULFILLING,并且不是相同的模式
                else if (!isFulfilling(h.mode)) {
                    // already cancelled
                    //節點取消
                    if (h.isCancelled()) {
                        // pop and retry
                        //頭結點換成下一個節點
                        casHead(h, h.next);
                    }
                    //這邊應該是頭結點 不是cancelled的,s入棧變成頭結點
                    else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                        // loop until matched or waiters disappear
                        for (; ; ) {
                            // m is s's match
                            SNode m = s.next;
                            // all waiters are gone
                            //如果沒有下一個節點了,清空棧跳出循環繼續往下走
                            if (m == null) {
                                // pop fulfill node
                                casHead(s, null);
                                // use new node next time
                                s = null;
                                // restart main loop
                                break;
                            }
                            //如果頭結點的下一個節點不為空,往再下一個節點找
                            SNode mn = m.next;
                            //這個地方的條件是是兩個可以比對的節點(生産和消費)
                            if (m.tryMatch(s)) {
                                //比對成功,兩個節點都彈出
                                // pop both s and m
                                casHead(s, mn);
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                                // lost match
                            } else {
                                //嘗試比對失敗,說明m已經先一步被其它線程比對了,清除m節點
                                // help unlink
                                s.casNext(m, mn);
                            }
                        }
                    }
                    // help a fulfiller
                }
                //如果不是相同模式,并且目前節點是FULFILLING的情況
                else {
                    //mode在上面隻可能是生産或者消費,這邊的話說明都是FULFILLING的情況
                    // m is h's match
                    SNode m = h.next;
                    // waiter is gone
                    if (m == null) {
                        //頭結點的下一個節點為null
                        // pop fulfilling node
                        casHead(h, null);
                    } else {
                        //否則嘗試比對.
                        SNode mn = m.next;
                        // help match
                        //比對成功,兩個節點都彈出
                        if (m.tryMatch(h)) {
                            // pop both h and m
                            casHead(h, mn);
                        }
                        // lost match
                        else {
                            // help unlink
                            //嘗試比對失敗,說明m已經先一步被其它線程比對了,清除m節點
                            h.casNext(m, mn);
                        }
                    }
                }
            }
        }
           

1.區分操作模式:mode

        這邊說的模式是相對于線程操作來說的,這邊定義的三種模式:REQUEST(消費)、DATA(生産)、FULFILLING(交換),第一部分是怎麼判斷這個模式:

int mode = (e == null) ? REQUEST : DATA;

        這邊的話,根據入參,如果傳入的是null就是要消費,可以看下面的take方法,如果傳入的不是null,就是要生産,可以看下面的put,這個應該很好了解。

2.頭節點和目前是相同模式

        這邊的操作步驟分别如下:

        先判斷是否逾時,如果逾時的話,判斷頭部節點不為空,并且被取消(比對的節點是自己,也就是說沒有比對到可以進行交換的節點),通過cas方式把頭部換成下一個節點。如果頭部節點為空, 說明沒有隊列了,直接傳回null;

        如果沒有逾時一說,先把目前節點添加到棧中,這邊的棧的入棧和以前的入棧可能不一樣(後入的節點添加到head,然後出棧的時候出的也是head,實作後進先出),然後下一步調用awaitFulfill方法,這個方法的作用是找到比對的節點(不同模式),如果沒找到自旋結束挂起等待被喚醒,match如果是自己則說明比對失敗(被cancel)

        往後判斷一下比對的節點是不是自己,如果是則清除目前節點傳回null。否則就是比對到了節點(因為awaitFulfill方法隻有一個出口,出口必定攜帶比對的節點值),比對到之後設定頭部節點,根據mode傳回相應的節點值(item)

//時間到了
         if (timed && nanos <= 0) {
             // 如果頭節點不為空且是取消狀态
             if (h != null && h.isCancelled()) {
                 // pop cancelled node
                 //把頭結點彈出
                 casHead(h, h.next);
             } else {
                 return null;
             }
         }
         //cas會進行入棧,這邊是入棧成功了
         else if (casHead(h, s = snode(s, e, h, mode))) {
             //睡眠,如果是逾時或者是中斷,會把match替換成this(自己)嘗試取消
             SNode m = awaitFulfill(s, timed, nanos);
             //傳回節點的match是自己,取消
             if (m == s) {
                 // wait was cancelled
                 //清除目前節點
                 clean(s);
                 return null;
             }
             //比對到元素:因為從awaitFulfill()裡面出來要不被取消了要不就比對到了
             //如果頭結點不為空,并且s是頭結點下一個節點,彈出head和s.
             if ((h = head) != null && h.next == s) {
                 // help s's fulfiller
                 casHead(h, s.next);
             }
             //根據模式傳回生産者還是消費者的值
             return (E) ((mode == REQUEST) ? m.item : s.item);
         }
         // try to fulfill
           

        看一下awaitFulfill方法:

        這邊先是會計算一下循環的次數

        然後進入死循環,出口隻有一個就是 match!=null的時候,傳回match值。

        如果被中斷和逾時的情況下會調用trycancel方法會将目前節點的match值設定為自己。然後下一個循環開始的時候,match不為null就傳回了

        如果是沒有計時的情況,可以看到

park

的操作,這邊的park是等待下一個操作進來的時候去喚醒這個節點。我們可以往後去看下面的方法

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            //計算循環的次數,這邊會分别根據有計時和沒計時進行判斷
            int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (; ; ) {
                //被中斷,嘗試取消這個節點
                if (w.isInterrupted()){
                    // Tries to cancel a wait by matching node to itself.
                    //1 - 嘗試通過将節點與自身比對來取消等待。
                    s.tryCancel();
                }
                //比對的節點不為空傳回
                SNode m = s.match;
                if (m != null){
                    return m;
                }
                //有計時的操作,到達時間取消節點
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        //2 - 嘗試通過将節點與自身比對來取消等待。
                        s.tryCancel();
                        continue;
                    }
                }

                //還有自旋次數,計算自旋次數
                if (spins > 0){
                    spins = shouldSpin(s) ? (spins - 1) : 0;
                }
                //沒有自旋次數,設定waiter為目前線程
                else if (s.waiter == null){
                    // establish waiter so can park next iter
                    s.waiter = w;
                }
                //沒有自旋次數并且不計數,進行park休眠
                else if (!timed){
                    LockSupport.park(this);
                }
                //如果有計時,則休眠對應時間
                else if (nanos > spinForTimeoutThreshold){
                    LockSupport.parkNanos(this, nanos);
                }
            }
        }
           

3.頭結點和目前目前不是相同模式

        如果頭節點和目前節點不是相同的模式,這邊還有一個條件就是不是輔助模式。也就是說可能進來的節點是消費(take),而頭結點是生産(put),那麼這兩個模式不一樣,正好可以抵消了。是以就進入這個判斷:

        這邊會先判斷節點是否被取消(就是比對的等于自己,這個是操作基本是中斷和逾時導緻),頭結點被取消就往下一個節點走

否則把目前節點添加到頭部(入棧),并且修改狀态為交換,開始死循環

        如果沒有下一個節點,設定頭結點為null,跳出循環結束,下一步就會再走到上面的2.頭節點和目前是相同模式,可能被挂起等待或者逾時結束。

        如果下一個節點不為空的情況,進行tryMatch比對,如果能比對到出棧(這個時候是兩個節點),傳回對應的元素

        如果比對不上,說明可能被另外一個節點比對了,就要往在下一個節點去比對。

// already cancelled
            //節點取消
            if (h.isCancelled()) {
                // pop and retry
                //頭結點換成下一個節點
                casHead(h, h.next);
            }
            //這邊應該是頭結點 不是cancelled的,s入棧變成頭結點
            else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                // loop until matched or waiters disappear
                for (; ; ) {
                    // m is s's match
                    SNode m = s.next;
                    // all waiters are gone
                    //如果沒有下一個節點了,清空棧跳出循環繼續往下走
                    if (m == null) {
                        // pop fulfill node
                        casHead(s, null);
                        // use new node next time
                        s = null;
                        // restart main loop
                        break;
                    }
                    //如果頭結點的下一個節點不為空,往再下一個節點找
                    SNode mn = m.next;
                    //這個地方的條件是是兩個可以比對的節點(生産和消費)
                    if (m.tryMatch(s)) {
                        //比對成功,兩個節點都彈出
                        // pop both s and m
                        casHead(s, mn);
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                        // lost match
                    } else {
                        //嘗試比對失敗,說明m已經先一步被其它線程比對了,清除m節點
                        // help unlink
                        s.casNext(m, mn);
                    }
                }
            }
            // help a fulfiller
        }
           

        tryMatch如何進行比對,這邊看一下

        這邊如果比對成功,成功的前提原來的頭結點(目前節點的下一個節點)的match為空(沒有被比對),并且cas操作是從null到目前節點成功,可能失敗,就是在這個過程中被其他節點比對走了,如果成功要喚醒原來的頭結點,因為之前可能被park起來等待,現在match不為空了,可以起來工作了,比對失敗傳回結果

boolean tryMatch(SNode s) {
                //如果m沒有比對這,将s作為他的比對這
                if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    // waiters need at most one unpark
                    //喚醒m中的waiter
                    if (w != null) {
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    // 傳回true
                    return true;
                }
                //可能有其他線程比對了m,傳回是否比對結果
                return match == s;
            }
           

4.目前節點是輔助模式(可能從上一步來)

        因為之前的第二個判斷會把目前節點設值為交換中狀态。這邊的操作相當于第二個判斷之後會進來

        操作和上面的操作很類似,如果head的下一個節點為null,也要回到第一個判斷在進行操作等待。

        否則嘗試比對,成功彈出兩個節點,失敗繼續往下,不過這邊成功沒有傳回的打算,依然會再進入自旋的下一步操作

        這邊可以了解這一步操作是輔助操作,清除比對成功的節點,或者當節點所屬線程消失後将其移除棧。

//mode在上面隻可能是生産或者消費,這邊的話說明都是FULFILLING的情況
                    // m is h's match
                    SNode m = h.next;
                    // waiter is gone
                    if (m == null) {
                        //頭結點的下一個節點為null
                        // pop fulfilling node
                        casHead(h, null);
                    } else {
                        //否則嘗試比對.
                        SNode mn = m.next;
                        // help match
                        //比對成功,兩個節點都彈出
                        if (m.tryMatch(h)) {
                            // pop both h and m
                            casHead(h, mn);
                        }
                        // lost match
                        else {
                            // help unlink
                            //嘗試比對失敗,說明m已經先一步被其它線程比對了,清除m節點
                            h.casNext(m, mn);
                        }
                    }
           

5.圖解

SynchronousQueue源碼閱讀1.資料結構2.方法說明3.總結

2. 擷取一個元素:take

public E take() throws InterruptedException {
        //直接調用transfer方法,不計數
        E e = transferer.transfer(null, false, 0);
        if (e != null){
            return e;
        }
        //如果元素為空,讓線程中斷抛出中斷異常結束.
        Thread.interrupted();
        throw new InterruptedException();
    }
           

        可以看到take這邊是直接傳入null,表明自己是消費者,并且沒有設定逾時時間,一直等待到對應的生産者給定資料傳回

3.加入隊列:put

        put這邊也不設定時間,相當于是等待消費者消費掉目前操作生産的節點.

public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        //把目前節點添加進去執行transfer操作
        if (transferer.transfer(e, false, 0) == null) {
            //傳回為空就中斷被排除異常
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
           

3.總結

        可以看到雖然說這個隊列内部的容量是0,但是他還是會維護一個隊列,個人感覺這種隊列适合用于短期的任務,生産和消費相近,不會造成大量任務堆積。等後面看完線程池再回來補充吧

繼續閱讀