天天看點

Java使用wait()、notify()、notifyAll()實作生産者消費者模式

前言:

1. wait()、notify()、notifyAll()三個方法不是随随便便就能用的,這三個方法一定要在同步代碼塊中調用的。

2. 同步代碼塊中的鎖最好是使用臨界資源,也最好使用臨界資源對象的 wait()、notify()、notifyAll()。

3. 如果有多個生産者和多個消費者,那麼盡量不要調用notify()、而要用notifyAll()。因為如果某一生産者使用notify()喚醒的是另一個生産者,其他線程消費者可能調用的是wait()處于同步等待,wait()調用後,是需要有其他線程喚醒才會去競争鎖的,那麼這種情況下,所有的線程就都會在同步等待,因為沒有線程去搶鎖。

4.多個生産者和多個消費者的情況下,為了讓每一個消費者和消費者喲相同的幾率被調用,生産者盡量不要生産滿了再喚醒其他消費者,也就是說生産一部分或者一個就要喚醒其他線程,讓其他線程有生産或者消費的機會。消費者也同樣如此。

5. 若臨界容器夠大,那麼消費者和生産者需要避免頻繁的上下文切換,解決方案是,不要生産或一個就喚醒其他線程,要生産或者消費一定數量再喚醒其他線程。

下面就來實作一下:

首先是生産者:

class Producer extends Thread {

    private String name;

    private LinkedList<Integer> appleBox;

    private Integer boxSize;

    private AtomicInteger appleCurrentNum;

