天天看點

并發:Java中的阻塞隊列(BlockingQueue)。什麼是阻塞隊列Java裡的阻塞隊列阻塞隊列的實作原理

本文将介紹什麼是阻塞隊列,以及Java中阻塞隊列的4種處理方式,并介紹Java  7中提供的7種阻塞隊列,最後分析阻塞隊列的一種實作方式。

什麼是阻塞隊列

阻塞隊列是一個支援兩個附加操作的隊列。這兩個附加的操作支援阻塞的插入和移除方法。

  • 支援阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
  • 支援阻塞的移除方法:意思是在隊列為空時,擷取元素的線程會等待隊列變為非空。

阻塞隊列常用于生産者和消費者的場景,生産者是向隊列裡添加元素的線程,消費者是從隊列裡取元素的線程。阻塞隊列就是生産者用來存放元素、消費者用來擷取元素的容器。

在阻塞隊列不可用時,這兩個附加操作提供了4重處理方式,如下表所示。

方法/處理方式 抛出異常 傳回特殊值 一直阻塞 逾時退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
檢查方法 element() peek() 不可用 不可用
  • 抛出異常:當隊列滿時,如果再往隊列裡插入元素,會抛出IllegalStateException("Queue full")異常。當隊列空時,從隊列裡擷取元素會抛出NoSuchElementException異常。
  • 傳回特殊值:當往隊列插入元素時,會傳回元素是否插入成功,成功傳回true。如果是移除方法,則是從隊列裡取出一個元素,如果沒有則傳回null。
  • 一直阻塞:當阻塞隊列滿時,如果生産者線程往隊列裡put元素,隊列會一直阻塞生産者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者從隊列裡take元素,隊列會阻塞住消費者線程,直到隊列不為空。
  • 逾時退出:當阻塞隊列滿時,如果生産者線程往隊列裡插入元素,隊列會阻塞生産者線程一段時間,如果超過了指定的時間,生産者線程就會退出。

這兩個附加操作的4種處理方式不友善記憶,是以我找了一下這幾個方法的規律。put和take分别尾首含有字母t,offer和poll都含有字母o。

注意:如果是無界阻塞隊列,隊列不可能會出現滿的情況,是以使用put或offer方法永遠不會被阻塞,而且使用offer方法時,該方法永遠傳回true。

Java裡的阻塞隊列

JDK 7提供了7個阻塞隊列,如下。

  • ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue:一個由連結清單結構組成的有界阻塞隊列。
  • PriorityBlockingQueue:一個支援優先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優先級隊列實作的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由連結清單結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由連結清單結構組成的雙向阻塞隊列。

ArrayBlockingQueue

ArrayBlockingQueue是一個用數組實作的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。

預設情況下不保證線程公平的通路隊列,所謂公平通路隊列是指阻塞的線程,可以按照阻塞的現後順序通路隊列,即先阻塞線程先通路隊列。非公平性是對先等待的線程是非公平的,當隊列可用時,阻塞的線程都可以争奪通路隊列的資格,有可能先阻塞的線程最後才通路隊列。為了保證公平性,通常會降低吞吐量。我們可以使用以下代碼建立一個公平的阻塞隊列。

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000, true);
           

通路者的公平性是使用可重入鎖實作的,代碼如下。

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
           

LinkedBlockingQueue

LinkedBlockingQueue是一個用連結清單實作的有界阻塞隊列。此隊列的預設和最大長度為Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。

PriorityBlockingQueue

PriorityBlockingQueue是一個支援優先級的無界阻塞隊列。預設情況下元素采取自然順序升序排列。也可以自定義類實作compareTo()方法來指定元素排序規則,或者初始化PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。需要注意的是不能保證同優先級元素的順序。

DelayQueue

DelayQueue是一個支援延時擷取元素的無界阻塞隊列。隊列使用PriorityQueue來實作。隊列中的元素必須實作Delayed接口,在建立元素時可以指定多久才能從隊列中擷取目前元素。隻有在延遲期滿時才能從隊列中提取元素。

DelayQueue非常有用,可以将DelayQueue運用在以下應用場景。

  • 緩存系統的設計:可以用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦從DelayQueue中擷取元素時,表示緩存有效期到了。
  • 定時任務排程:使用DelayQueue儲存當天将會執行的任務和執行時間,一定從DelayQueue中擷取到任務就開始執行,比如TimerQueue就是使用DelayQueue實作的。

如何實作Delayed接口

DelayQueue隊列的元素必須實作Delayed接口。我們可以參考ScheduledThreadPoolExecutor裡ScheduledFutureTask類的實作,一共有三步。

第一步:在對象建立的時候,初始化基本資料。使用time記錄目前對象延遲到什麼時候可以使用,使用sequenceNumber來辨別元素在隊列中的現後順序。代碼如下。

private static final AtomicLong sequencer = new AtomicLong(0);
    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
    }
           

第二步:實作getDelay方法,該方法傳回目前元素還需要延時多長時間,機關是納秒,代碼如下。

public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }
           

通過構造函數可以看出延遲時間參數ns的機關是納秒,自己設計的時候最好使用納秒,因為實作getDelay()方法時可以指定任意機關,一旦以秒或分作為機關,而延時時間又精确不到納秒就麻煩了。使用時請注意當time小于目前時間時,getDelay會傳回負數。

