天天看點

Java多線程Thread-并發協作(生産者消費者設計模式)

生産者消費者問題(英語:Producer-consumer problem),也稱有限緩沖問題(英語:Bounded-buffer problem),是一個多線程同步問題的經典案例。該問題描述了兩個共享固定大小緩沖區的線程——即所謂的“生産者”和“消費者”——在實際運作時會發生的問題。生産者的主要作用是生成一定量的資料放到緩沖區中,然後重複此過程。與此同時,消費者也在緩沖區消耗這些資料。該問題的關鍵就是要保證生産者不會在緩沖區滿時加入資料,消費者也不會在緩沖區中空時消耗資料。

解決辦法

要解決該問題,就必須讓生産者在緩沖區滿時休眠(要麼幹脆就放棄資料),等到下次消費者消耗緩沖區中的資料的時候,生産者才能被喚醒,開始往緩沖區添加資料。同樣,也可以讓消費者在緩沖區空時進入休眠,等到生産者往緩沖區添加資料之後,再喚醒消費者。通常采用程序間通信的方法解決該問題,常用的方法有信号燈法等。如果解決方法不夠完善,則容易出現死鎖的情況。出現死鎖時,兩個線程都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生産者和消費者的情形。

Java是怎麼實作休眠和 放棄的呢?

這裡涉及到Thread的幾個方法:

wait : 放棄cpu和資料鎖

notify:通知wait的Thread重新激活

sleep :睡眠(相當于勞工休息)

用java來實作這個模式Demo:

主要的概念 : storage (倉庫),Goods(商品),Consumer (消費者),Producer (生産者)。

1、當消費者消費後notify通知生産者去生産,當生産者生産完後notify消費者去消費。

2、當消費者發現倉庫中沒有可以消費的Storage 放棄鎖,讓生産者去生産。

3、當生産者發現倉庫已經滿了放棄Storage鎖,放消費者去消費。

Demo中隻New 了一個生産者和一個消費者,new多個也是可行的。

隻需要 多加 new Thread(new Consumer()).start(); 即可擁有2個消費者

Demo 生産者 while (index<=20) 生産完20個後就關閉了,如果想讓永久生産改為while(true) 這樣将會永無止境的循環。

package com.example;
import java.util.LinkedList;
import java.util.List;
public class MyClass {
    //倉庫
    static List<Goods> storage = new LinkedList<>();
    public static void main(String[] ars){
        System.out.print("hello world ");
        new Thread(new Consumer()).start();
        new Thread(new Producer()).start();
    }

    /**
     * 消費者
     */
    static class Consumer implements Runnable{
        @Override
        public void run() {

            while (true){

                synchronized (storage){
                    while (storage.size()==){
                        try {
                            storage.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    Goods goods = storage.remove();
                    System.out.println("Consumer:"+goods);
                    storage.notify();
                }
                try {
                    Thread.sleep((long) (Math.random()*));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }


    /**
     * 生産者
     */
    static class Producer implements Runnable{
        @Override
        public void run() {
            int index =;
            while (index<=){

                synchronized (storage){
                    while (storage.size()>=){
                        try {
                            storage.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    Goods goods = new Goods();
                    goods.name = "ID:"+index++;
                    storage.add(goods);
                    storage.notify();
                    System.out.println("Producer:"+goods);
                }
                try {
                    Thread.sleep((long) (Math.random()*));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 商品
     */
    static class Goods{
        public String name ;

        @Override
        public String toString() {
            return "-"+name;
        }
    }

}