    Producer(String name, LinkedList<Integer> appleBox, Integer boxSize, AtomicInteger appleCurrentNum) {
        this.name = name;
        this.appleBox = appleBox;
        this.boxSize = boxSize;
        this.appleCurrentNum = appleCurrentNum;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (appleBox) {
                if (appleBox.size() < boxSize) {
                    Integer apple = appleCurrentNum.addAndGet(1);
                    appleBox.addFirst(apple);
                    System.out.println("[ " + name + " 生産者 線程:" +this.getName() + " ] 生産了編号為-" + apple + "-号蘋果,目前蘋果有:" + appleBox.size() + "個");
                    try {
                        //模拟生産者生産過程,sleep不會釋放鎖
                        Thread.sleep(new Random().nextInt(10));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //喚醒消費者
                    appleBox.notifyAll();//随機生産和消費,但是不會立即釋放鎖,要等到下面的代碼執行完才釋放鎖
                    //如果加了下面這個,生産者就不會生産到滿,消費者就可能會消費了,更适合實際情況,其他的生産者線程也會有生産的機會
                    try {
                        //生産之後進入立刻阻塞狀态,讓消費者消費
                        appleBox.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        appleBox.notifyAll();//如果發生異常,喚醒其他線程繼續執行
                    }
                } else {
                    try {
                        System.out.println("[ " + name +" 生産者 線程 "+ this.getName() + " ] 正在生産蘋果 ,但是隊列已滿等待消費者消費,目前蘋果有:" + appleBox.size() +"個");
                        //隊列已滿,進入阻塞狀态,等待消費者消費
                        appleBox.wait();//1)随機生産和消費
                    } catch (InterruptedException e) {
                        appleBox.notifyAll();
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
           

然後是消費者:

class Consumer extends Thread {

        private String name;

        private LinkedList<Integer> appleBox;

        private Integer boxSize;

        Consumer(String name, LinkedList<Integer> appleBox, Integer boxSize) {
            this.name = name;
            this.appleBox = appleBox;
            this.boxSize = boxSize;

        }

        @Override
        public void run() {
            while (true) {
                synchronized (appleBox) {
                    //如果隊列是空的,消費者進入阻塞狀态,等待生産者生産并喚醒
                    if (appleBox.isEmpty()) {
                        System.out.println("[ " + name +" 消費者,線程: "+ this.getName() + "] 沒有蘋果可以消費,進入阻塞狀态等待生産者生産,目前蘋果有:" + appleBox.size() + "個");
                        try {
                            //進入阻塞狀态釋放隊列鎖,因為隻有兩個線程,是以生産者一定會擷取到隊列鎖執行
                            appleBox.wait();//1)随機生産和消費
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            //如果發送異常,主動喚醒生産者線程執行
                            appleBox.notifyAll();
                        }
                    } else {
                        //如果隊列不空,就消費産品,并喚醒生産者
                        //注意喚醒生産者,在消費者執行完畢釋放鎖之後,不一定生産者就會獲得鎖,也許消費者會繼續擷取鎖執行
                        //但是如果不喚醒生産者,那麼如果生産者處于阻塞狀态,當隊列為空,消費者也進入阻塞狀态那麼就沒有線程可以擷取鎖繼續執行了
                        appleBox.notifyAll();//随機生産和消費,這裡可能喚醒的是生産者,有可能喚醒的是消費者,但是不會馬上釋放鎖,要等到下面的代碼執行完才釋放
                        try {
                            //模拟消費者消費過程,sleep不會釋放鎖
                            Thread.sleep(new Random().nextInt(10));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Integer apple = appleBox.removeLast();
                        System.out.println("[ " + name +" 消費者,線程:"+ this.getName() + " ] 消費了編号為-" + apple + "-号蘋果,目前蘋果有:" + appleBox.size() + "個");
                        //如果加了下面這個,消費者者就不會消費到完,生産者者就可能會生産了,更适合實際情況,其他的消費者線程也會有消費的機會
                        try {
                            //消費之後立刻進入阻塞狀态,讓生産者生産
                            appleBox.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            appleBox.notifyAll();
                        }
                    }
                }

            }
        }
}
           

測試代碼:

public class conusmerAndProductTest {

    public static void main(String[] args) {
        LinkedList<Integer> appleBox = new LinkedList<>();
        Integer boxSize = 5;
        AtomicInteger initNum = new AtomicInteger(0);
        Producer producer1 = new Producer("P_1",appleBox,boxSize,initNum);
        Producer producer2 = new Producer("P_2",appleBox,boxSize,initNum);
        Producer producer3 = new Producer("P_3",appleBox,boxSize,initNum);
        Consumer consumer1 = new Consumer("C_1",appleBox,boxSize);
        Consumer consumer2 = new Consumer("C_2",appleBox,boxSize);
        Consumer consumer3 = new Consumer("C_3",appleBox,boxSize);

        consumer1.start();
        consumer2.start();
        consumer3.start();
        producer1.start();
        producer2.start();
        producer3.start();

    }

}
           

為什麼要有三個生産者和三個消費者,因為為了不頻繁的産生生産者生産滿和消費者消費完的情況,減少無效上下文切換的開銷。

首先理一理線程的幾種狀态:

建立狀态:new 一個線程,就進入了建立狀态。

就緒狀态:調用某一線程的start()、被其他線程jion()完、目前線程sleep()完、目前線程I/O完、或者阻塞狀态(同步阻塞狀态)搶到鎖。

運作狀态:目前線程搶到鎖和被配置設定CPU,就進入了運作狀态。

阻塞狀态:在目前線程調用wait()、或者被其他線程join()、目前線程sleep()、目前線程在I/O、或者被其他線程notify()、notifyAll()但是沒有搶到鎖。就進入了阻塞狀态。

死亡狀态:線程執行完,或者線程執行發生了異常,線程就死亡了。

其中:

阻塞狀态又分三種:

等待阻塞:目前線程調用wait()、進入了等待阻塞、這個時候不能搶鎖。

同步阻塞:被notify()、或者notifyAll(),但是沒有搶到鎖,或者就緒狀态沒有搶到鎖。

其他阻塞:I/O、或者被jion()、或者目前線程sleep()。

下面我們考慮一下極限情況:

所有線程就緒後:

1. 某一個消費者先搶到鎖,那麼它直接等待,讓其他線程喚醒。如果一開始連續搶到cpu的都是消費者,那麼三個消費者都在等待喚醒,在等待阻塞狀态,并且三個線程都釋放了鎖,不能競争鎖。這時候生産者是同步等待狀态,他們他們都是可以競争鎖的,是以生産者搶到鎖擁有CPU就會生産,生産完notifyAll(),那麼三個消費者又從等待阻塞狀态進入同步阻塞狀态,他們又能競争鎖了。線程是可以在這種正常的循環往複的運作的。

2. 根據上面的代碼,假設一開始有連續五次都是生産者搶到鎖,第六次也是生産者。那麼會不會消費者沒有執行的機會呢? 答案是:不會,因為第一個生産者生産完了之後,是wait()等待阻塞狀态。但是第二個生産完調用的是notifyAll(),那麼第一個生産者馬上又會進入同步阻塞狀态。

總結:

實作生産者消費者模式的注意點就是:

1. 臨界資源要有鎖

2. 要確定每一個線程都有搶鎖的機會,而不是執行完之後沒有線程去喚醒它。

3. 為了確定效率和系統開銷,盡量不要頻繁的切換上下文。