java.util.concurrent.BlockingQueue是一个 java 队列,它支持在检索和删除元素时等待队列变为非空,并在添加元素时等待队列中的空间变为可用。
Java BlockingQueue
Java BlockingQueue 不接受null值,如果您尝试在队列中存储空值,则会抛出NullPointerException。Java BlockingQueue 实现是线程安全的。所有排队方法本质上都是原子的,并使用内部锁或其他形式的并发控制。Java BlockingQueue 接口是 java 集合框架的一部分,主要用于实现生产者消费者问题。我们不需要担心等待生产者或对象可用的空间可供 BlockingQueue 中的使用者使用,因为它由 BlockingQueue 的实现类处理。Java 提供了几个 BlockingQueue 实现,如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。在 BlockingQueue 中实现生产者消费者问题时,我们将使用 ArrayBlockingQueue 实现。以下是您应该知道的一些重要方法。
- put(E e):此方法用于将元素插入队列。如果队列已满,它将等待空间可用。
- E take():此方法从队列的头部检索并删除元素。如果队列为空,则等待元素可用。
现在让我们使用 java BlockingQueue 实现生产者消费者问题。
Java 阻塞队列示例 - 消息
只是一个普通的java对象,将由生产者生成并添加到队列中。您也可以将其称为有效负载或队列消息。
public class Message {
private String msg;
public Message(String str){
this.msg=str;
}
public String getMsg() {
return msg;
}
}
Java BlockingQueue 示例 - Producer
将创建消息并将其放入队列的生产者类。
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue<Message> queue;
public Producer(BlockingQueue<Message> q){
this.queue = q;
}
@Override
public void run() {
//produce messages
for(int i = 0; i < 100; i++){
Message msg = new Message(""+i);
try {
Thread.sleep(i);
queue.put(msg);
System.out.println("Produced "+msg.getMsg());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//adding exit message
Message msg = new Message("exit");
try {
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Java 阻塞队列示例 - 消费者
使用者类,它将处理队列中的消息,并在收到退出消息时终止。
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<Message> queue;
public Consumer(BlockingQueue<Message> q){
this.queue = q;
}
@Override
public void run() {
try{
Message msg;
//consuming messages until exit message is received
while(( !"exit".equals(msg = queue.take()).getMsg())){
Thread.sleep(10);
System.out.println("Consumed "+msg.getMsg());
}
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
Java 阻塞队列示例 - 服务
最后,我们必须为生产者和消费者创建 BlockingQueue 服务。此生产者使用者服务将创建具有固定大小的 BlockingQueue,并与生产者和使用者共享。此服务将启动生产者和使用者线程并退出。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerService {
public static void main(String[] args) {
//Creating BlockingQueue of size 10
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//starting producer to produce messages in queue
new Thread(producer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}
}
上述java BlockingQueue示例程序的输出如下所示:
Producer and Consumer has been started
Produced 0
Produced 1
Produced 2
Produced 3
Produced 4
Consumed 0
Produced 5
Consumed 1
Produced 6
Produced 7
Consumed 2
Produced 8
...