线程的并发协作:生产者消费者模式
应用场景:生产者消费者问题
1、假设仓库只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中的产品取走消费
2、如果仓库没有产品,则生产者将产品放入仓库,否则停止生产并等待,知道仓库中的产品被消费者取走为止
3、如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止
分析:这是一个线程同步问题,生产者消费者共享同一个资源,并且消费者生产者之间相互依赖,互为条件
1、对于生产者,没有生产产品之前,要通知消费者等待;而生产了产品之后,又要马上通知消费者消费
2、对于消费者,在消费之后,要通知生产者已经消费结束,需要继续生产产品以供消费
3、在生产者消费者模式中,仅有synchronized是不够的
3.1、synchronized可阻止并发更新同一个共享资源,实现了同步
3.2、synchronized不能用来实现不同线程之间的消息传递(通信)
解决方式1:并发协作模型:“生产者/消费者模式” --> 管程法,借助缓冲区
生产者:负责生产数据的模块(这里的模块可能是:方法、对象、线程、进程)
消费者:负责处理数据的模块(这里的模块可能是:方法、对象、线程、进程)
缓冲区:消费者不能直接使用生产者的数据,他们之间有个缓冲区;生产者将产品放入缓冲区,消费者从缓冲区拿要处理的数据
- 实现线程的并发协作
有了缓冲区以后,生产者线程只需要往缓冲区里面放置数据,而不需要管消费者消费的情况;
同样,消费者只需要从缓冲区拿数据处理即可,也不需要管生产者生产的情况。
这样,就从逻辑上实现了“生产者线程”和“消费者线程”的分离。
- 解耦了生产者和消费者
生产者不需要和消费者直接打交道。
- 解决忙闲不均,提高效率
生产者生产数据慢时,缓冲区仍有数据,不影响消费者消费;
消费者处理数据慢时,生产者仍然可以继续往缓冲区里面放置数据 。
解决方式2:并发协作模型:“生产者/消费者模式” --> 信号灯法,借助标志位
生产者:负责生产数据的模块(这里的模块可能是:方法、对象、线程、进程)
消费者:负责处理数据的模块(这里的模块可能是:方法、对象、线程、进程)
缓冲对象:生产者消费者使用同一资源,他们之间有个标志位,类似于信号灯的作用,通过信号灯控制生产者和消费者的循环使用
注:不同场景使用不同的解决方法
一、使用管程法模拟
/**
* 协作模型:生产者消费者生产模型方式1:管程法
* 借助缓冲区
*/
public class ProduceAndConsumption {
public static void main(String[] args) {
SynContainer container = new SynContainer(); //缓冲区对象
new Productor(container).start(); //定义生产者线程
new Consumer(container).start(); //定义消费者线程
}
}
//馒头
class SteamedBread {
private int id;
SteamedBread(int id){
this.id = id;
}
public int getId(){
return id;
}
}
//缓冲区,相当于馒头筐
class SynContainer {
SteamedBread[] sbs = new SteamedBread[10];
int count = 0; //计数器
//存储 生产
public synchronized void push(SteamedBread sb) {
if(count == sbs.length){ //缓冲区满,将if替换为while也可以
//wait后,线程会将持有的锁释放,进入阻塞状态;
//这样其它需要锁的线程就可以获得锁;
try {
this.wait();
//这里的含义是执行此方法的线程暂停,进入阻塞状态,
//等消费者消费了馒头后再生产。
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notify();
// 唤醒在当前对象等待池中等待的第一个线程。
// notifyAll叫醒所有在当前对象等待池中等待的所有线程。
sbs[count++] = sb;
}
//消费 获取
public synchronized SteamedBread pop() {
//没有可以消费的,需要等待
if(count == 0){ //缓冲区空
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notify();
//可以消费
return sbs[--count];
}
}
//生产者
class Productor extends Thread {
SynContainer container;
public Productor(SynContainer container) {
this.container = container;
}
@Override
public void run() {
for(int i = 0; i < 20; i++){
System.out.println("生产的产品号" + i);
container.push(new SteamedBread(i));
}
}
}
//消费者
class Consumer extends Thread{
SynContainer container;
public Consumer(SynContainer container) {
this.container = container;
}
@Override
public void run() {
for(int i = 0; i < 15; i++){
System.out.println("----------消费的产品号" + container.pop().getId());
}
}
}
模拟结果
生产的产品号0
-----------消费的产品号0
生产的产品号1
生产的产品号2
生产的产品号3
生产的产品号4
-----------消费的产品号2
-----------消费的产品号4
-----------消费的产品号3
-----------消费的产品号1
生产的产品号5
生产的产品号6
生产的产品号7
生产的产品号8
生产的产品号9
生产的产品号10
生产的产品号11
生产的产品号12
-----------消费的产品号9
生产的产品号13
-----------消费的产品号12
生产的产品号14
-----------消费的产品号13
生产的产品号15
-----------消费的产品号14
-----------消费的产品号15
生产的产品号16
-----------消费的产品号11
生产的产品号17
-----------消费的产品号16
-----------消费的产品号17
-----------消费的产品号10
-----------消费的产品号8
生产的产品号18
生产的产品号19
Process finished with exit code 0
二、使用信号灯法模拟
/**
* 协作模型:生产者消费者生产模型方式2:信号灯法
* 借助标志位
*/
public class ProduceAndConsumption02 {
public static void main(String[] args) {
TV tv = new TV(); //缓冲区对象
new Player(tv).start(); //定义生产者线程
new Watcher(tv).start(); //定义消费者线程
}
}
//生产者 演员
class Player extends Thread {
TV tv;
public Player(TV tv) {
this.tv = tv;
}
@Override
public void run() {
for(int i = 0; i < 10; i++){
if(i % 2 == 0){
this.tv.play("hello");
}else{
this.tv.play("world");
}
}
}
}
//消费者 观众
class Watcher extends Thread {
TV tv;
public Watcher(TV tv) {
this.tv = tv;
}
@Override
public void run() {
for(int i = 0; i < 10; i++){
this.tv.watch();
}
}
}
//同一个资源 电视
class TV {
String voice;
//加入信号灯
//true 表示演员表演,观众等待
//false 表示观众观看,演员等待
boolean flag = true;
//表演
public synchronized void play(String voice){
//演员等待
if(!flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//开始表演
this.voice = voice;
System.out.println("表演了" + voice);
//表演结束,切换flag,唤醒观众,
this.flag = !this.flag; //this.flag = false; 也行,不过建议采用取反的方式,防止错误
this.notify();
}
//观看
public synchronized void watch(){
//观众等待
if(flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//观众开始观看
System.out.println("----------听到了" + voice);
//观众观看完,切换flag,唤醒演员
this.flag = !this.flag;
this.notify();
}
}
模拟结果
表演了hello
----------听到了hello
表演了world
----------听到了world
表演了hello
----------听到了hello
表演了world
----------听到了world
表演了hello
----------听到了hello
表演了world
----------听到了world
表演了hello
----------听到了hello
表演了world
----------听到了world
表演了hello
----------听到了hello
表演了world
----------听到了world
参考资料:
https://www.sxt.cn/Java_jQuery_in_action/eleven-threadconcurrent-collaboration.html