天天看點

聊一聊面試中常問的java阻塞隊列

學習資料結構的時候介紹過隊列,今天介紹一種隊列的其中一種,叫做阻塞隊列。這個知識點屬于多線程中的一個子產品,對于我們了解消息中間件有份非常大的用處,希望對你有幫助。

一、什麼是阻塞隊列

1、概念了解

隊列比較好了解,資料結構中我們都接觸過,先進先出的一種資料結構,那什麼是阻塞隊列呢?從名字可以看出阻塞隊列其實也就是隊列的一種特殊情況。舉個例子來說明一下吧,我們去餐館吃飯,一個接一個的下單,這時候就是一個普通的隊列,萬一這家店生意好,餐館擠滿了人,這時候肯定不能把顧客趕出去,于是餐館就在旁邊設定了一個休息等待區。這就是一個阻塞隊列了。我們使用一張圖來示範一下:

聊一聊面試中常問的java阻塞隊列

2、特點

從上面這張圖我們會發現這樣的規律:

(1)當阻塞隊列為空時,從隊列中擷取元素的操作将會被阻塞,就好比餐館休息區沒人了,此時不能接納新的顧客了。換句話,肚子為空的時候也沒東西吃。

(2)當阻塞隊列滿了,往隊列添加元素的操作将會被阻塞,好比餐館的休息區也擠滿了,後來的顧客隻能走了。

從上面的概念我們類比到線程中去,我們會發現,在某些時候線程可能不能不阻塞,因為CPU核心就那麼幾個,阻塞現狀更加說明了資源的使用率高,換句話來說,阻塞其實是一個好事。

阻塞隊列應用最廣泛的是生産者和消費者模式,待會也會給出一個實際案例示範一下。

還有一點,就是我們看這個阻塞隊列有點線程池的感覺,其實基本上可以這樣了解,阻塞隊列線上程池中确實有着很大的應用。我們可以給出倆例子:

public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(nThreads, nThreads,
                   0L, TimeUnit.MILLISECONDS,
                   new LinkedBlockingQueue<Runnable>());
    }
public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                   60L, TimeUnit.SECONDS,
                   new SynchronousQueue<Runnable>());
}
           

前面說了這麼久,來個标準點的定義吧:

在多線程中,阻塞的意思是,在某些情況下會挂起線程,一旦條件成熟,被阻塞的線程就會被自動喚醒。

也就是說,之前線程的wait和notify我們程式員需要自己控制,但有了這個阻塞隊列之後我們程式員就不用擔心了,阻塞隊列會自動管理。

歐了,我們對概念先認識到這,我們從代碼中看看,畢竟面試中X疼的就是代碼。

二、常見的BlockQueue方法

BlockQueue接口繼承自collection接口。他的主要實作方法比較多。我們分類來看一下:

聊一聊面試中常問的java阻塞隊列

方法就這些,這些方法一個一個看和示範的話我覺得有點傻,參照網絡上别人的一些部落格也對其進行了分類:

根據插入和取出兩種類型的操作,具體分為下面一些類型:

操作類型 抛出異常 特殊值 阻塞現象 時間逾時
插入 add(o) offer(o) put(o) offer(o, timeout, unit)
取出(删除) remove(o) poll() take() poll(timeout, unit)
  • 抛出異常:這時候插入和取出在不能立即被執行的時候就會抛出異常。
  • 特殊值:插入和取出在不能被立即執行的情況下會傳回一個特殊的值(true 或者 false)
  • 阻塞:插入和取出操作在不能被立即執行時會阻塞線程,直到條件成熟,被其他線程喚醒
  • 逾時:插入和取出操作在不能立即執行的時候會被阻塞一定的時候,如果在指定的時間内沒有被執行,那麼會傳回一個特殊值。

單單從操作的次元來看的話,還是會有點亂,因為有些方法是阻塞方法,有些方法不是,我們從阻塞和不阻塞的次元再來一次劃分:

非阻塞方法 阻塞方法
add(E e) put(E e)
remove()
offer(E e) offer(E e,long timeout, TimeUnit unit)
poll(long timeout, TimeUnit unit)
peek()

現在我們再來看,相信會比較清楚一點,不過需要注意一些特殊的情況,比如offer和poll,以是否包含逾時時間來區分是否阻塞。

三、常見的阻塞隊列

實作了BlockQueue接口的隊列有很多,常見的沒有幾種,我們使用表格的形式給列出來,對比着分析一下:

