前言
建議先看一下這篇分享,深入了解Condition
阻塞隊列(BlockingQueue)是一個支援兩個附加操作的隊列。這兩個附加的操作支援阻塞的插入和移除方法。
- 支援阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入的元素,直到隊列不滿
- 支援阻塞的移除方法:意思是在隊列為空時,擷取元素的線程會等待隊列變為非空
阻塞隊列常用于生産者和消費者的場景,生産者是向隊列裡添加元素的線程,消費者從隊列裡取元素的線程。阻塞隊列就是生産者用來存放元素,消費者用來擷取元素的容器
方法/處理方式
抛出異常
傳回特殊值
插入方法
boolean add(c)
boolean offer(e)
解釋
添加元素,添加成功傳回true,如果隊列滿了,抛出IllegalStateException
添加元素,添加成功傳回true,如果隊列滿了,傳回false
移除方法
E remove()
E poll()
解釋
傳回頭結點,從隊列中移除頭節點,如果隊列為空,抛出NoSuchElementException
傳回頭結點,從隊列中移除頭節點,如果隊列為空,傳回null
檢查方法
E element()
E peek()
解釋
傳回頭結點,但是不從隊列中移除頭節點,如果隊列為空,抛出NoSuchElementException
傳回頭結點,但是不從隊列中移除頭節點,如果隊列為空,傳回null
方法/處理方式
一直阻塞
逾時退出
插入方法
void put(e)
boolean offer(e, time, unit)
解釋
添加元素,如果隊列已經滿了,則阻塞等待
添加元素,添加成功傳回true,如果隊列已經滿了,則阻塞等待,指定時間已經過去還沒能添加成功元素,則傳回false
移除方法
E take()
E poll(time, unit)
解釋
傳回頭結點,從隊列中移除頭節點,如果隊列為空則阻塞等待
傳回頭結點,從隊列中移除頭節點,隊列中沒元素會一直阻塞等待,指定時間已經過去還沒能拿到頭節點,則傳回null
檢查方法
不可用
不可用
例子
舉一個多生産者,多消費者的例子,隊列的大小為3,即隊列大小為3時,生産者就不再生産
public class Producer implements Runnable {
private ArrayList list;
private int capacity;
public Producer(ArrayList queue,int capacity) {
this.list = queue;
this.capacity = capacity;
}
@Override
public void run() {
synchronized (list) {
while (true) {
while (list.size() == capacity) {
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object object = list.add(new Object());
System.out.println(Thread.currentThread().getName() + " 生産");
list.notifyAll();
}
}
}
}
複制
注意消費者和生産者都是用的notifyAll()[通知所有阻塞的線程]方法,而不是notify()[通知一個阻塞的線程]方法,因為有可能出現“生産者”喚醒“生産者”,消費者“喚醒”消費者的情況,是以有可能造成死鎖,這裡以1個消費者,3個生産者為例說一下,消費者1獲得鎖還沒産品呢,阻塞,接着生産者1獲得鎖生産完了,然後生産者2獲得鎖後生産完了,再然後生産者3獲得鎖生産完了,最後生産者1獲得鎖了,然後阻塞了,現在好了生産者和消費者都阻塞了,造成了死鎖。notifyAll()則不會造成死鎖,接着上面的步驟,生産者3生産完了釋放鎖後,會通知所有阻塞的線程,是以消費者1肯定有機會拿到鎖來進行消費
public class Consumer implements Runnable {
private List list;
public Consumer(List queue) {
this.list = queue;
}
@Override
public void run() {
synchronized (list) {
while (true) {
while (list.size() == 0) {
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object object = list.remove(0);
System.out.println(Thread.currentThread().getName() + " 消費");
list.notifyAll();
}
}
}
}
複制
public class Test {
public static void main(String[] args) {
ArrayList list = new ArrayList(3);
for (int i = 0; i < 3; i++) {
new Thread(new Producer(list, 3), "生産者" + i).start();
new Thread(new Consumer(list), "消費者" + i).start();
}
}
}
複制
一部分結果
生産者0 生産
生産者0 生産
生産者0 生産
消費者1 消費
消費者1 消費
消費者1 消費
生産者1 生産
複制
把這個執行個體用阻塞隊列來改寫,先自己寫一個阻塞隊列,實作BlockingQueue接口,這裡隻展示了一部分重寫的方法
public class MyBlockingQueue<E> implements BlockingQueue<E> {
private int capacity;
private List<E> list;
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
this.list = new ArrayList(capacity);
}
@Override
public void put(E e) throws InterruptedException {
synchronized (this) {
if (list.size() == capacity) {
this.wait();
}
list.add(e);
this.notifyAll();
}
}
@Override
public E take() throws InterruptedException {
synchronized (this) {
while (list.size() == 0) {
this.wait();
}
E value = list.remove(0);
this.notifyAll();
return value;
}
}
}
複制
public class Producer implements Runnable {
private BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
queue.put(new Object());
System.out.println(Thread.currentThread().getName() + " 生産");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
複制
public class Consumer implements Runnable {
private BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Object object = queue.take();
System.out.println(Thread.currentThread().getName() + " 消費");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
複制
public class Test {
public static void main(String[] args) {
BlockingQueue queue = new MyBlockingQueue(3);
for (int i = 0; i < 3; i++) {
new Thread(new Producer(queue), "生産者" + i).start();
new Thread(new Consumer(queue), "消費者" + i).start();
}
}
}
複制
這裡将BlockingQueue的實作改為ArrayBlockingQueue,程式運作結果一樣,和我們之前的例子比較,BlockingQueue其實就是不用我們自己寫阻塞和喚醒的部分,直接看一下ArrayBlockingQueue的源碼,其實和我自己實作的差不多,隻不過并發這部分源碼用的是ReentLock,而我用的是synchronized
源碼
基于jdk1.8.0_20

先看屬性
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}
複制
構造函數
// 設定同步隊列的大小和鎖是否公平
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
複制
put方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 響應中斷的方式上鎖
lock.lockInterruptibly();
try {
while (count == items.length)
// 将目前線程加入notFull這個等待隊列
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
複制
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
複制
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 循環數組實作
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
複制
take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 将目前線程加入notEmpty這個等待隊列
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
複制
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 放到數組的最後一個了,下一次從頭開始放
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
//更新iterators,不再分析
itrs.elementDequeued();
notFull.signal();
return x;
}
複制
其他方法就不再分析,基本上就是上鎖,然後進行相應的操作
最後說一下LZ的了解,個人感覺用ArrayBlockingQueue實作生産者和消費者,比我上面用synchronized的方式應該快很多,畢竟ArrayBlockingQueue隻會是生成者通知消費者,或者消費者通知生産者,而synchronized不是,會造成很多不必要的鎖競争,當然并沒有實驗,有時間可以試試
參考書籍
《Java并發程式設計的藝術》