目錄
- 一、簡述
- 二、實作
一、簡述
異步模式之生産者/消費者:
- 不需要産生結果和消費結果的線程一一對應
- 消費隊列可以用來平衡生産和消費的線程資源
- 生産者僅負責産生結果資料,不關心資料該如何處理,而消費者專心處理結果資料
- 消息隊列是有容量限制的,滿時不會再加入資料,空時不會再消耗資料
- JDK 中各種阻塞隊列,采用的就是這種模式
二、實作
//Message
class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
//消息隊列實作
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
//使用List儲存Message
private LinkedList<Message> queue;
//表示隊列的大小
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
//LinkedList非線程安全,使用synchronized加鎖
public Message take() {
synchronized (queue) {
while (queue.isEmpty()) {
log.debug("隊列為空");
try {
//調用take的線程進入等待隊列
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//直到隊列不為空,得到頭部的Message
Message message = queue.removeFirst();
//喚醒其他等待取Message的線程
queue.notifyAll();
return message;
}
}
//同樣加鎖保證線程安全
public void put(Message message) {
synchronized (queue) {
while (queue.size() == capacity) {
log.debug("隊列已滿");
try {
//調用put的線程進入等待隊列
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//直到隊列不滿,加入Message
queue.addLast(message);
//喚醒其他等待加入Message的線程
queue.notifyAll();
}
}
}
public static void main(String[] args) {
//隊列大小為2
MessageQueue messageQueue = new MessageQueue(2);
// 4 個生産者線程, 加載資料
for (int i = 0; i < 4; i++) {
int id = i;
new Thread(() -> {
log.debug("download...");
List<String> response = new ArrayList<>();
//模拟加載資料
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("try put message({})", id);
messageQueue.put(new Message(id, response));
}, "生産者" + i).start();
}
// 1 個消費者線程, 處理結果
new Thread(() -> {
while (true) {
Message message = messageQueue.take();
List<String> response = (List<String>) message.getMessage();
log.debug("take message({}): [{}] lines", message.getId(), response.size());
}
}, "消費者").start();
}