天天看點

一文弄懂java中的Queue家族

目錄

  • ​​簡介​​
  • ​​Queue接口​​
  • ​​Queue的分類​​
  • ​​BlockingQueue​​
  • ​​Deque​​
  • ​​TransferQueue​​
  • ​​總結​​

java中Queue家族簡介

簡介

java中Collection集合有三大家族List,Set和Queue。當然Map也算是一種集合類,但Map并不繼承Collection接口。

List,Set在我們的工作中會經常使用,通常用來存儲結果資料,而Queue由于它的特殊性,通常用在生産者消費者模式中。

現在很火的消息中間件比如:Rabbit MQ等都是Queue這種資料結構的展開。

今天這篇文章将帶大家進入Queue家族。

Queue接口

先看下Queue的繼承關系和其中定義的方法:

一文弄懂java中的Queue家族

Queue繼承自Collection,Collection繼承自Iterable。

Queue有三類主要的方法,我們用個表格來看一下他們的差別:

方法類型 方法名稱 方法名稱 差別
Insert add offer 兩個方法都表示向Queue中添加某個元素,不同之處在于添加失敗的情況,add隻會傳回true,如果添加失敗,會抛出異常。offer在添加失敗的時候會傳回false。是以對那些有固定長度的Queue,優先使用offer方法。
Remove remove poll 如果Queue是空的情況下,remove會抛出異常,而poll會傳回null。
Examine element peek 擷取Queue頭部的元素,但不從Queue中删除。兩者的差別還是在于Queue為空的情況下,element會抛出異常,而peek傳回null。

注意,因為對poll和peek來說null是有特殊含義的,是以一般來說Queue中禁止插入null,但是在實作中還是有一些類允許插入null比如LinkedList。

盡管如此,我們在使用中還是要避免插入null元素。

Queue的分類

一般來說Queue可以分為BlockingQueue,Deque和TransferQueue三種。

BlockingQueue

BlockingQueue是Queue的一種實作,它提供了兩種額外的功能:

  1. 當目前Queue是空的時候,從BlockingQueue中擷取元素的操作會被阻塞。
  2. 當目前Queue達到最大容量的時候,插入BlockingQueue的操作會被阻塞。

BlockingQueue的操作可以分為下面四類:

操作類型 Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

第一類是會抛出異常的操作,當遇到插入失敗,隊列為空的時候抛出異常。

第二類是不會抛出異常的操作。

第三類是會Block的操作。當Queue為空或者達到最大容量的時候。

第四類是time out的操作,在給定的時間裡會Block,逾時會直接傳回。

BlockingQueue是線程安全的Queue,可以在生産者消費者模式的多線程中使用,如下所示:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }      

最後,在一個線程中向BlockQueue中插入元素之前的操作happens-before另外一個線程中從BlockQueue中删除或者擷取的操作。

Deque

Deque是Queue的子類,它代表double ended queue,也就是說可以從Queue的頭部或者尾部插入和删除元素。

同樣的,我們也可以将Deque的方法用下面的表格來表示,Deque的方法可以分為對頭部的操作和對尾部的操作:

方法類型 Throws exception Special value Throws exception Special value
Insert addFirst(e) offerFirst(e) addLast(e) offerLast(e)
Remove removeFirst() pollFirst() removeLast() pollLast()
Examine getFirst() peekFirst() getLast() peekLast()

和Queue的方法描述基本一緻,這裡就不多講了。

當Deque以 FIFO (First-In-First-Out)的方法處理元素的時候,Deque就相當于一個Queue。

當Deque以LIFO (Last-In-First-Out)的方式處理元素的時候,Deque就相當于一個Stack。

TransferQueue

TransferQueue繼承自BlockingQueue,為什麼叫Transfer呢?因為TransferQueue提供了一個transfer的方法,生産者可以調用這個transfer方法,進而等待消費者調用take或者poll方法從Queue中拿取資料。

還提供了非阻塞和timeout版本的tryTransfer方法以供使用。

我們舉個TransferQueue實作的生産者消費者的問題。

先定義一個生産者:

@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
    private TransferQueue<String> transferQueue;

    private String name;

    private Integer messageCount;

    public static final AtomicInteger messageProduced = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                boolean added = transferQueue.tryTransfer( "第"+i+"個", 2000, TimeUnit.MILLISECONDS);
                log.info("transfered {} 是否成功: {}","第"+i+"個",added);
                if(added){
                    messageProduced.incrementAndGet();
                }
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total transfered {}",messageProduced.get());
    }
}      

在生産者的run方法中,我們調用了tryTransfer方法,等待2秒鐘,如果沒成功則直接傳回。

再定義一個消費者:

@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {

    private TransferQueue<String> transferQueue;

    private String name;

    private int messageCount;

    public static final AtomicInteger messageConsumed = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                String element = transferQueue.take();
                log.info("take {}",element );
                messageConsumed.incrementAndGet();
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total consumed {}",messageConsumed.get());
    }

}      

在run方法中,調用了transferQueue.take方法來取消息。

下面先看一下一個生産者,零個消費者的情況:

@Test
    public void testOneProduceZeroConsumer() throws InterruptedException {

        TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        ExecutorService exService = Executors.newFixedThreadPool(10);
        Producer producer = new Producer(transferQueue, "ProducerOne", 5);

        exService.execute(producer);

        exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
        exService.shutdown();
    }      

輸出結果:

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0      

可以看到,因為沒有消費者,是以消息并沒有發送成功。

再看下一個有消費者的情況:

@Test
    public void testOneProduceOneConsumer() throws InterruptedException {

        TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        ExecutorService exService = Executors.newFixedThreadPool(10);
        Producer producer = new Producer(transferQueue, "ProducerOne", 2);
        Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);

        exService.execute(producer);
        exService.execute(consumer);

        exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
        exService.shutdown();
    }      

輸出結果:

[pool-1-thread-2] INFO com.flydean.Consumer - take 第0個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: true
[pool-1-thread-2] INFO com.flydean.Consumer - take 第1個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: true
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2
[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2      

可以看到Producer和Consumer是一個一個來生産和消費的。

總結

本文介紹了Queue接口和它的三大分類,這三大分類又有非常多的實作類,我們将會在後面的文章中再詳細介紹。