見賢思齊焉,見不賢而内自省也。—《論語》
Java5.0 增加了兩種新的容器類型,它們是指:Queue 和 BlockingQueue。Queue 用來臨時儲存一組等待處理的元素。BlockingQueue 擴張了 Queue 接口,增加了可阻塞的插入和擷取等操作。
BlockingQueue 通常運用于一個線程生産對象放入隊列,另一個線程從隊列擷取對象并消費,這是典型的生産者消費者模型。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczLcVmds92czlGZvwVP9EUTDZ0aRJkSwk0LcxGbpZ2LcBDM08CXlpXazRnbvZ2LcRlMMVDT2EWNvwFdu9mZvwVP9E1T0MmaOFTU61UeFRkW0ZkMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2LcRHelR3LcJzLctmch1mclRXY39DO2gjM1QDNwEDMzUDM4EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
生産者線程持續生産新對象并插入隊列,如果隊列已滿,那麼插入對象的操作會一直阻塞,直到隊列中出現可用的空間。
消費者線程持續從隊列擷取對象,如果隊列為空,那麼擷取操作會一直阻塞,直到隊列中出現可用的新對象。BlockingQueue 簡化了生産者-消費者設計的實作過程,它支援任意數量的生産者和消費者。
BlockingQueue 的核心方法:
offer(E): 向隊列插入元素,并傳回插入成功與否。本方法不阻塞目前執行線程。
put(E) : 向隊列插入元素,如果隊列已滿,則會阻塞目前線程直至元素加入隊列。
take() : 擷取隊列的首位元素,如果隊列為空,則阻塞目前線程直至隊列有元素并取走。
poll():擷取隊列首個元素,指定時間内一旦資料可取,則立即傳回;否則傳回失敗。
remove(E):删除隊列中的元素,傳回成功與否。
BlockingQueue 的實作
BlockingQueue是一個接口,是以你必須使用它的實作來使用它。它的實作包括以下幾個:
- ArrayBlockingQueue:基于數組實作的有界隊列(FIFO),使用一把全局鎖并行對 queue 讀寫操作,同時使用兩個 Condition 阻塞隊列為空時的取操作和隊列為滿時的寫操作。
- LinkedBlockingQueue:基于已連結節點的,範圍上限為 Integer.MAX_VALUE 的 blocking queue(FIFO)。主要操作 put 和 take 都是阻塞的。
-
DelayQueue:當指定的延遲時間到了,才能夠從隊列中擷取元素。它沒有大小限制,是以插入元素時不會阻塞,而隻有擷取元素時才會被阻塞。它的用法可以參考下面兩篇部落格:
http://www.cnblogs.com/jobs/archive/2007/04/27/730255.html,
http://www.cnblogs.com/sunzhenchao/p/3515085.html
- PriorityBlockingQueue: 基于優先級的阻塞隊列,但需要注意的是PriorityBlockingQueue并不會阻塞資料生産者,而隻會在沒有可消費的資料時,阻塞資料的消費者。
-
SynchronoutQueue:它的内部同時隻能夠容納單個元素。如果該隊列已有一個元素的話,試圖向隊列插入一個新元素線程會阻塞,知道另一個線程将該元素從隊列中拿走。同樣,如果該隊列為空,試圖向隊列中抽取一個元素的線程将會阻塞,知道另一個線程向隊列中插入一個新的元素。SynchronousQueue适合一對一的比對場景,沒有容量,無法緩存。它的用法強烈推薦部落格:
http://www.cnblogs.com/leesf456/p/5560362.html
BlockingQueue的使用
這是一個使用 BlockingQueue 的例子,本例使用 ArrayBlockingQueue 實作。首先,BlockingQueueTest 建立一個生産者線程 Procucer, 把字元存放進共享隊列。然後建立三個消費者線程 Consumer,把字元串從隊列中取出。Consumer 取到最後一個字元串時,中斷所有消費者線程,結束程式。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest {
//隊列容量
private static final int SIZE = ;
private static final int CONSUMER_SIZE = ;
//消費者線程退出标志
private static String endString = "num:" + (SIZE*-);
//存放消費者線程
private static List list = new ArrayList<Thread>();
public static void main(String[] args) throws Exception{
//建立固定長度的阻塞隊列
BlockingQueue q = new ArrayBlockingQueue<String>(SIZE);
//建立生産者
Producer producer = new Producer(q);
//啟動生産者線程,生産對象
producer.start();
//啟動消費者線程,擷取隊列對象
for(int i = ; i < CONSUMER_SIZE; i++) {
list.add(new Consumer(q));
((Thread) list.get(i)).start();
}
}
//中斷線程
public static void shutDownThread() {
for(int i = ; i < CONSUMER_SIZE; i++) {
((Thread) list.get(i)).interrupt();
}
}
static class Producer extends Thread{
private BlockingQueue queue = null;
public Producer(BlockingQueue q) {
this.queue = q;
}
@Override
public void run() {
// TODO Auto-generated method stub
try {
//生産10個對象,放進隊列
for(int i = ; i < SIZE*; i++) {
String str = "num:" + i;
System.out.println(Thread.currentThread().getName() +":"+"IN: " + str);
queue.put(str);
Thread.sleep();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//消費者線程
static class Consumer extends Thread{
private BlockingQueue queue = null;
public Consumer(BlockingQueue q) {
this.queue = q;
}
@Override
public void run() {
// TODO Auto-generated method stub
try {
//擷取隊列的元素,隊列為空時會阻塞
while(true) {
String str = (String) queue.take();
System.out.println(Thread.currentThread().getName() + ":" + "OUT " + str);
//已經取出最後一個元素,消費者線程應該退出,否則程式一直在運作
if(str.equals(endString)) {
shutDownThread();//中斷線程
}
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
執行結果:
總結
java.util.concurrent 中實作的阻塞隊列包含了足夠的同步機制,進而能夠安全的将對象從生産者線程釋出到消費者線程。對于可變對象,生産者-消費者模型,把對象的所有權安全的從生産者傳遞給消費者。轉移後,消費者線程獲得這個對象的所有權(獨占通路權,可以任意修改它),并且生産者線程不會再通路它。同時,阻塞隊列負責所有的控制流,使得消費者生産者的代碼更加簡單和清晰。
本文完畢,如對你有幫助,請關注我,謝謝~
參考
- https://blog.csdn.net/defonds/article/details/44021605/
- http://www.cnblogs.com/leesf456/p/5428630.html
- 《并發程式設計實戰》
本文原創首發于微信公衆号 [ 林裡少年 ],歡迎關注第一時間擷取更新。
[Java并發程式設計實戰] 阻塞隊列 BlockingQueue(含代碼,生産者-消費者模型)