天天看點

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇

  • 一、什麼是SynchronousQueue?
  • 二、SynchronousQueue類的結構圖
  • 三、SynchronousQueue的小Demo
  • 四、SynchronousQueue源碼分析
    • 1、構造方法
    • 2、put方法
    • 3、take方法
    • 4、棧結構
      • 4.1、常量講解
      • 4.2、TransferStack講解
        • 4.2.1、前期代碼
        • 4.2.2、核心代碼
        • 4.2.3、線程阻塞的實作
        • 4.2.4、不公平政策下隊列圖解
      • 4.3、棧結構小總結
    • 5、隊列結構
      • 5.1、前期代碼
      • 5.2、核心代碼
      • 5.3、線程阻塞的實作
      • 5.4、公平政策下隊列圖解
      • 5.5、出隊代碼
      • 5.5、隊列結構總結

CSDN部落格位址:https://blog.csdn.net/liuyu973971883

文章來源:轉載,原文位址:https://blog.csdn.net/weixin_41622183/article/details/89283085,感謝這位老哥的辛勤付出,寫的非常棒,各位看完别忘了給這位老哥點個贊啊。如有侵權,請聯系删除。

一、什麼是SynchronousQueue?

  • SynchronousQueue作為阻塞隊列的時候,對于每一個take的線程會阻塞直到有一個put的線程放入元素為止,反之亦然。
  • 在SynchronousQueue内部沒有任何存放元素的能力,可以了解為容量為 0。是以類似peek操作或者疊代器操作也是無效的,元素隻能通過put類操作或者take類操作才有效。
  • SynchronousQueue支援支援生産者和消費者等待的公平性政策。預設情況下:非公平。
  • 如果是公平鎖的話可以保證目前第一個隊首的線程是等待時間最長的線程,這時可以視SynchronousQueue為一個FIFO隊列。
  • SynchronousQueue 提供兩種實作方式,分别是棧和隊列的方式實作。這兩種實作方式中,棧是屬于非公平的政策,隊列是屬于公平政策。

二、SynchronousQueue類的結構圖

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

三、SynchronousQueue的小Demo

