天天看點

JAVA并發容器-阻塞隊列

在并發程式設計中,有時候需要使用線程安全的隊列。如果要實作一個線程安全的隊列有兩 種方式:一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖 (入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實作。非阻塞的實作方式則可以使用循環CAS的方式來實作。

阻塞隊列

阻塞隊列(BlockingQueue)是一個支援兩個附加操作的隊列。這兩個附加的操作支援阻塞的插入和移除方法。

  1. 支援阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
  2. 支援阻塞的移除方法:意思是在隊列為空時,擷取元素的線程會等待隊列變為非空。

應用場景

阻塞隊列常用于生産者和消費者的場景,生産者是向隊列裡添加元素的線程,消費者是 從隊列裡取元素的線程。阻塞隊列就是生産者用來存放元素、消費者用來擷取元素的容器。

插入和移除操作的4中處理方式

JAVA并發容器-阻塞隊列
  • 抛出異常:當隊列滿時,如果再往隊列裡插入元素,會抛出IllegalStateException(“Queue full”)異常。當隊列空時,從隊列裡擷取元素會抛出NoSuchElementException異常。
  • 傳回特殊值:當往隊列插入元素時,會傳回元素是否插入成功,成功傳回true。如果是移除方法,則是從隊列裡取出一個元素,如果沒有則傳回null。
  • 一直阻塞:當阻塞隊列滿時,如果生産者線程往隊列裡put元素,隊列會一直阻塞生産者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者線程從隊列裡take元素,隊列會阻塞消費者線程,直到隊列不為空。
  • 逾時退出:當阻塞隊列滿時,如果生産者線程往隊列裡插入元素,隊列會阻塞生産者線程 一段時間,如果超過了指定的時間,生産者線程就會退出。
注意: 如果是無界阻塞隊列,隊列不可能會出現滿的情況,是以使用put或offer方法永 遠不會被阻塞,而且使用offer方法時,該方法永遠傳回true。

Java裡的阻塞隊列

  • ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue:一個由連結清單結構組成的有界阻塞隊列。
  • PriorityBlockingQueue:一個支援優先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優先級隊列實作的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由連結清單結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由連結清單結構組成的雙向阻塞隊列。

ArrayBlockingQueue

ArrayBlockingQueue是一個用數組實作的有界阻塞隊列,在初始化時需要指定隊列的長度。此隊列按照先進先出(FIFO)的原則對元素進行排序。預設情況下不保證線程公平的通路隊列,但是在初始化的隊列的時候指定阻塞隊列的公平性,如:​

​ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);​

​​。它使用​

​ReentrantLock​

​來實作隊列的線程安全。

JAVA并發容器-阻塞隊列

核心屬性

/** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;      

核心方法

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }      

LinkedBlockingQueue

LinkedBlockingQueue是一個用連結清單實作的有界阻塞隊列。此隊列的預設和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。出隊和入隊使用兩把鎖來實作。

核心屬性

/** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();      

PriorityBlockingQueue

PriorityBlockingQueue是一個支援優先級的無界阻塞隊列。預設情況下元素采取自然順序升序排列。也可以自定義類實作compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。需要注意的是不能保證 同優先級元素的順序。底層使用數組實作,預設初始容量是11,最大值是​

​Integer.MAX_VALUE - 8​

​。容量不夠時會進行擴容

核心方法

// 入隊
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
    // 擴容
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }      

DelayQueue

DelayQueue是一個支援延時擷取元素的無界阻塞隊列。隊列使用PriorityQueue來實作。隊 列中的元素必須實作Delayed接口和Comparable接口,在建立元素時可以指定多久才能從隊列中擷取目前元素。 隻有在延遲期滿時才能從隊列中提取元素。

應用場景

  • 緩存系統的設計:可以用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從DelayQueue中擷取元素時,表示緩存有效期到了。
  • 定時任務排程:使用DelayQueue儲存當天将會執行的任務和執行時間,一旦從 DelayQueue中擷取到任務就開始執行,比如TimerQueue就是使用DelayQueue實作的。

如何實作Delayed接口

DelayQueue隊列的元素必須實作Delayed接口。我們可以參考ScheduledThreadPoolExecutor 裡ScheduledFutureTask類的實作,一共有三步。

第一步:在對象建立的時候,初始化基本資料。使用time記錄目前對象延遲到什麼時候可 以使用,使用sequenceNumber來辨別元素在隊列中的先後順序。代碼如下:

private static final AtomicLong sequencer = new AtomicLong(0);

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    ScheduledFutureTask(Runnable r, V result, long ns, long period){
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
}      

第二步:實作getDelay方法,該方法傳回目前元素還需要延時多長時間,機關是納秒,代碼 如下:

public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}      

注意當time小于目前時間時,getDelay會傳回負數,這時才可以出隊。

第三步:實作compareTo方法來指定元素的順序。例如,讓延時時間最長的放在隊列的末 尾。實作代碼如下:

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
        ScheduledThreadPoolExecutor.ScheduledFutureTask<?> x = (ScheduledThreadPoolExecutor.ScheduledFutureTask<?>)other;
        // 過期時間小的排前面,大的排後面,如果一樣就使用sequenceNumber 來排序。
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    // 快要過期的排在前面
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}      

如何實作延時阻塞隊列

延時阻塞隊列的實作很簡單,當消費者從隊列裡擷取元素時,如果元素沒有達到延時時 間,就阻塞目前線程。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    // 隊列為NULL,阻塞線程直到逾時
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // 等待時間小于第一個元素的過期時間
                if (nanos < delay || leader != null)
                    // 阻塞線程直到逾時
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待時間大于第一個元素的過期時間,阻塞線程直到第一個元素過期
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 喚醒其他阻塞線程
            available.signal();
        lock.unlock();
    }
}      

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作, 否則不能繼續添加元素。 它支援公平通路隊列。預設情況下線程采用非公平性政策通路隊列。

SynchronousQueue可以看成是一個傳球手,負責把生産者線程處理的資料直接傳遞給消費 者線程。隊列本身并不存儲任何元素,非常适合傳遞性場景。SynchronousQueue的吞吐量高于 LinkedBlockingQueue和ArrayBlockingQueue。

LinkedTransferQueue

LinkedTransferQueue是一個由連結清單結構組成的無界阻塞TransferQueue隊列。相對于其他阻 塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法

如果目前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法 時),transfer方法可以把生産者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等 待接收元素,transfer方法會将元素存放在隊列的tail節點,并等到該元素被消費者消費了才返 回。

LinkedBlockingDeque

LinkedBlockingDeque是一個由連結清單結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以 從隊列的兩端插入和移出元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊 時,也就減少了一半的競争。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入、 擷取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、擷取或移除雙 端隊列的最後一個元素。另外,插入方法add等同于addLast,移除方法remove等效于 removeFirst。但是take方法卻等同于takeFirst,不知道是不是JDK的bug,使用時還是用帶有First 和Last字尾的方法更清楚。

在初始化LinkedBlockingDeque時可以設定容量防止其過度膨脹。另外,雙向阻塞隊列可以運用在“工作竊取”模式中。

參考

《java并發程式設計的藝術》

源碼

​​https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases​​

layering-cache

繼續閱讀