天天看點

生産者消費者模型的三種實作方式

某個線程或子產品的代碼負責生産資料(工廠),而生産出來的資料卻不得不交給另一子產品(消費者)來對其進行處理,在這之間使用了隊列、棧等類似超市的東西來存儲資料(超市),這就抽象除了我們的生産者/消費者模型。

其中,産生資料的子產品,就形象地稱為生産者;而處理資料的子產品,就稱為消費者;生産者和消費者之間的中介就叫做緩沖區。

生産者消費者模型的三種實作方式

為什麼要使用生産者-消費者模型

生産者消費者模型通過一個容器解決生産者和消費者的強耦合(強度互相依賴)問題。生産者消費者彼此間不直接通訊,而通過阻塞隊列進行通訊,即生産者生産完資料,不用等待消費者消費資料,直接扔給阻塞隊列,消費者不找生産者要資料,而是從阻塞隊列裡取,阻塞隊列相當于一個緩沖區,平衡生産者和消費者的處理能力。這個阻塞隊列就是用給生産者和消費者解耦的。

生産者-消費者模型的優點

1 解耦:降低生産者和消費之間的依賴關系

如果不使用郵筒(緩沖區)需要把信件交給郵差,但是前提是你得認識快遞員(相當于生産者消費者的強耦合),萬一郵差換人了你還得重新認識一下(相當于消費者變化導緻修改生産者代碼)。而對郵筒來說比較固定,你依賴它的成本比較低(相當于和緩沖區之間的弱耦合)。

2 支援并發

即生産者和消費者是兩個可以獨立的并發主體,互不幹擾的運作,從寄信的例子看,如果沒有郵筒就需要在路口等待郵差過來收(相當于生産者阻塞);又或者郵差挨家挨戶的詢問誰要寄信(相當于消費者輪詢)。不管是那種方法效率都比較低下。

3 支援忙閑不均

如果生産資料的速度時快時慢,緩沖區可以對其進行适當緩沖。當生産的資料太塊時,消費者來不及處理,未處理的資料可以暫時存在緩沖區。等生産者的生産速度慢下來,消費者再慢慢處理掉。 

例如寄信的例子,假設郵差一次隻能帶1000封信,萬一某次碰到了中秋節送賀卡,需要郵遞的信封數量超過1000封,這個時候郵筒(緩沖區)就派上用場了,郵差吧來不及帶走的信封暫存在郵筒中,等下次再過來拿。

實作方式

1、使用synchronized(wait()和notify())

public class ProducerConsumer {
    public static void main(String[] args) {
        Resource resource = new Resource();
        Producer p1 = new Producer(resource);
        Producer p2 = new Producer(resource);
        Producer p3 = new Producer(resource);

        Consumer c1 = new Consumer(resource);

        p1.start();
        p2.start();
        p3.start();
        c1.start();
    }
}

//公共資源類
class Resource {
    private int number = 0;
    private int size = 10;

    /**
     * 取資源
     */
    public synchronized void remove() {
        if (number > 0) {
            number--;

            System.out.println("消費者" + Thread.currentThread().getName() + ":" + number);
            notifyAll();
        } else {
            try {
                wait();
                System.out.println("消費者" + Thread.currentThread().getName() + "進入等待");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 添加資源
     */
    public synchronized void add() {
        if (number < size) {
            number++;
            System.out.println("生産者" + Thread.currentThread().getName() + ":" + number);
            notifyAll();
        } else {
            try {
                wait();
                System.out.println("生産者" + Thread.currentThread().getName() + "進入等待");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

//生産者
class Producer extends Thread {
    private Resource resource;

    public Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.add();
        }
    }
}

//消費者
class Consumer extends Thread {
    private Resource resource;

    public Consumer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}
           

2、使用Lock實作(await()和signal())

public class ProducerConsumer {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition producerCondition = lock.newCondition();
        Condition consumerCondition = lock.newCondition();
        Resource resource = new Resource(lock,producerCondition,consumerCondition);
        Producer p1 = new Producer(resource);
        Consumer c1 = new Consumer(resource);
        Consumer c2 = new Consumer(resource);
        Consumer c3 = new Consumer(resource);
        p1.start();;
        c1.start();
        c2.start();
        c3.start();
    }
}
class Resource{
    private int number = 0;
    private int size = 10;
    private Lock lock;
    private Condition producerCondition;
    private Condition consumerCondition;

    public Resource(Lock lock, Condition producerCondition, Condition consumerCondition) {
        this.lock = lock;
        this.producerCondition = producerCondition;
        this.consumerCondition = consumerCondition;
    }
    public void add(){
        lock.lock();
        try {
            if (number < size){
                number++;
                System.out.println(Thread.currentThread().getName()+":"+number);
                //喚醒等待的消費者
                consumerCondition.signalAll();
            }else {
                try {
                    producerCondition.await();
                    System.out.println(Thread.currentThread().getName()+"線程進入等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }finally {
            lock.unlock();
        }
    }
    //消費者取資源
    public void remove(){
        lock.lock();
        try {
            if (number > 0){
                number--;
                System.out.println(Thread.currentThread().getName()+":"+number);
                //喚醒等待的生産者
                producerCondition.signalAll();
            }else {
                try {
                    consumerCondition.await();
                    System.out.println(Thread.currentThread().getName()+"線程進入等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }finally {
            lock.unlock();
        }
    }
}
class Producer extends Thread{
    private Resource resource;
    public Producer(Resource resource){
        this.resource = resource;
        setName("生産者");
    }
    public void run(){
        while (true){
            try {
                Thread.sleep((long) (1000+Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.add();
        }
    }

}
class Consumer extends Thread{
    private Resource resource;
    public Consumer(Resource resource){
        this.resource = resource;
        setName("消費者");
    }
    public void run(){
        while (true){
            try {
                Thread.sleep((long) (1000+Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}
           

3、阻塞隊列實作

/**
 * 使用阻塞隊列實作
 */
public class ProducerConsumer {
    public static void main(String[] args) {
        Resource resource = new Resource();
        Producer p1 = new Producer(resource);
        Producer p2 = new Producer(resource);
        Consumer c1 = new Consumer(resource);
        p1.start();
        p2.start();
        c1.start();
    }
}
class Resource{
    BlockingQueue queue = new LinkedBlockingQueue(10);
    //添加資源
    public void add() {
        try {
            queue.put(1);
            System.out.println("生産者"+Thread.currentThread().getName()+":"+queue.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //消費資源
    public void remove(){
        try {
            queue.take();
            System.out.println("消費者"+Thread.currentThread().getName()+":"+queue.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}
class Producer extends Thread{
    private Resource resource;
    public Producer(Resource resource){
        this.resource = resource;
    }
    public void run(){
        while (true){
            try {
                Thread.sleep((long) (1000+Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.add();
        }
    }
}
class Consumer extends Thread{
    private Resource resource;
    public Consumer(Resource resource){
        this.resource = resource;
    }
    public void run(){
        while (true){
            try {
                Thread.sleep((long) (1000+Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}
           

參考:

https://www.cnblogs.com/shiqi17/p/9550678.html

https://blog.csdn.net/yu876876/article/details/81776879

繼續閱讀