天天看點

用代碼實作生産者消費者模型

題目:代碼實作生産者消費者模型,3個生産者每隔3秒生産1個機關的資料,2個消費者每隔1秒消費一個機關的資料,共享資源有界。要求不能出現死鎖或者占用CPU不釋放的情況。

方法一:用synchronized關鍵字加鎖實作

// 共享資源
public class Resource {
    private LinkedList<String> commonList;
    private int capacity = 0;

    public Resource(int capacity) {
        this.capacity = capacity;
        this.commonList = new LinkedList<>();
    }
    public void addLast(String item) {
        commonList.addLast(item);
    }
    public void removeFirst() {
        commonList.removeFirst();
    }

    public boolean isFull() {
        return commonList.size() == this.capacity;
    }
}
// 生産者
public class Producer extends Thread {

    private int      duration;
    private Resource resource;

    public Producer (int duration, Resource resource) {
        this.duration = duration;
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                synchronized (resource) {
                    if (!resource.isFull()) {
                        resource.addLast("Producer_" + Thread.currentThread().getName());
                        // notify/notifyAll() 方法,喚醒一個或多個正處于等待狀态的線程
                        resource.notifyAll();
                    } else {
                        // wait()使目前線程阻塞,前提是必須先獲得鎖配合synchronized 關鍵字使用
                        resource.wait();
                    }
                }
                Thread.sleep(1000 * duration);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消費者
public class Consumer extends Thread {

    private int      duration;
    private Resource resource;

    public Consumer (int duration, Resource resource) {
        this.duration = duration;
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                synchronized (resource) {
                    // 共享區有資料才能取資料
                    if (resource.size() > 0) {
                        resource.removeFirst();
                        // 消費資料後喚醒生産者
                        resource.notifyAll();
                    } else {
                        // wait()使目前線程阻塞,前提是 必須先獲得鎖,配合synchronized 關鍵字使用
                        resource.wait();
                    }
                }
                Thread.sleep(1000 * duration);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 測試
public class TestMain {
    public static void main(String[] args) {
        Resource resource = new Resource(10);
        new Producer(3, resource).start();
        new Producer(3, resource).start();
        new Producer(3, resource).start();
        new Consumer(1, resource).start();
        new Consumer(1, resource).start();
    }
}
           

方法二:用阻塞隊列來充當共享資源,代碼中不用再顯式的處理鎖,例如ArrayBlockingQueue内部就已經使用一個ReentrantLock各兩個Condition來實作了對put操作與take操作的阻塞/喚醒。

// 生産者
public class Producer extends Thread {
    private int   duration;
    private BlockingQueue<String> blockingQueue;

    public Producer (int duration, BlockingQueue<String> blockingQueue) {
        this.duration = duration;
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 隊列滿時,put操作将被阻塞直到隊列有空間為止
                blockingQueue.put("Producer_" + Thread.currentThread().getName());
                Thread.sleep(1000 * duration);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消費者
public class Consumer extends Thread {
    private int                   duration;
    private BlockingQueue<String> blockingQueue;

    public Consumer (int duration, BlockingQueue<String> blockingQueue) {
        this.duration = duration;
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String e = blockingQueue.take();
                Thread.sleep(1000 * duration);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


public class TestMain {
    public static void main(String[] args) {
        BlockingQueue<String> resource = new ArrayBlockingQueue<>(24);
        new Producer(3, resource).start();
        new Producer(3, resource).start();
        new Producer(3, resource).start();
        new Consumer(1, resource).start();
        new Consumer(1, resource).start();
    }
}