目錄
- 簡介
- Queue接口
- Queue的分類
- BlockingQueue
- Deque
- TransferQueue
- 總結
java中Queue家族簡介
簡介
java中Collection集合有三大家族List,Set和Queue。當然Map也算是一種集合類,但Map并不繼承Collection接口。
List,Set在我們的工作中會經常使用,通常用來存儲結果資料,而Queue由于它的特殊性,通常用在生産者消費者模式中。
現在很火的消息中間件比如:Rabbit MQ等都是Queue這種資料結構的展開。
今天這篇文章将帶大家進入Queue家族。
Queue接口
先看下Queue的繼承關系和其中定義的方法:

Queue繼承自Collection,Collection繼承自Iterable。
Queue有三類主要的方法,我們用個表格來看一下他們的差別:
方法類型 | 方法名稱 | 方法名稱 | 差別 |
Insert | add | offer | 兩個方法都表示向Queue中添加某個元素,不同之處在于添加失敗的情況,add隻會傳回true,如果添加失敗,會抛出異常。offer在添加失敗的時候會傳回false。是以對那些有固定長度的Queue,優先使用offer方法。 |
Remove | remove | poll | 如果Queue是空的情況下,remove會抛出異常,而poll會傳回null。 |
Examine | element | peek | 擷取Queue頭部的元素,但不從Queue中删除。兩者的差別還是在于Queue為空的情況下,element會抛出異常,而peek傳回null。 |
注意,因為對poll和peek來說null是有特殊含義的,是以一般來說Queue中禁止插入null,但是在實作中還是有一些類允許插入null比如LinkedList。
盡管如此,我們在使用中還是要避免插入null元素。
Queue的分類
一般來說Queue可以分為BlockingQueue,Deque和TransferQueue三種。
BlockingQueue
BlockingQueue是Queue的一種實作,它提供了兩種額外的功能:
- 當目前Queue是空的時候,從BlockingQueue中擷取元素的操作會被阻塞。
- 當目前Queue達到最大容量的時候,插入BlockingQueue的操作會被阻塞。
BlockingQueue的操作可以分為下面四類:
操作類型 | Throws exception | Special value | Blocks | Times out |
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
第一類是會抛出異常的操作,當遇到插入失敗,隊列為空的時候抛出異常。
第二類是不會抛出異常的操作。
第三類是會Block的操作。當Queue為空或者達到最大容量的時候。
第四類是time out的操作,在給定的時間裡會Block,逾時會直接傳回。
BlockingQueue是線程安全的Queue,可以在生産者消費者模式的多線程中使用,如下所示:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
最後,在一個線程中向BlockQueue中插入元素之前的操作happens-before另外一個線程中從BlockQueue中删除或者擷取的操作。
Deque
Deque是Queue的子類,它代表double ended queue,也就是說可以從Queue的頭部或者尾部插入和删除元素。
同樣的,我們也可以将Deque的方法用下面的表格來表示,Deque的方法可以分為對頭部的操作和對尾部的操作:
方法類型 | Throws exception | Special value | Throws exception | Special value |
Insert | addFirst(e) | offerFirst(e) | addLast(e) | offerLast(e) |
Remove | removeFirst() | pollFirst() | removeLast() | pollLast() |
Examine | getFirst() | peekFirst() | getLast() | peekLast() |
和Queue的方法描述基本一緻,這裡就不多講了。
當Deque以 FIFO (First-In-First-Out)的方法處理元素的時候,Deque就相當于一個Queue。
當Deque以LIFO (Last-In-First-Out)的方式處理元素的時候,Deque就相當于一個Stack。
TransferQueue
TransferQueue繼承自BlockingQueue,為什麼叫Transfer呢?因為TransferQueue提供了一個transfer的方法,生産者可以調用這個transfer方法,進而等待消費者調用take或者poll方法從Queue中拿取資料。
還提供了非阻塞和timeout版本的tryTransfer方法以供使用。
我們舉個TransferQueue實作的生産者消費者的問題。
先定義一個生産者:
@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private Integer messageCount;
public static final AtomicInteger messageProduced = new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
boolean added = transferQueue.tryTransfer( "第"+i+"個", 2000, TimeUnit.MILLISECONDS);
log.info("transfered {} 是否成功: {}","第"+i+"個",added);
if(added){
messageProduced.incrementAndGet();
}
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
log.info("total transfered {}",messageProduced.get());
}
}
在生産者的run方法中,我們調用了tryTransfer方法,等待2秒鐘,如果沒成功則直接傳回。
再定義一個消費者:
@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private int messageCount;
public static final AtomicInteger messageConsumed = new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
String element = transferQueue.take();
log.info("take {}",element );
messageConsumed.incrementAndGet();
Thread.sleep(500);
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
log.info("total consumed {}",messageConsumed.get());
}
}
在run方法中,調用了transferQueue.take方法來取消息。
下面先看一下一個生産者,零個消費者的情況:
@Test
public void testOneProduceZeroConsumer() throws InterruptedException {
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(10);
Producer producer = new Producer(transferQueue, "ProducerOne", 5);
exService.execute(producer);
exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
exService.shutdown();
}
輸出結果:
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0
可以看到,因為沒有消費者,是以消息并沒有發送成功。
再看下一個有消費者的情況:
@Test
public void testOneProduceOneConsumer() throws InterruptedException {
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(10);
Producer producer = new Producer(transferQueue, "ProducerOne", 2);
Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);
exService.execute(producer);
exService.execute(consumer);
exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
exService.shutdown();
}
輸出結果:
[pool-1-thread-2] INFO com.flydean.Consumer - take 第0個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: true
[pool-1-thread-2] INFO com.flydean.Consumer - take 第1個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: true
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2
[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2
可以看到Producer和Consumer是一個一個來生産和消費的。
總結
本文介紹了Queue接口和它的三大分類,這三大分類又有非常多的實作類,我們将會在後面的文章中再詳細介紹。