天天看點

深入了解阻塞隊列

前言

建議先看一下這篇分享,深入了解Condition

阻塞隊列(BlockingQueue)是一個支援兩個附加操作的隊列。這兩個附加的操作支援阻塞的插入和移除方法。

  1. 支援阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入的元素,直到隊列不滿
  2. 支援阻塞的移除方法:意思是在隊列為空時,擷取元素的線程會等待隊列變為非空

阻塞隊列常用于生産者和消費者的場景,生産者是向隊列裡添加元素的線程,消費者從隊列裡取元素的線程。阻塞隊列就是生産者用來存放元素,消費者用來擷取元素的容器

方法/處理方式

抛出異常

傳回特殊值

插入方法

boolean add(c)

boolean offer(e)

解釋

添加元素,添加成功傳回true,如果隊列滿了,抛出IllegalStateException

添加元素,添加成功傳回true,如果隊列滿了,傳回false

移除方法

E remove()

E poll()

解釋

傳回頭結點,從隊列中移除頭節點,如果隊列為空,抛出NoSuchElementException

傳回頭結點,從隊列中移除頭節點,如果隊列為空,傳回null

檢查方法

E element()

E peek()

解釋

傳回頭結點,但是不從隊列中移除頭節點,如果隊列為空,抛出NoSuchElementException

傳回頭結點,但是不從隊列中移除頭節點,如果隊列為空,傳回null

方法/處理方式

一直阻塞

逾時退出

插入方法

void put(e)

boolean offer(e, time, unit)

解釋

添加元素,如果隊列已經滿了,則阻塞等待

添加元素,添加成功傳回true,如果隊列已經滿了,則阻塞等待,指定時間已經過去還沒能添加成功元素,則傳回false

移除方法

E take()

E poll(time, unit)

解釋

傳回頭結點,從隊列中移除頭節點,如果隊列為空則阻塞等待

傳回頭結點,從隊列中移除頭節點,隊列中沒元素會一直阻塞等待,指定時間已經過去還沒能拿到頭節點,則傳回null

檢查方法

不可用

不可用

例子

舉一個多生産者,多消費者的例子,隊列的大小為3,即隊列大小為3時,生産者就不再生産

public class Producer implements Runnable {

    private ArrayList list;

    private int capacity;

    public Producer(ArrayList queue,int capacity) {
        this.list = queue;
        this.capacity = capacity;
    }

    @Override
    public void run() {
        synchronized (list) {
            while (true) {
                while (list.size() == capacity) {
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Object object = list.add(new Object());
                System.out.println(Thread.currentThread().getName() + " 生産");
                list.notifyAll();
            }
        }
    }
}           

複制

注意消費者和生産者都是用的notifyAll()[通知所有阻塞的線程]方法,而不是notify()[通知一個阻塞的線程]方法,因為有可能出現“生産者”喚醒“生産者”,消費者“喚醒”消費者的情況,是以有可能造成死鎖,這裡以1個消費者,3個生産者為例說一下,消費者1獲得鎖還沒産品呢,阻塞,接着生産者1獲得鎖生産完了,然後生産者2獲得鎖後生産完了,再然後生産者3獲得鎖生産完了,最後生産者1獲得鎖了,然後阻塞了,現在好了生産者和消費者都阻塞了,造成了死鎖。notifyAll()則不會造成死鎖,接着上面的步驟,生産者3生産完了釋放鎖後,會通知所有阻塞的線程,是以消費者1肯定有機會拿到鎖來進行消費

public class Consumer implements Runnable {

    private List list;

    public Consumer(List queue) {
        this.list = queue;
    }

    @Override
    public void run() {
        synchronized (list) {
            while (true) {
                while (list.size() == 0) {
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Object object = list.remove(0);
                System.out.println(Thread.currentThread().getName() + " 消費");
                list.notifyAll();
            }
        }
    }
}           

複制

public class Test {

    public static void main(String[] args) {

        ArrayList list = new ArrayList(3);
        for (int i = 0; i < 3; i++) {
            new Thread(new Producer(list, 3), "生産者" + i).start();
            new Thread(new Consumer(list), "消費者" + i).start();
        }
    }
}           

複制

一部分結果

生産者0 生産
生産者0 生産
生産者0 生産
消費者1 消費
消費者1 消費
消費者1 消費
生産者1 生産           

複制

把這個執行個體用阻塞隊列來改寫,先自己寫一個阻塞隊列,實作BlockingQueue接口,這裡隻展示了一部分重寫的方法

public class MyBlockingQueue<E> implements BlockingQueue<E> {

    private int capacity;
    private List<E> list;

    public MyBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.list = new ArrayList(capacity);
    }

    @Override
    public void put(E e) throws InterruptedException {
        synchronized (this) {
            if (list.size() == capacity) {
                this.wait();
            }
            list.add(e);
            this.notifyAll();
        }
    }

    @Override
    public E take() throws InterruptedException {
        synchronized (this) {
            while (list.size() == 0) {
                this.wait();
            }
            E value = list.remove(0);
            this.notifyAll();
            return value;
        }
    }

}           

複制

public class Producer implements Runnable {

    private BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                queue.put(new Object());
                System.out.println(Thread.currentThread().getName() + " 生産");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}           

複制

public class Consumer implements Runnable {

    private BlockingQueue queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Object object = queue.take();
                System.out.println(Thread.currentThread().getName() + " 消費");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}           

複制

public class Test {

    public static void main(String[] args) {
        BlockingQueue queue = new MyBlockingQueue(3);
        for (int i = 0; i < 3; i++) {
            new Thread(new Producer(queue), "生産者" + i).start();
            new Thread(new Consumer(queue), "消費者" + i).start();
        }
    }
}           

複制

這裡将BlockingQueue的實作改為ArrayBlockingQueue,程式運作結果一樣,和我們之前的例子比較,BlockingQueue其實就是不用我們自己寫阻塞和喚醒的部分,直接看一下ArrayBlockingQueue的源碼,其實和我自己實作的差不多,隻不過并發這部分源碼用的是ReentLock,而我用的是synchronized

源碼

基于jdk1.8.0_20

深入了解阻塞隊列

先看屬性

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

}           

複制

構造函數

// 設定同步隊列的大小和鎖是否公平
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}           

複制

put方法

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 響應中斷的方式上鎖
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 将目前線程加入notFull這個等待隊列
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}           

複制

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}           

複制

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    // 循環數組實作
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}           

複制

take方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 将目前線程加入notEmpty這個等待隊列
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}           

複制

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // 放到數組的最後一個了,下一次從頭開始放
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        //更新iterators,不再分析
        itrs.elementDequeued();
    notFull.signal();
    return x;
}           

複制

其他方法就不再分析,基本上就是上鎖,然後進行相應的操作

最後說一下LZ的了解,個人感覺用ArrayBlockingQueue實作生産者和消費者,比我上面用synchronized的方式應該快很多,畢竟ArrayBlockingQueue隻會是生成者通知消費者,或者消費者通知生産者,而synchronized不是,會造成很多不必要的鎖競争,當然并沒有實驗,有時間可以試試

參考書籍

《Java并發程式設計的藝術》