天天看点

生产者消费者模型的三种实现方式

某个线程或模块的代码负责生产数据(工厂),而生产出来的数据却不得不交给另一模块(消费者)来对其进行处理,在这之间使用了队列、栈等类似超市的东西来存储数据(超市),这就抽象除了我们的生产者/消费者模型。

其中,产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者;生产者和消费者之间的中介就叫做缓冲区。

生产者消费者模型的三种实现方式

为什么要使用生产者-消费者模型

生产者消费者模型通过一个容器解决生产者和消费者的强耦合(强度相互依赖)问题。生产者消费者彼此间不直接通讯,而通过阻塞队列进行通讯,即生产者生产完数据,不用等待消费者消费数据,直接扔给阻塞队列,消费者不找生产者要数据,而是从阻塞队列里取,阻塞队列相当于一个缓冲区,平衡生产者和消费者的处理能力。这个阻塞队列就是用给生产者和消费者解耦的。

生产者-消费者模型的优点

1 解耦:降低生产者和消费之间的依赖关系

如果不使用邮筒(缓冲区)需要把信件交给邮递员,但是前提是你得认识快递员(相当于生产者消费者的强耦合),万一邮递员换人了你还得重新认识一下(相当于消费者变化导致修改生产者代码)。而对邮筒来说比较固定,你依赖它的成本比较低(相当于和缓冲区之间的弱耦合)。

2 支持并发

即生产者和消费者是两个可以独立的并发主体,互不干扰的运行,从寄信的例子看,如果没有邮筒就需要在路口等待邮递员过来收(相当于生产者阻塞);又或者邮递员挨家挨户的询问谁要寄信(相当于消费者轮询)。不管是那种方法效率都比较低下。

3 支持忙闲不均

如果生产数据的速度时快时慢,缓冲区可以对其进行适当缓冲。当生产的数据太块时,消费者来不及处理,未处理的数据可以暂时存在缓冲区。等生产者的生产速度慢下来,消费者再慢慢处理掉。 

例如寄信的例子,假设邮递员一次只能带1000封信,万一某次碰到了中秋节送贺卡,需要邮递的信封数量超过1000封,这个时候邮筒(缓冲区)就派上用场了,邮递员吧来不及带走的信封暂存在邮筒中,等下次再过来拿。

实现方式

1、使用synchronized(wait()和notify())

public class ProducerConsumer {
    public static void main(String[] args) {
        Resource resource = new Resource();
        Producer p1 = new Producer(resource);
        Producer p2 = new Producer(resource);
        Producer p3 = new Producer(resource);

        Consumer c1 = new Consumer(resource);

        p1.start();
        p2.start();
        p3.start();
        c1.start();
    }
}

//公共资源类
class Resource {
    private int number = 0;
    private int size = 10;

    /**
     * 取资源
     */
    public synchronized void remove() {
        if (number > 0) {
            number--;

            System.out.println("消费者" + Thread.currentThread().getName() + ":" + number);
            notifyAll();
        } else {
            try {
                wait();
                System.out.println("消费者" + Thread.currentThread().getName() + "进入等待");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 添加资源
     */
    public synchronized void add() {
        if (number < size) {
            number++;
            System.out.println("生产者" + Thread.currentThread().getName() + ":" + number);
            notifyAll();
        } else {
            try {
                wait();
                System.out.println("生产者" + Thread.currentThread().getName() + "进入等待");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

//生产者
class Producer extends Thread {
    private Resource resource;

    public Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.add();
        }
    }
}

//消费者
class Consumer extends Thread {
    private Resource resource;

    public Consumer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}
           

2、使用Lock实现(await()和signal())

public class ProducerConsumer {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition producerCondition = lock.newCondition();
        Condition consumerCondition = lock.newCondition();
        Resource resource = new Resource(lock,producerCondition,consumerCondition);
        Producer p1 = new Producer(resource);
        Consumer c1 = new Consumer(resource);
        Consumer c2 = new Consumer(resource);
        Consumer c3 = new Consumer(resource);
        p1.start();;
        c1.start();
        c2.start();
        c3.start();
    }
}
class Resource{
    private int number = 0;
    private int size = 10;
    private Lock lock;
    private Condition producerCondition;
    private Condition consumerCondition;

    public Resource(Lock lock, Condition producerCondition, Condition consumerCondition) {
        this.lock = lock;
        this.producerCondition = producerCondition;
        this.consumerCondition = consumerCondition;
    }
    public void add(){
        lock.lock();
        try {
            if (number < size){
                number++;
                System.out.println(Thread.currentThread().getName()+":"+number);
                //唤醒等待的消费者
                consumerCondition.signalAll();
            }else {
                try {
                    producerCondition.await();
                    System.out.println(Thread.currentThread().getName()+"线程进入等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }finally {
            lock.unlock();
        }
    }
    //消费者取资源
    public void remove(){
        lock.lock();
        try {
            if (number > 0){
                number--;
                System.out.println(Thread.currentThread().getName()+":"+number);
                //唤醒等待的生产者
                producerCondition.signalAll();
            }else {
                try {
                    consumerCondition.await();
                    System.out.println(Thread.currentThread().getName()+"线程进入等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }finally {
            lock.unlock();
        }
    }
}
class Producer extends Thread{
    private Resource resource;
    public Producer(Resource resource){
        this.resource = resource;
        setName("生产者");
    }
    public void run(){
        while (true){
            try {
                Thread.sleep((long) (1000+Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.add();
        }
    }

}
class Consumer extends Thread{
    private Resource resource;
    public Consumer(Resource resource){
        this.resource = resource;
        setName("消费者");
    }
    public void run(){
        while (true){
            try {
                Thread.sleep((long) (1000+Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}
           

3、阻塞队列实现

/**
 * 使用阻塞队列实现
 */
public class ProducerConsumer {
    public static void main(String[] args) {
        Resource resource = new Resource();
        Producer p1 = new Producer(resource);
        Producer p2 = new Producer(resource);
        Consumer c1 = new Consumer(resource);
        p1.start();
        p2.start();
        c1.start();
    }
}
class Resource{
    BlockingQueue queue = new LinkedBlockingQueue(10);
    //添加资源
    public void add() {
        try {
            queue.put(1);
            System.out.println("生产者"+Thread.currentThread().getName()+":"+queue.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //消费资源
    public void remove(){
        try {
            queue.take();
            System.out.println("消费者"+Thread.currentThread().getName()+":"+queue.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}
class Producer extends Thread{
    private Resource resource;
    public Producer(Resource resource){
        this.resource = resource;
    }
    public void run(){
        while (true){
            try {
                Thread.sleep((long) (1000+Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.add();
        }
    }
}
class Consumer extends Thread{
    private Resource resource;
    public Consumer(Resource resource){
        this.resource = resource;
    }
    public void run(){
        while (true){
            try {
                Thread.sleep((long) (1000+Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}
           

参考:

https://www.cnblogs.com/shiqi17/p/9550678.html

https://blog.csdn.net/yu876876/article/details/81776879

继续阅读