天天看点

Java 生产者-消费者模式的 3 种实现(1)原生:Object 的 wait() 与 notify 方法+ synchronized(2)队列 + ConditionObject 的 await() 与 signal() +ReentrantLock(3)阻塞队列 LinkedBlockingQueue

文章目录

  • (1)原生:Object 的 wait() 与 notify 方法+ synchronized
  • (2)队列 + ConditionObject 的 await() 与 signal() +ReentrantLock
  • (3)阻塞队列 LinkedBlockingQueue

    "生产者-消费者"模式,需要提供一个缓冲区,生产者与消费者不直接交互。

(1)原生:Object 的 wait() 与 notify 方法+ synchronized

    笔试时写出来的是 Object 类的 wait()、notify() 方法+synchronized 也就是 JVM 内置的监视器锁 实现的,思路是这样的,没有真实的队列做缓冲区,而是 记录产品数量,10为上限,如果数量为0 ,说明消费者要被阻塞挂起,CPU 交给生产者;如果数量为 10,说明生产者要被阻塞挂起,CPU 给消费者。并且生产者生产、消费者消费后,就会改变产品数量,因此每次需要唤醒对方被阻塞的线程。再有要注意的是,判断数量时需要用 while ,避免虚假唤醒的情况。

    必须有 sleep() ,否则一直输出的是 10 的阶乘。

public class BlackSpace
{
    String lock="lock";
    final int MAXSIZE=10;
    int n=0;
    int count=0;
    public static void main(String[] args) {
        BlackSpace blackSpace=new BlackSpace();
        new Thread(blackSpace.new Processor()).start();
        new Thread(blackSpace.new Consumer()).start();
    }

    //生产者
    class Processor implements Runnable
    {
        @Override
        public void run() {

            for(int i=0;i<10;i++)
            {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock)
                {while (count==10) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                    n++;
                    count++;
                    lock.notifyAll();}
            }
        }
    }

    //消费者
    class Consumer implements Runnable
    {
        @Override
        public void run() {
            for(int i=0;i<10;i++)
            {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock)
                {     while (count==0) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                    System.out.println(Calculaten(n));
                    count--;
                    lock.notifyAll();}
            }
        }
    }

    //求阶乘
    public static int Calculaten(int n) {
        if(n==0) return 0;
        int result = 1;
        for (int i = 1; i <= n; i++) {
            result = result * i;
        }
        return result;
    }
}
           

(2)队列 + ConditionObject 的 await() 与 signal() +ReentrantLock

    由阻塞队列充当缓冲区,生产者遇到 队列满 需要把自己挂起,并提醒之前 因队列空 被挂起的 消费者线程;同理,消费者遇到 队列空 需要把自己挂起,并提醒之前 因队列满 被挂起的 生产者线程;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class nMultifyCO
{
  Queue<Integer> queue=new ArrayDeque<>();
    Lock lock=new ReentrantLock();
    Condition notFull=lock.newCondition();
    Condition notEmp=lock.newCondition();
    final int MAXSIZE=10;
    int n=0;
    public static void main(String[] args) {
        nMultifyCO co=new nMultifyCO();
     new Thread(co.new Processor()).start();
        new Thread(co.new Consumer()).start();
    }

//生产者
    class Processor implements Runnable
{
    @Override
    public void run() {

        for(int i=0;i<10;i++)
        {    lock.lock();
        while (queue.size()==10)
        {
            try {
                notFull.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        queue.add(n);
            n++;
            notEmp.signalAll();
        lock.unlock();
    }
    }
}

    //消费者
class Consumer implements Runnable
    {
        @Override
        public void run() {
            for(int i=0;i<10;i++)
            {
                lock.lock();
                while (queue.isEmpty())
                {
                    try {
                        notEmp.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Calculaten(queue.poll()));
                notFull.signalAll();
                lock.unlock();
            }
        }
    }

   //求阶乘
    public static int Calculaten(int n) {
        if(n==0) return 0;
        int result = 1;
        for (int i = 1; i <= n; i++) {
            result = result * i;
        }
        return result;
    }
}
           

(3)阻塞队列 LinkedBlockingQueue

    要注意的是,put() 和 take() 方法是阻塞的,有用到 条件队列;而 offer() 和 poll() ,如果队满 或 队空,是返回 false 的,非阻塞的。**不 sleep 也行。**因为是队列实现的 ,保证了 先进先出,肯定是 从 0 到 9 的阶乘。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlackSpace {
    BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);
    int n = 0;

    public static void main(String[] args) {
        BlackSpace blackSpace=new BlackSpace();
        new Thread(blackSpace.new Producer()).start();
        new Thread(blackSpace.new Consumer()).start();
    }
    //生产者
    class Producer implements Runnable {
        @Override
        public void run() {
            for(int i=0;i<10;i++)
            {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    blockingQueue.put(n);
                    n++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }

    //消费者
    class Consumer implements Runnable
    {
        @Override
        public void run() {
            for(int i=0;i<10;i++)
            {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println(Calculaten(blockingQueue.take()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }



    //求阶乘
    public static int Calculaten(int n) {
        if(n==0) return 0;
        int result = 1;
        for (int i = 1; i <= n; i++) {
            result = result * i;
        }
        return result;
    }
}