天天看點

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

一、SynchronousQueue簡介

  Java 6的并發程式設計包中的SynchronousQueue是一個沒有資料緩沖的BlockingQueue,生産者線程對其的插入操作put必須等待消費者的移除操作take,反過來也一樣。

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并沒有資料緩存空間,你不能調用peek()方法來看隊列中是否有資料元素,因為資料元素隻有當你試着取走的時候才可能存在,不取走而隻想偷窺一下是不行的,當然周遊這個隊列的操作也是不允許的。隊列頭元素是第一個排隊要插入資料的線程,而不是要交換的資料。資料是在配對的生産者和消費者線程之間直接傳遞的,并不會将資料緩沖資料到隊列中。可以這樣來了解:生産者和消費者互相等待對方,握手,然後一起離開。

特點: 

1、不能在同步隊列上進行 peek,因為僅在試圖要取得元素時,該元素才存在; 

2、除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)添加元素;也不能疊代隊列,因為其中沒有元素可用于疊代。隊列的頭是嘗試添加到隊列中的首個已排隊線程元素; 如果沒有已排隊線程,則不添加元素并且頭為 null。 

3、對于其他 Collection 方法(例如 contains),SynchronousQueue 作為一個空集合。此隊列不允許 null 元素。

4、它非常适合于傳遞性設計,在這種設計中,在一個線程中運作的對象要将某些資訊、事件或任務傳遞給在另一個線程中運作的對象,它就必須與該對象同步。 

5、對于正在等待的生産者和使用者線程而言,此類支援可選的公平排序政策。預設情況下不保證這種排序。 但是,使用公平設定為 true 所構造的隊列可保證線程以 FIFO 的順序進行通路。 公平通常會降低吞吐量,但是可以減小可變性并避免得不到服務。 

6、SynchronousQueue的以下方法: 

    * iterator() 永遠傳回空,因為裡面沒東西。 

    * peek() 永遠傳回null。 

    * put() 往queue放進去一個element以後就一直wait直到有其他thread進來把這個element取走。 

    * offer() 往queue裡放一個element後立即傳回,如果碰巧這個element被另一個thread取走了,offer方法傳回true,認為offer成功;否則傳回false。 

    * offer(2000, TimeUnit.SECONDS) 往queue裡放一個element但是等待指定的時間後才傳回,傳回的邏輯和offer()方法一樣。 

    * take() 取出并且remove掉queue裡的element(認為是在queue裡的。。。),取不到東西他會一直等。 

    * poll() 取出并且remove掉queue裡的element(認為是在queue裡的。。。),隻有到碰巧另外一個線程正在往queue裡offer資料或者put資料的時候,該方法才會取到東西。否則立即傳回null。 

    * poll(2000, TimeUnit.SECONDS) 等待指定的時間然後取出并且remove掉queue裡的element,其實就是再等其他的thread來往裡塞。 

    * isEmpty()永遠是true。 

    * remainingCapacity() 永遠是0。 

    * remove()和removeAll() 永遠是false。 

SynchronousQueue 内部沒有容量,但是由于一個插入操作總是對應一個移除操作,反過來同樣需要滿足。那麼一個元素就不會再SynchronousQueue 裡面長時間停留,一旦有了插入線程和移除線程,元素很快就從插入線程移交給移除線程。也就是說這更像是一種信道(管道),資源從一個方向快速傳遞到另一方 向。顯然這是一種快速傳遞元素的方式,也就是說在這種情況下元素總是以最快的方式從插入着(生産者)傳遞給移除着(消費者),這在多任務隊列中是最快處理任務的方式。線上程池裡的一個典型應用是Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)建立新的線程,如果有空閑線程則會重複使用,線程空閑了60秒後會被回收。

二、 使用示例

package com.dxz.queue.block;

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

        Thread putThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("put thread start");
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                }
                System.out.println("put thread end");
            }
        });

        Thread takeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("take thread start");
                try {
                    System.out.println("take from putThread: " + queue.take());
                } catch (InterruptedException e) {
                }
                System.out.println("take thread end");
            }
        });

        putThread.start();
        Thread.sleep(1000);
        takeThread.start();
    }
}      

結果:

put thread start
take thread start
take from putThread: 1
take thread end
put thread end      

三、實作原理

3.1、阻塞算法實作

3.1.1、使用wait和notify實作

  阻塞算法實作通常在内部采用一個鎖來保證多個線程中的put()和take()方法是串行執行的。采用鎖的開銷是比較大的,還會存在一種情況是線程A持有線程B需要的鎖,B必須一直等待A釋放鎖,即使A可能一段時間内因為B的優先級比較高而得不到時間片運作。是以在高性能的應用中我們常常希望規避鎖的使用。

package com.dxz.queue.block;

public class NativeSynchronousQueue<E> {
    boolean putting = false;
    E item = null;

    public synchronized E take() throws InterruptedException {
        while (item == null)
            wait();
        E e = item;
        item = null;
        notifyAll();
        return e;
    }