隊列名 作用
ArrayBlockingQueue 由數組結構組成的有界阻塞隊列
LinkedBlockingQueue 由連結清單結構組成的有界阻塞隊列(預設為Integer.MAX_VALUE)
PriorityBlockingQueue 支援優先級排序的無界阻塞隊列
DelayQueue 使用優先級隊列實作的延遲無界阻塞隊列
SynchronousQueue 不存儲元素的阻塞隊列,也即單個元素的隊列
LinkedTransferQueue 由連結清單結構組成的無界阻塞隊列
LinkedBlockingDeque 由連結清單結構組成的雙向阻塞隊列

常見的幾種已經加粗了。

ArrayBlockingQueue和LinkedBlockingQueue是最為常用的阻塞隊列,前者使用一個有邊界的數組來作為存儲媒體,而後者使用了一個沒有邊界的連結清單來存儲資料。

PriorityBlockingQueue是一個優先阻塞隊列。所謂優先隊列,就是每次從隊隊列裡面擷取到的都是隊列中優先級最高的,對于優先級,PriorityBlockingQueue需要你為插入其中的元素類型提供一個Comparator,PriorityBlockingQueue使用這個Comparator來确定元素之間的優先級關系。底層的資料結構是堆,也就是我們資料結構中的那個堆。

DelayQueue是一個延時隊列,所謂延時隊列就是消費線程将會延時一段時間來消費元素。

SynchronousQueue是最為複雜的阻塞隊列。SynchronousQueue和前面分析的阻塞隊列都不同,因為SynchronousQueue不存在容量的說法,任何插入操作都需要等待其他線程來消費,否則就會阻塞等待,看到這種隊列心裡面估計就立馬能聯想到生産者消費者的這種模式了,沒錯,就可以使用這個隊列來實作。

現在,我們已經把阻塞隊列的一些基本知識點介紹了,完全帶細節的介紹費時又費力,下面我們針對某個阻塞隊列來看一下原理,其實就是看看源碼是如何實作的。

四、原理

我們以ArrayBlockingQueue為例,以下源碼均來自jdk1.8。還是以變量、構造函數、普通函數的順序來看:

1、變量

//The queued items:底層以數組來存儲元素 
private final E[] items;
//takeIndex和putIndex分别表示隊首元素和隊尾元素的下标
private int takeIndex;
private int putIndex;
//count表示隊列中元素的個數。
private int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access:可重入鎖 */
private final ReentrantLock lock;
//notEmpty和notFull是等待條件
private final Condition notEmpty;
private final Condition notFull;
           

變量的作用基本上就是這樣,我們再來接着看構造函數

2、構造函數

//1、指定隊列的容量
public ArrayBlockingQueue(int capacity) {}
//2、不僅指定容量,也指定了是否公平
public ArrayBlockingQueue(int capacity, boolean fair) { }
//3、容量、公平性而且還可以對另外一個集合進行初始化
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {}
           

上面的這些其實都是為了給其他操作做鋪墊。

3、put函數

   /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
           

首先檢查是否為空,從這個方法中我們可以看到,首先檢查隊列是否為空,然後擷取鎖,判斷目前元素個數是否等于數組的長度,如果相等,則調用notFull.await()進行等待,如果捕獲到中斷異常,則喚醒線程并抛出異常。當被其他線程喚醒時,通過enqueue(e)方法插入元素,最後解鎖。

我們按照這個源碼來看,真正實作插入操作的是enqueue,我們跟進去看看:

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
           

就幾行代碼,就是一個正常的移動數組插入的過程,不過最後還要再通知一下隊列,插入了元素,此時的隊列就不為空了。

4、take元素

還是看源碼

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
           

take的這個操作根據put反過來看就可以,真正實作的是dequeue,跟進去看看:

   /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
           

取出的時候也是一樣,數組少一個元素,數量少一,最後通過隊列不為空。其他的就不詳述了。

最後我們看看使用。我們舉一個生産者消費者的例子,畢竟這個是一個面試考點:

五、應用

public class Data {
    //flag表示是否生産,預設生産
    private volatile boolean flag = true;
    //aInteger表示産品 
    private AtomicInteger aInteger = new AtomicInteger();
    BlockingQueue<Object> queue = null;
    public Data(BlockingQueue<Object> queue) {
        this.queue = queue;
    }
    public void produce() throws Exception{
        String data = null;
        boolean retValue;
        while(flag){
            data = aInteger.incrementAndGet()+"";
            retValue = queue.offer(data, 2L, TimeUnit.SECONDS);
            System.out.println(Thread.currentThread().getName()+" 插入的結果是:"+retValue);
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+" 休息一會,馬上回來");
    }
    public void consumer() throws Exception{
        Object result = null;
        while(flag){
            result = queue.poll(2L, TimeUnit.SECONDS);
            if(result==null || ((String) result).equalsIgnoreCase("")){
                flag = false;
            }

            System.out.println(Thread.currentThread().getName()+" 消費資源成功");
            TimeUnit.SECONDS.sleep(1);
        }
    }
}