天天看点

线程的并发协作:生产者消费者模式(管程法与信号灯法)

线程的并发协作:生产者消费者模式

应用场景:生产者消费者问题

1、假设仓库只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中的产品取走消费

2、如果仓库没有产品,则生产者将产品放入仓库,否则停止生产并等待,知道仓库中的产品被消费者取走为止

3、如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止

分析:这是一个线程同步问题,生产者消费者共享同一个资源,并且消费者生产者之间相互依赖,互为条件

1、对于生产者,没有生产产品之前,要通知消费者等待;而生产了产品之后,又要马上通知消费者消费

2、对于消费者,在消费之后,要通知生产者已经消费结束,需要继续生产产品以供消费

3、在生产者消费者模式中,仅有synchronized是不够的

     3.1、synchronized可阻止并发更新同一个共享资源,实现了同步

     3.2、synchronized不能用来实现不同线程之间的消息传递(通信)

解决方式1:并发协作模型:“生产者/消费者模式”  -->  管程法,借助缓冲区

生产者:负责生产数据的模块(这里的模块可能是:方法、对象、线程、进程)

消费者:负责处理数据的模块(这里的模块可能是:方法、对象、线程、进程)

缓冲区:消费者不能直接使用生产者的数据,他们之间有个缓冲区;生产者将产品放入缓冲区,消费者从缓冲区拿要处理的数据

- 实现线程的并发协作

    有了缓冲区以后,生产者线程只需要往缓冲区里面放置数据,而不需要管消费者消费的情况;

    同样,消费者只需要从缓冲区拿数据处理即可,也不需要管生产者生产的情况。 

    这样,就从逻辑上实现了“生产者线程”和“消费者线程”的分离。

- 解耦了生产者和消费者

    生产者不需要和消费者直接打交道。

- 解决忙闲不均,提高效率

    生产者生产数据慢时,缓冲区仍有数据,不影响消费者消费;

    消费者处理数据慢时,生产者仍然可以继续往缓冲区里面放置数据 。

解决方式2:并发协作模型:“生产者/消费者模式”  -->  信号灯法,借助标志位

生产者:负责生产数据的模块(这里的模块可能是:方法、对象、线程、进程)

消费者:负责处理数据的模块(这里的模块可能是:方法、对象、线程、进程)

缓冲对象:生产者消费者使用同一资源,他们之间有个标志位,类似于信号灯的作用,通过信号灯控制生产者和消费者的循环使用

注:不同场景使用不同的解决方法

一、使用管程法模拟

/**
 * 协作模型:生产者消费者生产模型方式1:管程法
 * 借助缓冲区
 */
public class ProduceAndConsumption {

    public static void main(String[] args) {
        SynContainer container = new SynContainer(); //缓冲区对象
        new Productor(container).start();            //定义生产者线程
        new Consumer(container).start();             //定义消费者线程

    }
}

//馒头
class SteamedBread {
    private int id;
    SteamedBread(int id){
        this.id = id;
    }

    public int getId(){
        return id;
    }
}

//缓冲区,相当于馒头筐
class SynContainer {
    SteamedBread[] sbs = new SteamedBread[10];
    int count = 0; //计数器
    //存储  生产
    public synchronized void push(SteamedBread sb) {
        if(count == sbs.length){  //缓冲区满,将if替换为while也可以
            //wait后,线程会将持有的锁释放,进入阻塞状态;
            //这样其它需要锁的线程就可以获得锁;
            try {
                this.wait();
                //这里的含义是执行此方法的线程暂停,进入阻塞状态,
                //等消费者消费了馒头后再生产。
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.notify();
        // 唤醒在当前对象等待池中等待的第一个线程。
        // notifyAll叫醒所有在当前对象等待池中等待的所有线程。
        sbs[count++] = sb;
    }

    //消费  获取
    public synchronized SteamedBread pop() {
        //没有可以消费的,需要等待
        if(count == 0){ //缓冲区空 
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.notify();
        //可以消费
        return sbs[--count];
    }

}

//生产者
class Productor extends Thread {
    SynContainer container;
    public Productor(SynContainer container) {
        this.container = container;
    }

    @Override
    public void run() {
        for(int i = 0; i < 20; i++){
            System.out.println("生产的产品号" + i);
            container.push(new SteamedBread(i));
        }
    }
}

//消费者
class Consumer extends Thread{

    SynContainer container;
    public Consumer(SynContainer container) {
        this.container = container;
    }
    @Override
    public void run() {
        for(int i = 0; i < 15; i++){
            System.out.println("----------消费的产品号" + container.pop().getId());
        }
    }
}
           

模拟结果 

生产的产品号0
-----------消费的产品号0
生产的产品号1
生产的产品号2
生产的产品号3
生产的产品号4
-----------消费的产品号2
-----------消费的产品号4
-----------消费的产品号3
-----------消费的产品号1
生产的产品号5
生产的产品号6
生产的产品号7
生产的产品号8
生产的产品号9
生产的产品号10
生产的产品号11
生产的产品号12
-----------消费的产品号9
生产的产品号13
-----------消费的产品号12
生产的产品号14
-----------消费的产品号13
生产的产品号15
-----------消费的产品号14
-----------消费的产品号15
生产的产品号16
-----------消费的产品号11
生产的产品号17
-----------消费的产品号16
-----------消费的产品号17
-----------消费的产品号10
-----------消费的产品号8
生产的产品号18
生产的产品号19

Process finished with exit code 0
           

二、使用信号灯法模拟

/**
 * 协作模型:生产者消费者生产模型方式2:信号灯法
 * 借助标志位
 */
public class ProduceAndConsumption02 {

    public static void main(String[] args) {
        TV tv = new TV(); //缓冲区对象
        new Player(tv).start();            //定义生产者线程
        new Watcher(tv).start();             //定义消费者线程

    }
}

//生产者 演员
class Player extends Thread {
    TV tv;

    public Player(TV tv) {
        this.tv = tv;
    }

    @Override
    public void run() {
        for(int i = 0; i < 10; i++){
            if(i % 2 == 0){
                this.tv.play("hello");
            }else{
                this.tv.play("world");
            }
        }
    }
}

//消费者 观众
class Watcher extends Thread {
    TV tv;

    public Watcher(TV tv) {
        this.tv = tv;
    }

    @Override
    public void run() {
        for(int i = 0; i < 10; i++){
            this.tv.watch();
        }
    }
}

//同一个资源 电视
class TV {
    String voice;

    //加入信号灯
    //true 表示演员表演,观众等待
    //false 表示观众观看,演员等待
    boolean flag = true;

    //表演
    public synchronized void play(String voice){
        //演员等待
        if(!flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //开始表演
        this.voice = voice;
        System.out.println("表演了" + voice);
        //表演结束,切换flag,唤醒观众,
        this.flag = !this.flag; //this.flag = false; 也行,不过建议采用取反的方式,防止错误
        this.notify();
    }
    //观看
    public synchronized void watch(){
        //观众等待
        if(flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //观众开始观看
        System.out.println("----------听到了" + voice);
        //观众观看完,切换flag,唤醒演员
        this.flag = !this.flag;
        this.notify();
    }

}
           

模拟结果

表演了hello
----------听到了hello
表演了world
----------听到了world
表演了hello
----------听到了hello
表演了world
----------听到了world
表演了hello
----------听到了hello
表演了world
----------听到了world
表演了hello
----------听到了hello
表演了world
----------听到了world
表演了hello
----------听到了hello
表演了world
----------听到了world
           

参考资料:

https://www.sxt.cn/Java_jQuery_in_action/eleven-threadconcurrent-collaboration.html