public class TestSynchronousQueue {
    public static void main(String[] args) throws Exception {
        //使用非公平政策
//        SynchronousQueue synchronousQueue= new SynchronousQueue();
        //使用公平政策
        SynchronousQueue synchronousQueue= new SynchronousQueue(true);
        new Thread(()-> {
            try {
                synchronousQueue.put("A");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //休眠一下,讓異步線程完成
        Thread.sleep(1000);
        new Thread(()-> {
            try {
                synchronousQueue.put("B");
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }).start();
        //休眠一下,讓異步線程完成
        Thread.sleep(1000);
        new Thread(()-> {
            try {
                Object take = synchronousQueue.take();
                System.out.println(take);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }).start();
        //休眠一下,讓異步線程完成
        Thread.sleep(1000);
        //不管如何輸出,都是 0
        System.out.println(synchronousQueue.size());
    }
}
           

公平政策結果:

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

非公平政策結果:

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

為什麼會出現這種情況?我們這裡先不解釋,先繼續學習。

四、SynchronousQueue源碼分析

1、構造方法

//預設構造,false 為非公平政策
public SynchronousQueue() {
    this(false);
}
//可選政策。可以看出使用的是不同形式的實作。
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
           

2、put方法

入隊方法,該方法為阻塞方法
public void put(E e) throws InterruptedException {
	//入隊時判斷是否傳入元素為空
    if (e == null) throw new NullPointerException();
    //入隊
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
           

3、take方法

出隊方法,該方法為阻塞方法
public E take() throws InterruptedException {
	//出隊穿個 null 就說明這個是出隊
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}
           

4、棧結構

4.1、常量講解

  • REQUEST:表示這是個請求節點,從隊列中取資料的辨別(方法有 take,poll)
  • DATA:表示這個是資料節點,插入資料到隊列中的辨別(方法有 offer,put)
  • FULFILLING:這個表示配對成功,隻有一消費者和生産者進行配對成功後,才會更改為該狀态
/** 表示這是個請求節點 */
static final int REQUEST    = 0;
/** 資料節點 */
static final int DATA       = 1;
/** 比對成功後設定的節點 */
static final int FULFILLING = 2;
           

4.2、TransferStack講解

4.2.1、前期代碼

static final class TransferStack<E> extends Transferer<E> {
      	/** 表示這是個請求節點 */
        static final int REQUEST    = 0;
        /** 資料節點 */
        static final int DATA       = 1;
        /** 比對成功後設定的節點 */
        static final int FULFILLING = 2;
        
        /** 内部維護的 SNode 類 */
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;  //0表示請求節點,1表示資料節點
            SNode(Object item) {
                this.item = item;
            }
        //棧頂部指針
        volatile SNode head;
           

4.2.2、核心代碼

由于入隊出隊沒太大差別。代碼都是調用同一個方法,唯一的不同是傳入的值是否為空。下面看看該 transfer 方法:

E transfer(E e, boolean timed, long nanos) {
	//空節點
	SNode s = null; // constructed/reused as needed
	//判斷是請求節點還是資料節點
	int mode = (e == null) ? REQUEST : DATA;
	
	for (;;) {
		//拿到頭指針
	    SNode h = head;
	    //頭指針為空,h.mode 預設是0,判斷是否一緻
	    if (h == null || h.mode == mode) {  // empty or same-mode
	    	//判斷是否為有設定逾時時間
	        if (timed && nanos <= 0) {      // can't wait
	        	//
	            if (h != null && h.isCancelled())
	                casHead(h, h.next);     // pop cancelled node
	            else
	                return null;
	        // 将目前節點壓入棧
	        } else if (casHead(h, s = snode(s, e, h, mode))) {
	        	//進行線程堵塞
	            SNode m = awaitFulfill(s, timed, nanos);
	            if (m == s) {               // wait was cancelled
	            	//清除該節點
	                clean(s);
	                return null;
	            }
	            /**
	            * 線程池被喚醒後,這裡需要 cas 設定一下頭指針,配合出隊線程
	            * 這裡難了解,下面會用圖解的形式解析
	            */
	            if ((h = head) != null && h.next == s)
	                casHead(h, s.next);     // help s's fulfiller
	            //傳回配對的值
	            return (E) ((mode == REQUEST) ? m.item : s.item);
	        }
	     /**
	     * 這裡表示的是出隊操作
	     * 如果是入隊操作走到這一步,說明壓棧失敗,繼續 CAS 吧
	     */
	    } else if (!isFulfilling(h.mode)) { // try to fulfill
	    	//判斷線程是否中斷
	        if (h.isCancelled())            // already cancelled
	            casHead(h, h.next);         // pop and retry
	        //将目前節點壓入棧
	        else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
	            for (;;) { // loop until matched or waiters disappear
	            	//拿到原來的棧頭指針指向的節點
	                SNode m = s.next;       // m is s's match
	                //如果為空,說明被其他線程搶走了,重新 CAS 吧
	                if (m == null) {        // all waiters are gone
	                    casHead(s, null);   // pop fulfill node
	                    s = null;           // use new node next time
	                    break;              // restart main loop
	                }
	                //這裡拿到配對的資料節點
	                SNode mn = m.next;
	                //嘗試去比對
	                if (m.tryMatch(s)) {
	                    casHead(s, mn);     // pop both s and m
	                    //比對成功傳回資料節點的值
	                    return (E) ((mode == REQUEST) ? m.item : s.item);
	               //比對失敗
	                } else                  // lost match
	                    s.casNext(m, mn);   // help unlink
	            }
	        }
	    //有其他線程在配對
	    } else {                            // help a fulfiller
	        SNode m = h.next;               // m is h's match
	        if (m == null)                  // waiter is gone
	            casHead(h, null);           // pop fulfilling node
	        else {
	            SNode mn = m.next;	
	            //嘗試和其它線程競争比對		
	            if (m.tryMatch(h))          // help match
	            //配對成功就一起離開
	                casHead(h, mn);         // pop both h and m
	            else                        // lost match
	            	//比對失敗,肯定要擦屁股,将連結置空吧
	                h.casNext(m, mn);       // help unlink
	        }
	    }
	}
}
           

4.2.3、線程阻塞的實作

SNode awaitFulfill(SNode s, boolean timed, long nanos) {

	final long deadline = timed ? System.nanoTime() + nanos : 0L;
	Thread w = Thread.currentThread();
	//計算自旋次數
	int spins = (shouldSpin(s) ?
	             (timed ? maxTimedSpins : maxUntimedSpins) : 0);
	for (;;) {
		//判斷是否中斷
	    if (w.isInterrupted())
	        s.tryCancel();
	    //拿到比對的節點
	    SNode m = s.match;
	    if (m != null)
	        return m;
	    if (timed) {
	        nanos = deadline - System.nanoTime();
	        if (nanos <= 0L) {
	            s.tryCancel();
	            continue;
	        }
	    }
	    //判斷自旋次數是否大于 0 了
	    if (spins > 0)
	        spins = shouldSpin(s) ? (spins-1) : 0;
	    //如果沒有比對的節點,就儲存目前阻塞的線程
	    else if (s.waiter == null)
	        s.waiter = w; // establish waiter so can park next iter
	    else if (!timed)
	    	//阻塞目前線程
	        LockSupport.park(this);
	    else if (nanos > spinForTimeoutThreshold)
	        LockSupport.parkNanos(this, nanos);
	}
}
           

4.2.4、不公平政策下隊列圖解

隊列的初始狀态:初始狀态下,頭部指針指向空。

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

假設來了一條線程 A,資料也是 A,頭指針指向位置為空且我們使用是不會逾時的 put 方法,那麼會将資料壓入棧中,并将指針指向棧頂,并将線程阻塞,如下圖:

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

接下來再來一條線程 B,帶的資料也是 B。那麼還是會将資料壓入棧中,并将該節點的 next 節點指向最先入棧的節點,并将頭指針指向棧頂(也就是我們的資料 B),最後将線程阻塞。

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

如果我們在來一條線程 C,C 是出隊線程。操作是一樣的,這時,我們會将該取出節點壓入棧中,将頭指針指向出隊的那個節點。結果圖如下:

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

接下來,就是線程 C 要進行比對了。這時,非公平政策的 坑爹之處 就出來了。它會和待比對的線程進行比對,但是我們知道棧的資料結構是先進後出,是以它就找了資料 B,為比對對象,喚醒線程 B,線程 B 喚醒後繼續走如下代碼:

if ((h = head) != null && h.next == s)

   casHead(h, s.next); // help s’s fulfiller

   return (E) ((mode == REQUEST) ? m.item : s.item);

而出隊的線程 C 則走如下代碼:

if (m.tryMatch(s)) {

   casHead(s, mn); // 将兩個節點彈出棧

   return (E) ((mode == REQUEST) ? m.item : s.item);

}

線程 C 拿到資料直接傳回,線程 B 也直接傳回。上述中 **casHead(s, mn); ** 就将頭指針指向棧頂,也是就我們的線程 A。而線程 B 和線程 C 就 攜手雙飛 了,一起彈出棧。最後資料結果如下:

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

4.3、棧結構小總結

隊列中非公平政策坑爹有一點,假設沒有和它配對,或者每一次來一個配對對象時,都被另一個節點搶了,那就很悲催,一直呆在最底層沒人要。

5、隊列結構

5.1、前期代碼

static final class TransferQueue<E> extends Transferer<E> {
    /** Node class for TransferQueue. */
    static final class QNode {
    	//下一個節點
        volatile QNode next;          // next node in queue
        //資料
        volatile Object item;         // CAS'ed to or from null
        //等待線程
        volatile Thread waiter;       // to control park/unpark
        //判斷資料類型
        final boolean isData;

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }

    /** 頭指針 */
    transient volatile QNode head;
    /** 尾指針 */
    transient volatile QNode tail;

    transient volatile QNode cleanMe;
	//初始化時就建構了一個空節點
    TransferQueue() {
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }
           

5.2、核心代碼

隊列結構的實作方式和棧結構的有很大不同,下面我們來看看具體實作

E transfer(E e, boolean timed, long nanos) {
	
	QNode s = null; // constructed/reused as needed
	boolean isData = (e != null);
	
	for (;;) {
		//尾節點
	    QNode t = tail;
	    //頭結點
	    QNode h = head;
	    //沒有初始化時都為空
	    if (t == null || h == null)         // saw uninitialized value
	        continue;                       // spin
		//判斷節點類型,頭節點為空或是入隊類型才可入
	    if (h == t || t.isData == isData) { // empty or same-mode
	        QNode tn = t.next;
	        //判斷尾節點是否有改變(可能有其他線程操作)
	        if (t != tail)                  // inconsistent read
	            continue;
	        //判斷是否有其他線程添加了新節點
	        if (tn != null) {               // lagging tail
	        	//如果有就要設定一下尾節點指針
	            advanceTail(t, tn);
	            continue;
	        }
	        //時間的,這裡可以不理會
	        if (timed && nanos <= 0)        // can't wait
	            return null;
	        //建構一個節點
	        if (s == null)
	            s = new QNode(e, isData);
	        //将節點加入隊列中
	        if (!t.casNext(null, s))        // failed to link in
	            continue;
			//設定尾節點
	        advanceTail(t, s);              // swing tail and wait
	        //線程阻塞
	        Object x = awaitFulfill(s, e, timed, nanos);
	        //判斷線程有沒被中斷,中斷就清除節點
	        if (x == s) {                   // wait was cancelled
	            clean(t, s);
	            return null;
	        }
			//判斷節點是否已經離開隊列
	        if (!s.isOffList()) {           // not already unlinked
	        	//設定頭指針
	            advanceHead(t, s);          // unlink if head
	            if (x != null)              // and forget fields
	                s.item = s;
	            //阻塞線程置空
	            s.waiter = null;
	        }
	        //傳回存儲的值
	        return (x != null) ? (E)x : e;
		//這裡就是出隊操作
	    } else {                            // complementary-mode
	        QNode m = h.next;               // node to fulfill
	        if (t != tail || m == null || h != head)
	            continue;                   // inconsistent read
			//拿到頭節點下的下一個節點
	        Object x = m.item;
	        //判斷是否被另一個線程比對過了
	        if (isData == (x != null) ||    // m already fulfilled
	            x == m ||                   // m cancelled
	            !m.casItem(x, e)) {         // lost CAS
	            advanceHead(h, m);          // dequeue and retry
	            continue;
	        }
			//比對成功就重新設定頭指針
	        advanceHead(h, m);              // successfully fulfilled
	        //線程喚醒
	        LockSupport.unpark(m.waiter);
	        //傳回值
	        return (x != null) ? (E)x : e;
	    }
	}
}
           

5.3、線程阻塞的實作

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
	/* Same idea as TransferStack.awaitFulfill */
	final long deadline = timed ? System.nanoTime() + nanos : 0L;
	Thread w = Thread.currentThread();
	//計算要自旋的次數
	int spins = ((head.next == s) ?
	             (timed ? maxTimedSpins : maxUntimedSpins) : 0);
	for (;;) {
	    if (w.isInterrupted())
	        s.tryCancel(e);
	    Object x = s.item;
	    if (x != e)
	        return x;
	    if (timed) {
	        nanos = deadline - System.nanoTime();
	        if (nanos <= 0L) {
	            s.tryCancel(e);
	            continue;
	        }
	    }
	    //計算自旋次數
	    if (spins > 0)
	        --spins;
	        //儲存阻塞線程
	    else if (s.waiter == null)
	        s.waiter = w;
	    else if (!timed)
	    	//線程阻塞
	        LockSupport.park(this);
	    else if (nanos > spinForTimeoutThreshold)
	        LockSupport.parkNanos(this, nanos);
	}
}
           

5.4、公平政策下隊列圖解

首先,我們在 new SynchronousQueue 隊列使用公平政策的時候就已經建構了一個空節點。

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

對應的代碼如下:

TransferQueue() {

  QNode h = new QNode(null, false); // initialize to dummy node.

  head = h;

  tail = h;

}

接下來我們來了線程 A,線程 A 攜帶的資料也是 A。這時我們調用 put 方法入隊,經過一大堆的判斷,我們将資料入隊成功,将尾節點指針指向資料 A 的節點,并且将線程阻塞,等待被消費,結果如下:

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

我們可以看到頭節點一直為一個空節點,目的是為了等待消費線程進來後,和最先入隊的元素進行比對。接下來線程 B 帶着資料 B 又進來了,經過一大堆的判斷,我們将資料入隊成功,資料 A 的 next 指向資料 B,将尾節點指針指向資料 B 的節點,并且将線程阻塞,等待被消費,結果如下:

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

這樣,我們就入隊了兩個節點了,接下來來了一條出隊線程 C,C 來了卻不是入隊了,而是找最先入隊的那個節點。找到入隊節點後,會嘗試進行配對,假設配對成功,那會将配對成功的線程 A 出隊,傳回資料 A,被喚醒的線程 A 會繼續走餘下的代碼,上面解釋過,原理差不多就不解釋了。最後結果圖如下:

Java線程池隊列SynchronousQueue的詳細原理分析-劉宇一、什麼是SynchronousQueue?二、SynchronousQueue類的結構圖三、SynchronousQueue的小Demo四、SynchronousQueue源碼分析

5.5、出隊代碼

{                            // complementary-mode
    QNode m = h.next;               // node to fulfill
    if (t != tail || m == null || h != head)
        continue;                   // inconsistent read
	//拿到頭節點下的下一個節點
    Object x = m.item;
    //判斷是否被另一個線程比對過了
    if (isData == (x != null) ||    // m already fulfilled
        x == m ||                   // m cancelled
        !m.casItem(x, e)) {         // lost CAS
        advanceHead(h, m);          // dequeue and retry
        continue;
    }
	//比對成功就重新設定頭指針
    advanceHead(h, m);              // successfully fulfilled
    //線程喚醒
    LockSupport.unpark(m.waiter);
    //傳回值
    return (x != null) ? (E)x : e;
}
           

5.5、隊列結構總結

隊列結構和棧結構是類似的,都是需要配對進行離開,基本原理差不多。不同之處就是現進先出和先進後出的問題。