天天看點

Java集合中的queue系列源碼分析javaQueue學習

javaQueue學習

Java集合中的queue系列源碼分析javaQueue學習

queue接口

根據上圖的結構先看一下queue接口的源碼

public interface Queue<E> extends Collection<E> {
    //add方法,将一個element加入到隊列中。
  	//源代碼中的解釋是:如果element可以加入到隊列那麼就立即執行添加,并傳回true,如果失敗就會報非法代碼異常
    boolean add(E e);

    //offer方法
  	/*插入一個成員到queue中,如果可以插入就立即插入并傳回true,
     *如果是一個容量有限的queue,且容量已滿那麼這個方法會比add()方法更加可取,該方法會抛出異常并終止插入并傳回false
     */
    boolean offer(E e);

    /**
     * 移除隊列首部的一個element如果queue為空則抱錯(沒找到element異常)
     *
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    E remove();

    /**
     * 移除隊列頭部的element并傳回element,如果隊列為空則傳回null
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    E poll();

    /**
     * 傳回隊列頭部的element但是不移除這個element,如果隊列為空則抱錯
     *
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    E element();

    /**
     * Retrieves, but does not remove, the head of this queue,
     * or returns {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    E peek();
}
           

上面就是queue接口的代碼了。可以看到他為全部的queue聲明了一套命名格式規範并且方法功能明确的方法實作規則,全部的queue實作類都會根據自己的特性去實作這些方法的功能。

根據queue的實作特性可以分成:1.非阻塞隊列 2.阻塞隊列 3.雙端隊列三種,而Java中也對應着這三種的抽象類。

abstractqueue抽象類

源代碼:

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {

    /**
     * 構造方法
     */
    protected AbstractQueue() {
    }

    /**
     *根據queue接口中的說明這個add方法功能是添加一個element到queue中如果成功傳回true失敗則抛出代碼非法異常
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws IllegalStateException if the element cannot be added at this
     *         time due to capacity restrictions
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null and
     *         this queue does not permit null elements
     * @throws IllegalArgumentException if some property of this element
     *         prevents it from being added to this queue
     */
    public boolean add(E e) {
        if (offer(e))//通過offer方法來插入element成功offer傳回一個true
            return true;//傳回true
        else//這裡代表offer失敗傳回了一個false
            throw new IllegalStateException("Queue full");//抛出異常
    }

    /**
     * 根據queue接口的說明這個方法是:移除head的一個element如果成功傳回element,失敗則抛異常
     *
     * <p>This implementation returns the result of <tt>poll</tt>
     * unless the queue is empty.
     *
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    public E remove() {
        E x = poll();//通過poll方法傳回并移除隊首的element
        if (x != null)//如果移除失敗則是一個null,否則是一個element
            return x;//傳回element
        else
            throw new NoSuchElementException();//是null則抛異常
    }

    /**
     * 根據queue接口說明,這裡要實作傳回一個element但不remove
     *
     * <p>This implementation returns the result of <tt>peek</tt>
     * unless the queue is empty.
     *
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    public E element() {
        E x = peek();//同樣使用peek方法傳回element,或者null
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

    /**
     * 移除queue中的全部element
     * The queue will be empty after this call returns.
     *
     * <p>This implementation repeatedly invokes {@link #poll poll} until it
     * returns <tt>null</tt>.
     */
    public void clear() {
        while (poll() != null)//通過循環的poll方法直到queue中為空,傳回false時
            ;
    }

    public boolean addAll(Collection<? extends E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        boolean modified = false;
        for (E e : c)
            if (add(e))
                modified = true;
        return modified;
    }

}
           

上面是隊列的高層抽象類,全部的隊列都繼承這個類,而比如阻塞隊列因為有阻塞的特性還需要實作一個blockingqueue接口。

blockingqueue接口

public interface BlockingQueue<E> extends Queue<E> {
    
    boolean add(E e);
    boolean offer(E e);

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

    /**
     * Inserts the specified element into this queue, waiting up to the
     * specified wait time if necessary for space to become available.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    E take() throws InterruptedException;

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
     * limit.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the remaining capacity
     */
    int remainingCapacity();

    boolean remove(Object o);

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     * @throws ClassCastException if the class of the specified element
     *         is incompatible with this queue
     *         (<a href="../Collection.html#optional-restrictions" target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow" >optional</a>)
     * @throws NullPointerException if the specified element is null
     *         (<a href="../Collection.html#optional-restrictions" target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow" >optional</a>)
     */
    public boolean contains(Object o);

    /**
     * Removes all available elements from this queue and adds them
     * to the given collection.  This operation may be more
     * efficient than repeatedly polling this queue.  A failure
     * encountered while attempting to add elements to
     * collection {@code c} may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempts to drain a queue to itself result in
     * {@code IllegalArgumentException}. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    int drainTo(Collection<? super E> c);

    /**
     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection.  A failure
     * encountered while attempting to add elements to
     * collection {@code c} may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempts to drain a queue to itself result in
     * {@code IllegalArgumentException}. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    int drainTo(Collection<? super E> c, int maxElements);
}
           

根據上面可以看出,這個接口為阻塞隊列規定了兩個阻塞方法(take,put)和一些逾時方法。

LinkedBlockingQueue的容量是沒有上限的(說的不準确,在不指定時容量為Integer.MAX_VALUE,不要然的話在put時怎麼會受阻呢),但是也可以選擇指定其最大容量,它是基于連結清單的隊列,此隊列按 FIFO(先進先出)排序元素。

ArrayBlockingQueue在構造時需要指定容量, 并可以選擇是否需要公平性,如果公平參數被設定true,等待時間最長的線程會優先得到處理(其實就是通過将ReentrantLock設定為true來 達到這種公平性的:即等待時間最長的線程會先操作)。通常,公平性會使你在性能上付出代價,隻有在的确非常需要的時候再使用它。它是基于數組的阻塞循環隊 列,此隊列按 FIFO(先進先出)原則對元素進行排序。

PriorityBlockingQueue是一個帶優先級的 隊列,而不是先進先出隊列。元素按優先級順序被移除,該隊列也沒有上限(看了一下源碼,PriorityBlockingQueue是對 PriorityQueue的再次包裝,是基于堆資料結構的,而PriorityQueue是沒有容量限制的,與ArrayList一樣,是以在優先阻塞 隊列上put時是不會受阻的。雖然此隊列邏輯上是無界的,但是由于資源被耗盡,是以試圖執行添加操作可能會導緻 OutOfMemoryError),但是如果隊列為空,那麼取元素的操作take就會阻塞,是以它的檢索操作take是受阻的。另外,往入該隊列中的元 素要具有比較能力。

DelayQueue(基于PriorityQueue來實作的)是一個存放Delayed 元素的無界阻塞隊列,隻有在延遲期滿時才能從中提取元素。該隊列的頭部是延遲期滿後儲存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則隊列沒有頭部,并且poll将傳回null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法傳回一個小于或等于零的值時,則出現期滿,poll就以移除這個元素了。此隊列不允許使用 null 元素。

ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

   
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
     */
    transient Itrs itrs = null;
           

從上面可以看出這是通過一個數組來實作的queue,并且使用了重入鎖來維護同步隊列,并将壓入和彈出的等待隊列交給兩個condition來維護。

既然是數組實作那麼就一定是一個容量有限的queue

LinkedBlockingQueue

這個與上數組實作的差別就是他的容量是無限的,或者說是在達到最大容量之前他都是可以填加的。

下面是在源碼中他的預設容量和最大容量。

private static final int DEFAULT_INITIAL_CAPACITY = 11;

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;