全文概要
在并发性编程中,经常会碰到生产者/消费者模式。本文将通过传统线程来模拟生产者/消费者模型,后续还会使用阻塞队列和Lock等来解决。本文主要内容如下:
- 对生产者/消费者模型的解释和说明;
- 通过wait()/notifyAll()来模拟生产者/消费者模式。
生产者/消费者模型说明
生产者/消费者模型在多线程中是一个比较经典的问题,涉及的对象包括“仓库”、“生产者”、“消费者”等,他们的关系如下:
- 当仓库满时,生产者停止生产;
- 当仓库空时,消费者停止消费。
在线程的世界里,生产者就是产生数据的线程,而消费者就是消费数据的线程;在多线程开发中,若生产者产生数据的速度比消费者消耗的数据快,那么生产者必须要等待消费者处理完,才能继续生产数据。同样的道理,若消费者消耗数据的速度大于生产者生产数据的速度,那么消费者就必须等待生产者。生产者/消费者模式就是为了解决这种生产消费能力不同的问题。
生产者/消费者案例模拟
- 代码案例
仓库模型代码:
package com.tml.javaCore.thread.comsumerAndProducer;
/**
* <p>多线程
* 生产者-消费者仓库模型
* @author Administrator
*
*/
public class Repository {
/*
* 仓库的容量
*/
private int capacity;
/*
* 仓库的实际容量
*/
private int size;
public Repository(int capacity){
this.capacity = capacity;
this.size = 0;
}
/**
* <p>生产者生产资源
* @param produceAmount 生产者预生产量
*/
public synchronized void produce(int produceAmount){
while(produceAmount > 0){
try{
while(size >= capacity){
System.out.println("has fulled!");
this.wait();
}
/*
* 获取实际的生产量
*/
int actAmount = (size + produceAmount ) > capacity ? capacity - size : produceAmount;
size += actAmount;
System.out.println(Thread.currentThread().getName() + ":has + :" +
actAmount + ",the actual size is:" + size);
produceAmount -= actAmount;
//通知消费者来消费
this.notifyAll();
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
/**
* <p>消费者消费资源
* @param consumeAmount 消费者预消费量
*/
public synchronized void consume(int consumeAmount){
while(consumeAmount > 0){
try{
while(size <= 0){
System.out.println("empty!");
this.wait();
}
/*
* 获取实际的消费量
*/
int actAmount = (size < consumeAmount)?
size:consumeAmount;
size -= actAmount;
System.out.println(Thread.currentThread().getName() + ":has - :" + actAmount
+ ",the actual size is:" + size);
consumeAmount -= actAmount;
//通知生产者来生产
this.notifyAll();
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
@Override
public String toString() {
return "Repository [capacity=" + capacity + ", size=" + size + "]";
}
}
测试案例代码:
package com.tml.javaCore.thread.comsumerAndProducer;
/**
* <p>生产者-消费者
* @author Administrator
*
*/
public class TestDemo {
public static void main(String[] args) {
//新建一个容量为100的仓库
Repository repository =new Repository(100);
//模拟生产者,每隔1000毫秒生产23个产品
new Thread(new Runnable() {
public void run() {
while(true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
repository.produce(23);
}
}
},"producer1").start();
//模拟消费者1,每隔2500毫秒消费产品25
new Thread(new Runnable() {
public void run() {
while(true){
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
e.printStackTrace();
}
repository.consume(25);
}
}
},"consumer1").start();
//模拟消费者2,每隔1500毫秒消费产品10
new Thread(new Runnable() {
public void run() {
while(true){
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
repository.consume(10);
}
}
},"consumer2").start();
}
}
- 结果输出
截取某一次部分的输出结果,如下:
producer1:has + :23,the actual size is:23
consumer2:has - :10,the actual size is:13
producer1:has + :23,the actual size is:36
consumer1:has - :25,the actual size is:11
consumer2:has - :10,the actual size is:1
producer1:has + :23,the actual size is:24
producer1:has + :23,the actual size is:47
consumer2:has - :10,the actual size is:37
consumer1:has - :25,the actual size is:12
producer1:has + :23,the actual size is:35
consumer2:has - :10,the actual size is:25
producer1:has + :23,the actual size is:48
producer1:has + :23,the actual size is:71
consumer1:has - :25,the actual size is:46
consumer2:has - :10,the actual size is:36
producer1:has + :23,the actual size is:59
consumer2:has - :10,the actual size is:49
producer1:has + :23,the actual size is:72
consumer1:has - :25,the actual size is:47
producer1:has + :23,the actual size is:70
consumer2:has - :10,the actual size is:60
producer1:has + :23,the actual size is:83
consumer2:has - :10,the actual size is:73
producer1:has + :23,the actual size is:96
consumer1:has - :25,the actual size is:71
producer1:has + :23,the actual size is:94
consumer2:has - :10,the actual size is:84
producer1:has + :16,the actual size is:100
has fulled!
consumer1:has - :25,the actual size is:75
producer1:has + :7,the actual size is:82
consumer2:has - :10,the actual size is:72
producer1:has + :23,the actual size is:95
consumer2:has - :10,the actual size is:85
producer1:has + :15,the actual size is:100
has fulled!
consumer1:has - :25,the actual size is:75
- 结果说明
- 仓库模型中,仓库为共享资源,针对共享资源的修改,produce()和consume()两个方法都是同步的;
- 当生产者向仓库中生产数据资源的时候,若产量大于仓库的最大容量时,就会wait(),直到有一个消费者线程执行了notifyAll(),才会唤醒生产者;
- 当消费者消耗仓库中数据资源的时候,若仓库中没有可用的数据资源,就会wait()进入阻塞状态,直到有一个生产者线程执行了notifyAll(),才会唤醒消费者;
- 在测试案例中,模拟了一个生产者和两个消费者,生产者每隔1000毫秒就产生23个产品,消费者1每隔2500毫秒消费25个产品,消费者2每隔1500毫秒就消费10个产品;
- 测试发现,生产者生产数据能力略强于消费者的消费数据的能力,但是不会导致程序进入死循环或者持续等待状态,整个体系有条不紊,这就是使用传统线程实现生产者/消费者模型的一个简单案例。