天天看点

Java多线程Thread-并发协作(生产者消费者设计模式)

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

解决办法

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

Java是怎么实现休眠和 放弃的呢?

这里涉及到Thread的几个方法:

wait : 放弃cpu和数据锁

notify:通知wait的Thread重新激活

sleep :睡眠(相当于工人休息)

用java来实现这个模式Demo:

主要的概念 : storage (仓库),Goods(商品),Consumer (消费者),Producer (生产者)。

1、当消费者消费后notify通知生产者去生产,当生产者生产完后notify消费者去消费。

2、当消费者发现仓库中没有可以消费的Storage 放弃锁,让生产者去生产。

3、当生产者发现仓库已经满了放弃Storage锁,放消费者去消费。

Demo中只New 了一个生产者和一个消费者,new多个也是可行的。

只需要 多加 new Thread(new Consumer()).start(); 即可拥有2个消费者

Demo 生产者 while (index<=20) 生产完20个后就关闭了,如果想让永久生产改为while(true) 这样将会永无止境的循环。

package com.example;
import java.util.LinkedList;
import java.util.List;
public class MyClass {
    //仓库
    static List<Goods> storage = new LinkedList<>();
    public static void main(String[] ars){
        System.out.print("hello world ");
        new Thread(new Consumer()).start();
        new Thread(new Producer()).start();
    }

    /**
     * 消费者
     */
    static class Consumer implements Runnable{
        @Override
        public void run() {

            while (true){

                synchronized (storage){
                    while (storage.size()==){
                        try {
                            storage.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    Goods goods = storage.remove();
                    System.out.println("Consumer:"+goods);
                    storage.notify();
                }
                try {
                    Thread.sleep((long) (Math.random()*));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }


    /**
     * 生产者
     */
    static class Producer implements Runnable{
        @Override
        public void run() {
            int index =;
            while (index<=){

                synchronized (storage){
                    while (storage.size()>=){
                        try {
                            storage.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    Goods goods = new Goods();
                    goods.name = "ID:"+index++;
                    storage.add(goods);
                    storage.notify();
                    System.out.println("Producer:"+goods);
                }
                try {
                    Thread.sleep((long) (Math.random()*));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 商品
     */
    static class Goods{
        public String name ;

        @Override
        public String toString() {
            return "-"+name;
        }
    }

}