天天看點

Java并發程式設計:阻塞隊列及實作生産者-消費者模式

1 什麼是阻塞隊列

  JDK 1.5的java.util.concurrent包提供了多種阻塞隊列。阻塞隊列相對于PriorityQueue、LinkedList等非阻塞隊列的特點是提供了,隊列阻塞的操作,優化了隊列為空向隊列取資料或者隊列滿向隊列加資料時的阻塞操作。以生産者-消費者模式為例,當隊列為空時消費者線程會被挂起,等到隊列中有資料時會自動的恢複并消費。

1.1 常見的阻塞隊列

  BlockingQueue接口的主要實作有如下幾種:

  ArrayBlockingQueue :基于數組的有界阻塞隊列,構造時可以指定隊列大小,預設為非公平(不保證等待最久的任務最先處理)。

  LinkedBlockingQueue :基于連結清單的有界阻塞隊列,如果不指定大小則預設為Integer.MAX_VALUE,基本可以認為是無界的。

  PriorityBlockingQueue :優先級排序的無界阻塞隊列,元素出隊列的順序按照優先級排序。

  DelayQueue:基于優先級隊列的無界阻塞隊列。隊列中的元素隻有到達規定的延時才能從隊列中取出。

  SynchronousQueue:不存儲元素的阻塞隊列,隻有前一個将隊列中的元素取走時才能加入新的元素。

1.2 阻塞隊列常見的方法

  常見的非阻塞隊列的操作清單如下:

  add(E e):将元素e插入到隊列末尾,如果插入成功,則傳回true;如果插入失敗(即隊列已滿),則會抛出異常;

  remove():移除隊首元素,若移除成功,則傳回true;如果移除失敗(隊列為空),則會抛出異常;

  offer(E e):将元素e插入到隊列末尾,如果插入成功,則傳回true;如果插入失敗(即隊列已滿),則傳回false;

  poll():移除并擷取隊首元素,若成功,則傳回隊首元素;否則傳回null;

  peek():擷取隊首元素,若成功,則傳回隊首元素;否則傳回null

阻塞隊列實作了非阻塞隊列的操作方法,為了實作“阻塞”提供了take和put方法。

  take():擷取并移除隊首元素,如果隊列為空則阻塞直到隊列中有元素。

  put():向隊尾添加元素,如果隊列滿則等待直到可以添加。

2 LinkedBlockingQueue源碼分析

  LinkedBlockingQueue是阻塞隊列中比較常用的,ThreadPoolExecutor類的實作中多是用的這個隊列。下面通過源碼分下阻塞隊列的工作原理。

2.1 構造方法源碼分析

  LinkedBlockingQueue共有三個構造方法,分别功能為預設大小,指定大小以及帶初始化的構造方法。

/**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE); //無參的構造函數,最大容量為Integer(4位元組)的最大表示值
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) { //指定容量的構造函數,大小為capacity
        if (capacity <= ) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) { //帶初始化的構造方法,可以将指定集合中的元素初始化到阻塞隊列中
        this(Integer.MAX_VALUE);  //最大容量為Integer(4位元組)的最大表示值 
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = ;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
}
           

2.2 put方法源碼分析

  put方法内部通過Condition的await和signal方法實作了線程之間的同步,和使用線程同步實作生産者消費者的代碼邏輯差不多。同步隊列采用了兩把鎖,讀鎖(takeLock)和寫鎖(putLock)。

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary for space to become available.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock; //定義可重入   寫鎖
    final AtomicInteger count = this.count; //原子類
    putLock.lockInterruptibly(); //進入臨界區,他和lock的差別是lockInterruptibly不進行中斷而是向上層抛出異常
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        while (count.get() == capacity) { //容量已滿,線程進入阻塞狀态,交出鎖并且交出CPU
            notFull.await();  //寫鎖的Condition
        }
        enqueue(node); //加入隊列
        c = count.getAndIncrement();
        if (c +  < capacity) //隊列未滿,喚醒一個等待寫入的線程
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == ) //隊列第一次不為空,喚醒一個等待讀取的線程
        signalNotEmpty(); 
}
           

2.3 take方法源碼分析

public E take() throws InterruptedException {
    E x;
    int c = -;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock; //定義可重入鎖  讀鎖
    takeLock.lockInterruptibly(); //進入臨界區,他和lock的差別是lockInterruptibly不進行中斷而是向上層抛出異常
    try {
        while (count.get() == ) { //如果隊列為空, 讀取線程進入阻塞狀态,交出讀鎖和CPU
            notEmpty.await();
        }
        x = dequeue(); //擷取隊首元素
        c = count.getAndDecrement();
        if (c > ) //如果取完隊列中還存在資料,則喚醒其他等待讀取的線程
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity) //沒取之前隊列為滿的,取完之後要喚醒一個寫入線程
        signalNotFull();
    return x;
}
           

3 生産者-消費者模式實作

  使用阻塞隊列實作生産者-消費者模式不需要我們自己編碼控制讀寫線程的阻塞和喚醒操作,由上節分析的take()、put()方法可知,阻塞隊列内部替我們實作了線程的阻塞和喚醒操作。

public class BlockingQueue {
    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); //阻塞隊列

    public static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                    Integer element = queue.take();
                    System.out.println(Thread.currentThread().getName() + "消費了一個産品...");
            } catch (InterruptedException e) {}
        }
    }

    public static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                queue.put();
                System.out.println(Thread.currentThread().getName() + "生産了一個産品...");
            } catch (InterruptedException e) {}
        }
    }

    public static void main(String[] args) {
        ExecutorService comsumerPool = Executors.newSingleThreadExecutor();
        ExecutorService producerPool = Executors.newSingleThreadExecutor();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        int i = ;
        while (true) {
            comsumerPool.execute(consumer);
            producerPool.execute(producer);
            if (i++ > ) {
                break;
            }
        }
        try {
            TimeUnit.SECONDS.sleep();
        } catch (InterruptedException e) {}

        comsumerPool.shutdownNow();
        producerPool.shutdownNow();
    }
}
           

  部分執行結果如下:

pool--thread-生産了一個産品...
pool--thread-生産了一個産品...
pool--thread-生産了一個産品...
pool--thread-生産了一個産品...
pool--thread-生産了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
pool--thread-消費了一個産品...
pool--thread-生産了一個産品...
           

  由執行結果看出,阻塞隊列很好的完成了生産者消費者模型,并且代碼實作簡單。

參考:

[1] http://www.cnblogs.com/dolphin0520/p/3932906.html

[2] http://www.infoq.com/cn/articles/java-blocking-queue

[3] http://blog.csdn.net/ghsau/article/details/8108292

[4] http://blog.csdn.net/ns_code/article/details/17511147