天天看點

Java BlockingQueue阻塞隊列

作者:農非農

java.util.concurrent.BlockingQueue是一個 java 隊列,它支援在檢索和删除元素時等待隊列變為非空,并在添加元素時等待隊列中的空間變為可用。

Java BlockingQueue阻塞隊列

Java BlockingQueue

Java BlockingQueue 不接受null值,如果您嘗試在隊列中存儲空值,則會抛出NullPointerException。Java BlockingQueue 實作是線程安全的。所有排隊方法本質上都是原子的,并使用内部鎖或其他形式的并發控制。Java BlockingQueue 接口是 java 集合架構的一部分,主要用于實作生産者消費者問題。我們不需要擔心等待生産者或對象可用的空間可供 BlockingQueue 中的使用者使用,因為它由 BlockingQueue 的實作類處理。Java 提供了幾個 BlockingQueue 實作,如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。在 BlockingQueue 中實作生産者消費者問題時,我們将使用 ArrayBlockingQueue 實作。以下是您應該知道的一些重要方法。

  • put(E e):此方法用于将元素插入隊列。如果隊列已滿,它将等待空間可用。
  • E take():此方法從隊列的頭部檢索并删除元素。如果隊列為空,則等待元素可用。

現在讓我們使用 java BlockingQueue 實作生産者消費者問題。

Java 阻塞隊列示例 - 消息

隻是一個普通的java對象,将由生産者生成并添加到隊列中。您也可以将其稱為有效負載或隊列消息。

public class Message {
    private String msg;
    
    public Message(String str){
        this.msg=str;
    }

    public String getMsg() {
        return msg;
    }
}           

Java BlockingQueue 示例 - Producer

将建立消息并将其放入隊列的生産者類。

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue<Message> queue;
    
    public Producer(BlockingQueue<Message> q){
        this.queue = q;
    }
    @Override
    public void run() {
        //produce messages
        for(int i = 0; i < 100;  i++){
            Message msg = new Message(""+i);
            try {
                Thread.sleep(i);
                queue.put(msg);
                System.out.println("Produced "+msg.getMsg());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //adding exit message
        Message msg = new Message("exit");
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}           

Java 阻塞隊列示例 - 消費者

使用者類,它将處理隊列中的消息,并在收到退出消息時終止。

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{

private BlockingQueue<Message> queue;
    
    public Consumer(BlockingQueue<Message> q){
        this.queue = q;
    }

    @Override
    public void run() {
        try{
            Message msg;
            //consuming messages until exit message is received
            while(( !"exit".equals(msg = queue.take()).getMsg())){
            Thread.sleep(10);
            System.out.println("Consumed "+msg.getMsg());
            }
        }catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}           

Java 阻塞隊列示例 - 服務

最後,我們必須為生産者和消費者建立 BlockingQueue 服務。此生産者使用者服務将建立具有固定大小的 BlockingQueue,并與生産者和使用者共享。此服務将啟動生産者和使用者線程并退出。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerService {

    public static void main(String[] args) {
        //Creating BlockingQueue of size 10
        BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        //starting producer to produce messages in queue
        new Thread(producer).start();
        //starting consumer to consume messages from queue
        new Thread(consumer).start();
        System.out.println("Producer and Consumer has been started");
    }
}           

上述java BlockingQueue示例程式的輸出如下所示:

Producer and Consumer has been started
Produced 0
Produced 1
Produced 2
Produced 3
Produced 4
Consumed 0
Produced 5
Consumed 1
Produced 6
Produced 7
Consumed 2
Produced 8
...           

繼續閱讀