第三步:實作compareTo方法來指定元素的順序。例如,讓延時時間最長的放在隊列的末尾。實作代碼如下。

public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }
           

如何實作延時阻塞隊列

延時阻塞隊列的實作很簡單,當消費者從隊列裡擷取元素時,如果元素沒有達到延時時間,就阻塞目前隊列。

long delay = first.getDelay(TimeUnit.NANOSECONDS);
if(delay <= 0) return q.poll();
else if(leader != null) available.await();
else {
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
        available.awaitNanos(delay);
    } finally {
        if(leader == thisThread) leader = null
    }
}
           

代碼中的變量leader是一個等待擷取隊列頭部元素的線程。如果leader不等于空,表示已經有線程在等待擷取隊列的頭元素。是以,使用await()方法讓目前線程等待信号。如果leader等于空,則把目前線程設定成leader,并使用awaitNanos()方法讓目前線程等待接收信号或等待delay時間。

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。

他支援公平通路隊列。預設情況下線程采用非公平性政策通路隊列。使用以下構造方法可以建立公平性通路的SynchronousQueue,如果設定為true,則等待的線程會采用先進先出的順序通路隊列。

public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }
           

SynchronousQueue可以看成是一個傳球手,負責把生産者線程處理的資料直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常适合傳遞性場景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrrayBlockingQueue。

LinkedTransferQueue

LinkedTransferQueue是一個由連結清單結構組成的無界阻塞TransferQueue隊列。相對于其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法

如果目前有消費者正在等待接收元素(消費者使用take()方法或待時間限制的poll()方法時),transfer方法可以把生産者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會将元素存放在隊列的tail節點,并等待該元素被消費者消費了才傳回。transfer方法的關鍵代碼如下。

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
           

第一行代碼是試圖把存放目前元素的s節點作為tail節點。第二行代碼是讓CPU自旋等待消費者元素。因為自旋會消耗CPU,是以自旋一定的次數後使用Thread.yield()方法來暫停目前正在執行的線程,并執行其他線程。

tryTransfer方法

tryTransfer方法是用來試探生産者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則傳回false。和transfer方法的差別是tryTransfer方法無論消費者是否接收,方法立即傳回,而transfer方法是必須等到消費者消費了才傳回。

對于帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,試圖把生産者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再傳回,如果逾時還沒消費元素,則傳回false,如果在逾時時間内消費了元素,則傳回true。

LinkedBlockingDeque

LinkedBlockingDeque是一個由連結清單結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以從隊列的兩端插入和移出元素。雙向隊列因為多個一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競争。相對其他的阻塞隊列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入、擷取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、擷取或移除雙端隊列的最後一個元素。另外,插入方法add等同于addList,移除方法remove等效于removeFirst。

在初始化LinkedBlockingDeque時可以設定容量防止其過度膨脹。另外,雙向阻塞隊列可以運用在“工作竊取”模式中。

阻塞隊列的實作原理

如果隊列是空的,消費者會一直等待,當生産者添加元素時,消費者是如何知道目前隊列有元素的呢?如果讓你來設計阻塞隊列你會如何設計,如何讓生産者和消費者進行高效率的通信呢?讓我們先來看看JDK是如何實作的。

使用通知模式實作。所謂通知模式,就是當生産者往滿的隊列裡添加元素時會阻塞住生産者,當消費者消費了一個隊列中的元素後,會通知生産者目前隊列可用。通過檢視JDK源碼發現ArrayBlockingQueue使用了Condition來實作,代碼如下。

/** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }
           

當往隊列裡插入一個元素時,如果隊列不可用,那麼阻塞生産者主要通過LockSupport.park(this)來實作。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
           

繼續進入源碼,發現調用setBlocker先儲存一下将要阻塞的線程,然後調用unsafe.park阻塞目前線程。

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }
           

unsafe.park是個native方法,代碼如下。

  • park這個方法會阻塞目前線程,隻有以下4種情況中的一種發生時,該方法才會傳回。
  • 與park對應的unpark執行或已經執行時。“已經執行”是指unpark先執行,然後再執行park的情況。
  • 線程被中斷時。
  • 等待完time參數指定的毫秒數時。
  • 異常現象發生時,這個異常現象沒有任何原因。

繼續看一下JVM是如何實作park方法:park在不同的作業系統中使用不同的方式實作,在Linux下使用的是系統方法pthread_cond_wait實作。實作代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp裡的os::PlatformEvent::park方法,代碼如下。

并發:Java中的阻塞隊列(BlockingQueue)。什麼是阻塞隊列Java裡的阻塞隊列阻塞隊列的實作原理

pthread_cond_wait是一個多線程的條件變量函數,cond是condition的縮寫,字面意思可以了解為線程在等待一個條件發生,這個條件是一個全局變量。這個方法接收兩個參數:一個共享變量_cond,一個互斥量_mutex。而unpark方法在Linux下是使用pthread_cond_signal實作的。park方法在Windows下則是使用WaitForSingleObject實作的。

當線程被阻塞隊列阻塞時,線程會進入WAITING(parking)狀态。我們可以使用jstack dump阻塞的生産者線程看到這點,如下。

并發:Java中的阻塞隊列(BlockingQueue)。什麼是阻塞隊列Java裡的阻塞隊列阻塞隊列的實作原理