天天看点

使用wait()/notify()模拟生产者消费者模型

全文概要

在并发性编程中,经常会碰到生产者/消费者模式。本文将通过传统线程来模拟生产者/消费者模型,后续还会使用阻塞队列和Lock等来解决。本文主要内容如下:

  • 对生产者/消费者模型的解释和说明;
  • 通过wait()/notifyAll()来模拟生产者/消费者模式。

生产者/消费者模型说明

生产者/消费者模型在多线程中是一个比较经典的问题,涉及的对象包括“仓库”、“生产者”、“消费者”等,他们的关系如下:

  1. 当仓库满时,生产者停止生产;
  2. 当仓库空时,消费者停止消费。

在线程的世界里,生产者就是产生数据的线程,而消费者就是消费数据的线程;在多线程开发中,若生产者产生数据的速度比消费者消耗的数据快,那么生产者必须要等待消费者处理完,才能继续生产数据。同样的道理,若消费者消耗数据的速度大于生产者生产数据的速度,那么消费者就必须等待生产者。生产者/消费者模式就是为了解决这种生产消费能力不同的问题。

生产者/消费者案例模拟

  • 代码案例

仓库模型代码:

package com.tml.javaCore.thread.comsumerAndProducer;
/**
 * <p>多线程 
 * 生产者-消费者仓库模型
 * @author Administrator
 *
 */
public class Repository {
	/*
	 * 仓库的容量
	 */
	private  int capacity;
	/*
	 * 仓库的实际容量
	 */
	private int size;
	
	public Repository(int capacity){
		this.capacity = capacity;
		this.size = 0;
	}
	
	/**
	 * <p>生产者生产资源
	 * @param produceAmount 生产者预生产量
	 */
	public  synchronized void produce(int produceAmount){
		while(produceAmount > 0){
			try{
				while(size >=  capacity){
					System.out.println("has fulled!");
					this.wait();
				}
				/*
				 * 获取实际的生产量
				 */
				int actAmount = (size + produceAmount ) > capacity ? capacity - size : produceAmount;
				size += actAmount;
				System.out.println(Thread.currentThread().getName() + ":has + :" +
						actAmount + ",the actual size is:" + size);
				produceAmount -= actAmount;
				//通知消费者来消费
				this.notifyAll();
				
			}catch(InterruptedException e){
				e.printStackTrace();
			}
			
		} 
	}
	
	/**
	 * <p>消费者消费资源
	 * @param consumeAmount 消费者预消费量
	 */
	public synchronized void consume(int consumeAmount){
		while(consumeAmount > 0){
			try{
				while(size <= 0){
					System.out.println("empty!");
					this.wait();
				}
				/*
				 * 获取实际的消费量
				 */
				int actAmount = (size < consumeAmount)?
						size:consumeAmount;
				size -= actAmount;
				System.out.println(Thread.currentThread().getName() + ":has - :" + actAmount 
						+ ",the actual size is:" + size);
				consumeAmount -= actAmount;
				//通知生产者来生产
				this.notifyAll();
			}catch(InterruptedException e){
				e.printStackTrace();
			}
		}		
	}
	

	@Override
	public String toString() {
		return "Repository [capacity=" + capacity + ", size=" + size + "]";
	}

}
           

测试案例代码:

package com.tml.javaCore.thread.comsumerAndProducer;
/**
 * <p>生产者-消费者
 * @author Administrator
 *
 */
public class TestDemo {
	public static void main(String[] args) {
		//新建一个容量为100的仓库
		Repository repository =new Repository(100);
		
		//模拟生产者,每隔1000毫秒生产23个产品
		new  Thread(new Runnable() {
			public void run() {
				while(true){
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					repository.produce(23);
				}
			}
		},"producer1").start();
		
		//模拟消费者1,每隔2500毫秒消费产品25
		new  Thread(new Runnable() {
			public void run() {
				while(true){
					try {
						Thread.sleep(2500);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					
					repository.consume(25);
				}
			}
		},"consumer1").start();
		
		//模拟消费者2,每隔1500毫秒消费产品10
		new  Thread(new Runnable() {
			public void run() {
				while(true){
					try {
						Thread.sleep(1500);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					
					repository.consume(10);
				}
				
			}
		},"consumer2").start();
	}

}
           
  • 结果输出

截取某一次部分的输出结果,如下:

    producer1:has + :23,the actual size is:23

    consumer2:has - :10,the actual size is:13

    producer1:has + :23,the actual size is:36

    consumer1:has - :25,the actual size is:11

    consumer2:has - :10,the actual size is:1

    producer1:has + :23,the actual size is:24

    producer1:has + :23,the actual size is:47

    consumer2:has - :10,the actual size is:37

    consumer1:has - :25,the actual size is:12

    producer1:has + :23,the actual size is:35

    consumer2:has - :10,the actual size is:25

    producer1:has + :23,the actual size is:48

    producer1:has + :23,the actual size is:71

    consumer1:has - :25,the actual size is:46

    consumer2:has - :10,the actual size is:36

    producer1:has + :23,the actual size is:59

    consumer2:has - :10,the actual size is:49

    producer1:has + :23,the actual size is:72

    consumer1:has - :25,the actual size is:47

    producer1:has + :23,the actual size is:70

    consumer2:has - :10,the actual size is:60

    producer1:has + :23,the actual size is:83

    consumer2:has - :10,the actual size is:73

    producer1:has + :23,the actual size is:96

    consumer1:has - :25,the actual size is:71

    producer1:has + :23,the actual size is:94

    consumer2:has - :10,the actual size is:84

    producer1:has + :16,the actual size is:100

    has fulled!

    consumer1:has - :25,the actual size is:75

    producer1:has + :7,the actual size is:82

    consumer2:has - :10,the actual size is:72

    producer1:has + :23,the actual size is:95

    consumer2:has - :10,the actual size is:85

    producer1:has + :15,the actual size is:100

    has fulled!

    consumer1:has - :25,the actual size is:75

  • 结果说明
  1. 仓库模型中,仓库为共享资源,针对共享资源的修改,produce()和consume()两个方法都是同步的;
  2. 当生产者向仓库中生产数据资源的时候,若产量大于仓库的最大容量时,就会wait(),直到有一个消费者线程执行了notifyAll(),才会唤醒生产者;
  3. 当消费者消耗仓库中数据资源的时候,若仓库中没有可用的数据资源,就会wait()进入阻塞状态,直到有一个生产者线程执行了notifyAll(),才会唤醒消费者;
  4. 在测试案例中,模拟了一个生产者和两个消费者,生产者每隔1000毫秒就产生23个产品,消费者1每隔2500毫秒消费25个产品,消费者2每隔1500毫秒就消费10个产品;
  5. 测试发现,生产者生产数据能力略强于消费者的消费数据的能力,但是不会导致程序进入死循环或者持续等待状态,整个体系有条不紊,这就是使用传统线程实现生产者/消费者模型的一个简单案例。

继续阅读