天天看點

多線程設計模式之生産者/消費者模式一、簡述二、實作

目錄

  • 一、簡述
  • 二、實作

一、簡述

異步模式之生産者/消費者:

  • 不需要産生結果和消費結果的線程一一對應
  • 消費隊列可以用來平衡生産和消費的線程資源
  • 生産者僅負責産生結果資料,不關心資料該如何處理,而消費者專心處理結果資料
  • 消息隊列是有容量限制的,滿時不會再加入資料,空時不會再消耗資料
  • JDK 中各種阻塞隊列,采用的就是這種模式
    多線程設計模式之生産者/消費者模式一、簡述二、實作

二、實作

//Message
class Message {
    private int id;
    private Object message;
    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }
    public int getId() {
        return id;
    }
    public Object getMessage() {
        return message;
    }
}
           
//消息隊列實作
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
	//使用List儲存Message
    private LinkedList<Message> queue;
    //表示隊列的大小
    private int capacity;

	
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }
	//LinkedList非線程安全,使用synchronized加鎖
    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("隊列為空");
                try {
                	//調用take的線程進入等待隊列
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //直到隊列不為空,得到頭部的Message
            Message message = queue.removeFirst();
            //喚醒其他等待取Message的線程
            queue.notifyAll();
            return message;
        }
    }

	//同樣加鎖保證線程安全
    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("隊列已滿");
                try {
                //調用put的線程進入等待隊列
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //直到隊列不滿,加入Message
            queue.addLast(message);
            //喚醒其他等待加入Message的線程
            queue.notifyAll();
        }
    }
}
           
public static void main(String[] args) {
		//隊列大小為2
        MessageQueue messageQueue = new MessageQueue(2);
        // 4 個生産者線程, 加載資料
        for (int i = 0; i < 4; i++) {
            int id = i;
            new Thread(() -> {
                log.debug("download...");
                List<String> response = new ArrayList<>();
                //模拟加載資料
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("try put message({})", id);
                messageQueue.put(new Message(id, response));

            }, "生産者" + i).start();
        }

        // 1 個消費者線程, 處理結果
        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                List<String> response = (List<String>) message.getMessage();
                log.debug("take message({}): [{}] lines", message.getId(), response.size());
            }
        }, "消費者").start();
    }
           

繼續閱讀