天天看點

阻塞隊列--BlockingQueue

概述

阻塞隊列是一個支援兩個附加操作的隊列。這兩個附加的操作支援阻塞的插入和移除方法。

1. 支援阻塞的插入方法:當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿

2. 支援阻塞的移除方法:當隊列為空時,擷取元素的線程會等待隊列變為非空。

阻塞隊列适用與生産者和消費者的場景。在阻塞隊列不可用時(滿了或者為空),這兩個附件操作提供了4中處理方式:抛出異常,傳回特殊值、一直阻塞、逾時退出

方法/處理方式 抛出異常 傳回特殊值 一直阻塞 逾時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll(e) take() offer(e,time,unit)
檢查方法 element() peek(e) 不可用 不可用

1.抛出異常:當隊列滿時,如果再往隊列添加元素,會抛出IllegalStateException異常。當隊列為空時,如果從隊列裡擷取元素,會抛出NoSuchElementException

2. 傳回特殊值:當往隊列插入元素時,會傳回元素是否插入成功,成功傳回true,如果是移除方法,則是從隊列裡取出一個元素,如果沒有則傳回null。

3. 一直阻塞:當阻塞隊列滿時,如果生産者線程往隊列裡put元素,隊列會一直阻塞生産者線程,直到隊列可用或者響應中斷退出。當隊列為空時,如果消費者線程從隊列裡take元素,隊列會阻塞消費線程,直到隊列不為空。

4. 逾時推出:當阻塞隊列滿時,如果生産者線程往隊列裡插入元素,隊列會阻塞生産者線程一段時間,如果超過了指定時間,生産者線程就會退出。

注意,如果阻塞隊列是無界的,那麼隊列不可能會出現滿的情況,是以使用put或者offer方法永遠不會被阻塞,而且使用offer方法時,永遠傳回true

JDK7提供的7個阻塞隊列:

  • ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列
  • LinkedBlockingQueue:一個有連結清單組成的有界阻塞隊列
  • PriorityBlockingQueue:一個支援優先級排序的無界阻塞隊列
  • DelayQueue:一個使用優先級隊列實作的無界阻塞隊列
  • SynchronousQueue:一個不存儲元素的阻塞隊列
  • LinkedTransferQueue:一個有連結清單組成的無界阻塞隊列
  • LinkedBlockingDeque:一個由連結清單結構組成的雙向阻塞隊列

ArrayBlockingQueue

預設情況下是不保證線程公平的通路隊列,即當隊列可用時,阻塞的線程都可以加入到争奪通路隊列的資格,有可能先阻塞的線程最後才通路到隊列。為了保證公平性,通常會降低吞吐量。

/**
    *capacity 隊列的容量
    *fair 是否公平性
    **/
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= )
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);//可重入鎖
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
           

LinkedBlockingQueue

預設和最大長度都為Integer.MAX_VALUE。按照先進先出的原則對元素排序。

PriorityBlockingQueue

支援優先級的無界阻塞隊列。預設情況下元素采取自然順序升序排列。也可以自定義類實作CompareTo()方法來指定元素排序規則,或者初始化PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。但是不保證通優先級元素的順序。

DelayQueue

支援延時擷取元素的無界阻塞隊列。隊列使用PriorityQueue來實作。隊列中的元素必須實作Delayed接口,在建立元素時,可以指定多久才能從隊列中擷取目前元素。隻有在延遲期滿時,才能從隊列中提前元素。

應用場景:

緩存系統的設計:可以用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue擷取元素時,表示緩存有效期到了。

定時任務排程:使用DelayQueue儲存當天将會執行的任務和執行時間,一旦從DelayQueue中擷取到任務就開始執行。

如何實作Delayed接口

可以參考ScheduledThreadPoolExecutor裡的ScheduledFutureTask類的實作:

第一步:在對象建立的時候,初始化基本資料。使用time記錄目前對象延遲到什麼時候可以使用,使用sequenceNumber來辨別元素在隊列中的先後順序:

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }
           

第二步:執行個體getDelay()方法,該方法傳回目前元素還需要延時多長時間,機關是納秒

public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }
           

自己設計的時候最好使用納秒!當time小于目前時間時,會傳回負數

第三部:實作compareTo方法來指定元素的順序。

public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return ;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < )
                    return -;
                else if (diff > )
                    return ;
                else if (sequenceNumber < x.sequenceNumber)
                    return -;
                else
                    return ;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == ) ?  : ((d < ) ? - : );
        }
           

如何實作延時阻塞隊列

當消費者從隊列裡擷取元素時,如果元素沒有達到延時時間,則阻塞目前線程。

public RunnableScheduledFuture take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture first = queue[];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay <= )
                            return finishPoll(first);
                        else if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[] != null)
                    available.signal();
                lock.unlock();
            }
        }
           

leader是一個等待擷取隊列頭部元素的線程。如果leader不等于空,說明已經有線程在等待擷取隊列的頭元素。是以,使用await方法讓目前線程等待信号。如果leader等于空,則把目前線程設定成leader,并使用awaitNanos()方法讓目前線程等待接收信号或等待delay時間。

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作都必須等待下一個take操作,否則不能繼續添加元素。支援公平通路隊列,預設為非公平政策。

SynchronousQueue适合傳遞性場景,負責把生産線程處理的資料直接傳遞給消費者。它的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue,因為不需要緩存。

LinkedTransferQueue

LinkedTransferQueue是一個有連結清單組成的無界阻塞TransferQueue隊列。相比其他阻塞隊列,多了tryTransfer和transfer方法

1. transfer方法

如果目前有消費者正在等待接收元素(消費者使用take()方法或者帶時間限制的poll()方法),transfer方法可以把生産者傳入的元素立刻傳輸到消費者。如果沒有消費者在等待接收元素,transfer方法會将元素存放在隊列的tail節點,并等到該元素被消費者消費了才傳回。

2. tryTransfer方法

tryTransfer方法是用來試探生産者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則傳回false。和transfer方法的差別是tryTransfer方法無論消費者是否接收,方法立即傳回,而transfer方法是必須等到消費者消費了才傳回。

LinkedBlockingDeque

LinkedBlockingDeque是一個由連結清單結構組成的雙向阻塞隊列,即可以從隊列的兩端插入和移除元素。以其他阻塞隊列相比,多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法。以First結尾的方法,表示插入或者擷取或者移除隊列的第一個元素。以Last結尾的表示插入、擷取或者移除隊列的最後一個元素。另外,插入方法add等同于addLast,移除方法remove等同與removeFirst,但是take方法等同與takeFirst。是以使用時,還是使用帶First或者Last字尾的方法,比較清楚。