目录
- 一、简述
- 二、实现
一、简述
异步模式之生产者/消费者:
- 不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
二、实现
//Message
class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
//消息队列实现
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
//使用List保存Message
private LinkedList<Message> queue;
//表示队列的大小
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
//LinkedList非线程安全,使用synchronized加锁
public Message take() {
synchronized (queue) {
while (queue.isEmpty()) {
log.debug("队列为空");
try {
//调用take的线程进入等待队列
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//直到队列不为空,得到头部的Message
Message message = queue.removeFirst();
//唤醒其他等待取Message的线程
queue.notifyAll();
return message;
}
}
//同样加锁保证线程安全
public void put(Message message) {
synchronized (queue) {
while (queue.size() == capacity) {
log.debug("队列已满");
try {
//调用put的线程进入等待队列
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//直到队列不满,加入Message
queue.addLast(message);
//唤醒其他等待加入Message的线程
queue.notifyAll();
}
}
}
public static void main(String[] args) {
//队列大小为2
MessageQueue messageQueue = new MessageQueue(2);
// 4 个生产者线程, 加载数据
for (int i = 0; i < 4; i++) {
int id = i;
new Thread(() -> {
log.debug("download...");
List<String> response = new ArrayList<>();
//模拟加载数据
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("try put message({})", id);
messageQueue.put(new Message(id, response));
}, "生产者" + i).start();
}
// 1 个消费者线程, 处理结果
new Thread(() -> {
while (true) {
Message message = messageQueue.take();
List<String> response = (List<String>) message.getMessage();
log.debug("take message({}): [{}] lines", message.getId(), response.size());
}
}, "消费者").start();
}