    public synchronized void put(E e) throws InterruptedException {
        if (e == null)
            return;
        while (putting)
            wait();
        putting = true;
        item = e;
        notifyAll();
        while (item != null)
            wait();
        putting = false;
        notifyAll();
    }
}

package com.dxz.queue.block;

public class NativeSynchronousQueueTest {

    public static void main(String[] args) throws InterruptedException {
        final NativeSynchronousQueue<String> queue = new NativeSynchronousQueue<String>();
        Thread putThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("put thread start");
                try {
                    queue.put("1");
                } catch (InterruptedException e) {
                }
                System.out.println("put thread end");
            }
        });

        Thread takeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("take thread start");
                try {
                    System.out.println("take from putThread: " + queue.take());
                } catch (InterruptedException e) {
                }
                System.out.println("take thread end");
            }
        });

        putThread.start();
        Thread.sleep(1000);
        takeThread.start();
    }

}      
結果:      

put thread start

take thread start

put thread end

take from putThread: 1

take thread end

 3.1.2、信号量實作

經典同步隊列實作采用了三個信号量,代碼很簡單,比較容易了解:

package com.dxz.queue.block;

import java.util.concurrent.Semaphore;

public class SemaphoreSynchronousQueue<E> {
    E item = null;
    Semaphore sync = new Semaphore(0);
    Semaphore send = new Semaphore(1);
    Semaphore recv = new Semaphore(0);
 
    public E take() throws InterruptedException {
        recv.acquire();
        E x = item;
        sync.release();
        send.release();
        return x;
    }
 
    public void put (E x) throws InterruptedException{
        send.acquire();
        item = x;
        recv.release();
        sync.acquire();
    }
}

package com.dxz.queue.block;

public class SemaphoreSynchronousQueueTest {

    public static void main(String[] args) throws InterruptedException {
        final SemaphoreSynchronousQueue<String> queue = new SemaphoreSynchronousQueue<String>();
        Thread putThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("put thread start");
                try {
                    queue.put("1");
                } catch (InterruptedException e) {
                }
                System.out.println("put thread end");
            }
        });

        Thread takeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("take thread start");
                try {
                    System.out.println("take from putThread: " + queue.take());
                } catch (InterruptedException e) {
                }
                System.out.println("take thread end");
            }
        });

        putThread.start();
        Thread.sleep(1000);
        takeThread.start();
    }

}
      

  

結果:
put thread start
take thread start
take from putThread: 1
take thread end
put thread end      

在多核機器上,上面方法的同步代價仍然較高,作業系統排程器需要上千個時間片來阻塞或喚醒線程,而上面的實作即使在生産者put()時已經有一個消費者在等待的情況下,阻塞和喚醒的調用仍然需要。

 3.1.3、Java 5實作

package com.dxz.queue.block;

import java.util.Queue;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

public class Java5SynchronousQueue<E> {
    ReentrantLock qlock = new ReentrantLock();
    Queue waitingProducers = new Queue();
    Queue waitingConsumers = new Queue();

    static class Node extends AbstractQueuedSynchronizer {
        E item;
        Node next;

        Node(Object x) { item = x; }
        void waitForTake() { /* (uses AQS) */ }
           E waitForPut() { /* (uses AQS) */ }
    }

    public E take() {
        Node node;
        boolean mustWait;
        qlock.lock();
        node = waitingProducers.pop();
        if(mustWait = (node == null))
           node = waitingConsumers.push(null);
         qlock.unlock();

        if (mustWait)
           return node.waitForPut();
        else
            return node.item;
    }

    public void put(E e) {
         Node node;
         boolean mustWait;
         qlock.lock();
         node = waitingConsumers.pop();
         if (mustWait = (node == null))
             node = waitingProducers.push(e);
         qlock.unlock();

         if (mustWait)
             node.waitForTake();
         else
            node.item = e;
    }
}      

Java 5的實作相對來說做了一些優化,隻使用了一個鎖,使用隊列代替信号量也可以允許釋出者直接釋出資料,而不是要首先從阻塞在信号量處被喚醒。

 3.1.4、Java6實作

Java 6的SynchronousQueue的實作采用了一種性能更好的無鎖算法 — 擴充的“Dual stack and Dual queue”算法。性能比Java5的實作有較大提升。競争機制支援公平和非公平兩種:非公平競争模式使用的資料結構是後進先出棧(Lifo Stack);公平競争模式則使用先進先出隊列(Fifo Queue),性能上兩者是相當的,一般情況下,Fifo通常可以支援更大的吞吐量,但Lifo可以更大程度的保持線程的本地化。

代碼實作裡的Dual Queue或Stack内部是用連結清單(LinkedList)來實作的,其節點狀态為以下三種情況:

  1. 持有資料 – put()方法的元素
  2. 持有請求 – take()方法

這個算法的特點就是任何操作都可以根據節點的狀态判斷執行,而不需要用到鎖。

