天天看點

Java 生産者-消費者模式的 3 種實作(1)原生:Object 的 wait() 與 notify 方法+ synchronized(2)隊列 + ConditionObject 的 await() 與 signal() +ReentrantLock(3)阻塞隊列 LinkedBlockingQueue

文章目錄

  • (1)原生:Object 的 wait() 與 notify 方法+ synchronized
  • (2)隊列 + ConditionObject 的 await() 與 signal() +ReentrantLock
  • (3)阻塞隊列 LinkedBlockingQueue

    "生産者-消費者"模式,需要提供一個緩沖區,生産者與消費者不直接互動。

(1)原生:Object 的 wait() 與 notify 方法+ synchronized

    筆試時寫出來的是 Object 類的 wait()、notify() 方法+synchronized 也就是 JVM 内置的螢幕鎖 實作的,思路是這樣的,沒有真實的隊列做緩沖區,而是 記錄産品數量,10為上限,如果數量為0 ,說明消費者要被阻塞挂起,CPU 交給生産者;如果數量為 10,說明生産者要被阻塞挂起,CPU 給消費者。并且生産者生産、消費者消費後,就會改變産品數量,是以每次需要喚醒對方被阻塞的線程。再有要注意的是,判斷數量時需要用 while ,避免虛假喚醒的情況。

    必須有 sleep() ,否則一直輸出的是 10 的階乘。

public class BlackSpace
{
    String lock="lock";
    final int MAXSIZE=10;
    int n=0;
    int count=0;
    public static void main(String[] args) {
        BlackSpace blackSpace=new BlackSpace();
        new Thread(blackSpace.new Processor()).start();
        new Thread(blackSpace.new Consumer()).start();
    }

    //生産者
    class Processor implements Runnable
    {
        @Override
        public void run() {

            for(int i=0;i<10;i++)
            {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock)
                {while (count==10) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                    n++;
                    count++;
                    lock.notifyAll();}
            }
        }
    }

    //消費者
    class Consumer implements Runnable
    {
        @Override
        public void run() {
            for(int i=0;i<10;i++)
            {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock)
                {     while (count==0) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                    System.out.println(Calculaten(n));
                    count--;
                    lock.notifyAll();}
            }
        }
    }

    //求階乘
    public static int Calculaten(int n) {
        if(n==0) return 0;
        int result = 1;
        for (int i = 1; i <= n; i++) {
            result = result * i;
        }
        return result;
    }
}
           

(2)隊列 + ConditionObject 的 await() 與 signal() +ReentrantLock

    由阻塞隊列充當緩沖區,生産者遇到 隊列滿 需要把自己挂起,并提醒之前 因隊列空 被挂起的 消費者線程;同理,消費者遇到 隊列空 需要把自己挂起,并提醒之前 因隊列滿 被挂起的 生産者線程;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class nMultifyCO
{
  Queue<Integer> queue=new ArrayDeque<>();
    Lock lock=new ReentrantLock();
    Condition notFull=lock.newCondition();
    Condition notEmp=lock.newCondition();
    final int MAXSIZE=10;
    int n=0;
    public static void main(String[] args) {
        nMultifyCO co=new nMultifyCO();
     new Thread(co.new Processor()).start();
        new Thread(co.new Consumer()).start();
    }

//生産者
    class Processor implements Runnable
{
    @Override
    public void run() {

        for(int i=0;i<10;i++)
        {    lock.lock();
        while (queue.size()==10)
        {
            try {
                notFull.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        queue.add(n);
            n++;
            notEmp.signalAll();
        lock.unlock();
    }
    }
}

    //消費者
class Consumer implements Runnable
    {
        @Override
        public void run() {
            for(int i=0;i<10;i++)
            {
                lock.lock();
                while (queue.isEmpty())
                {
                    try {
                        notEmp.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Calculaten(queue.poll()));
                notFull.signalAll();
                lock.unlock();
            }
        }
    }

   //求階乘
    public static int Calculaten(int n) {
        if(n==0) return 0;
        int result = 1;
        for (int i = 1; i <= n; i++) {
            result = result * i;
        }
        return result;
    }
}
           

(3)阻塞隊列 LinkedBlockingQueue

    要注意的是,put() 和 take() 方法是阻塞的,有用到 條件隊列;而 offer() 和 poll() ,如果隊滿 或 隊空,是傳回 false 的,非阻塞的。**不 sleep 也行。**因為是隊列實作的 ,保證了 先進先出,肯定是 從 0 到 9 的階乘。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlackSpace {
    BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);
    int n = 0;

    public static void main(String[] args) {
        BlackSpace blackSpace=new BlackSpace();
        new Thread(blackSpace.new Producer()).start();
        new Thread(blackSpace.new Consumer()).start();
    }
    //生産者
    class Producer implements Runnable {
        @Override
        public void run() {
            for(int i=0;i<10;i++)
            {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    blockingQueue.put(n);
                    n++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }

    //消費者
    class Consumer implements Runnable
    {
        @Override
        public void run() {
            for(int i=0;i<10;i++)
            {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println(Calculaten(blockingQueue.take()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }



    //求階乘
    public static int Calculaten(int n) {
        if(n==0) return 0;
        int result = 1;
        for (int i = 1; i <= n; i++) {
            result = result * i;
        }
        return result;
    }
}