其核心接口是Transfer,生産者的put或消費者的take都使用這個接口,根據第一個參數來差別是入列(棧)還是出列(棧)。

/**
     * Shared internal API for dual stacks and queues.
     */
    static abstract class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }      

TransferQueue實作如下(摘自Java 6源代碼),入列和出列都基于Spin和CAS方法:

/**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed &amp;&amp; nanos &lt;= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null)? x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null)? x : e;
                }
            }
        }      

3.2、SynchronousQueue實作原理

   不像ArrayBlockingQueue、LinkedBlockingDeque之類的阻塞隊列依賴AQS實作并發操作,SynchronousQueue直接使用CAS實作線程的安全通路。由于源碼中充斥着大量的CAS代碼,不易于了解,是以按照筆者的風格,接下來會使用簡單的示例來描述背後的實作模型。

隊列的實作政策通常分為公平模式和非公平模式,接下來将分别進行說明。

3.2.1、公平模式下的模型:

  公平模式下,底層實作使用的是TransferQueue這個内部隊列,它有一個head和tail指針,用于指向目前正在等待比對的線程節點。

初始化時,TransferQueue的狀态如下:

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

接着我們進行一些操作:

1、線程put1執行 put(1)操作,由于目前沒有配對的消費線程,是以put1線程入隊列,自旋一小會後睡眠等待,這時隊列狀态如下:

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

2、接着,線程put2執行了put(2)操作,跟前面一樣,put2線程入隊列,自旋一小會後睡眠等待,這時隊列狀态如下:

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

3、這時候,來了一個線程take1,執行了 take操作,由于tail指向put2線程,put2線程跟take1線程配對了(一put一take),這時take1線程不需要入隊,但是請注意了,這時候,要喚醒的線程并不是put2,而是put1。為何? 大家應該知道我們現在講的是公平政策,所謂公平就是誰先入隊了,誰就優先被喚醒,我們的例子明顯是put1應該優先被喚醒。至于讀者可能會有一個疑問,明明是take1線程跟put2線程比對上了,結果是put1線程被喚醒消費,怎麼確定take1線程一定可以和次首節點(head.next)也是比對的呢?其實大家可以拿個紙畫一畫,就會發現真的就是這樣的。

公平政策總結下來就是:隊尾比對隊頭出隊。

執行後put1線程被喚醒,take1線程的 take()方法傳回了1(put1線程的資料),這樣就實作了線程間的一對一通信,這時候内部狀态如下:

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

4、最後,再來一個線程take2,執行take操作,這時候隻有put2線程在等候,而且兩個線程比對上了,線程put2被喚醒,

take2線程take操作傳回了2(線程put2的資料),這時候隊列又回到了起點,如下所示:

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

以上便是公平模式下,SynchronousQueue的實作模型。總結下來就是:隊尾比對隊頭出隊,先進先出,展現公平原則。

非公平模式下的模型:

我們還是使用跟公平模式下一樣的操作流程,對比兩種政策下有何不同。非公平模式底層的實作使用的是TransferStack,

一個棧,實作中用head指針指向棧頂,接着我們看看它的實作模型:

1、線程put1執行 put(1)操作,由于目前沒有配對的消費線程,是以put1線程入棧,自旋一小會後睡眠等待,這時棧狀态如下:

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

2、接着,線程put2再次執行了put(2)操作,跟前面一樣,put2線程入棧,自旋一小會後睡眠等待,這時棧狀态如下:

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

3、這時候,來了一個線程take1,執行了take操作,這時候發現棧頂為put2線程,比對成功,但是實作會先把take1線程入棧,然後take1線程循環執行比對put2線程邏輯,一旦發現沒有并發沖突,就會把棧頂指針直接指向 put1線程

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

4、最後,再來一個線程take2,執行take操作,這跟步驟3的邏輯基本是一緻的,take2線程入棧,然後在循環中比對put1線程,最終全部比對完畢,棧變為空,恢複初始狀态,如下圖所示:

【轉載】阻塞隊列之三:SynchronousQueue同步隊列 阻塞算法的3種實作

可以從上面流程看出,雖然put1線程先入棧了,但是卻是後比對,這就是非公平的由來。

總結

SynchronousQueue由于其獨有的線程一一配對通信機制,在大部分平常開發中,可能都不太會用到,但線程池技術中會有所使用,由于内部沒有使用AQS,而是直接使用CAS,是以代碼了解起來會比較困難,但這并不妨礙我們了解底層的實作模型,在了解了模型的基礎上,有興趣的話再查閱源碼,就會有方向感,看起來也會比較容易,希望本文有所借鑒意義。

轉自:Java并發包中的同步隊列SynchronousQueue實作原理

作者的原創文章,轉載須注明出處。原創文章歸作者所有,歡迎轉載,但是保留版權。對于轉載了部落客的原創文章,不标注出處的,作者将依法追究版權,請尊重作者的成果。