天天看点

Java生产者、消费者模式的几种实现方式方式一:BlockingQueue方式(最优方式)方式二:Synchronized+wait/notifyAll方式方式三:ReentrantLock+Condition方式几种方式对比参考

文章目录

  • 方式一:BlockingQueue方式(最优方式)
  • 方式二:Synchronized+wait/notifyAll方式
  • 方式三:ReentrantLock+Condition方式
  • 几种方式对比
  • 参考

方式一:BlockingQueue方式(最优方式)

private static final String THREAD_PRODUCE = "生产者";
private static final String THREAD_CONSUME = "消费者";
//阻塞队列中最多存5个
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5);

public static void main(String[] args) {
    new Thread(new Task(THREAD_PRODUCE)).start();
    new Thread(new Task(THREAD_CONSUME)).start();
}

static class Task implements Runnable {
    private String threadName;

    Task(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                if (threadName.equals(THREAD_PRODUCE)) {
                    //生产者
                    int produceNum = new Random().nextInt(1000);
                    System.out.println("++" + Thread.currentThread().getName() + "生产元素:" + produceNum);
                    queue.put(produceNum);
                } else if (threadName.equals(THREAD_CONSUME)) {
                    //消费者
                    int consumeNum = queue.take();
                    System.out.println("--" + Thread.currentThread().getName() + "消费元素:" + consumeNum);
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
                return;
            }
        }
    }
}
           

执行结果:

++Thread-0生产元素:402
--Thread-1消费元素:402
++Thread-0生产元素:609
--Thread-1消费元素:609
++Thread-0生产元素:192
--Thread-1消费元素:192
++Thread-0生产元素:377
--Thread-1消费元素:377
++Thread-0生产元素:53
--Thread-1消费元素:53
......
           

方式二:Synchronized+wait/notifyAll方式

private static final String THREAD_PRODUCE = "生产者";
private static final String THREAD_CONSUME = "消费者";
//公共资源
private static PublicResource resource = new PublicResource();

public static void main(String[] args) {
    new Thread(new Task(THREAD_PRODUCE)).start();
    new Thread(new Task(THREAD_CONSUME)).start();
}

static class Task implements Runnable {
    private String threadName;

    Task(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public void run() {
        while (true) {
            try {
                if (THREAD_PRODUCE.equals(threadName)) {
                    //生产者
                    resource.increase();
                } else if (THREAD_CONSUME.equals(threadName)) {
                    //消费者
                    resource.decrease();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public static class PublicResource {
    private int number = 0;//当前总共生产个数
    private final int size = 5;//最大产品个数
    private final Object object = new Object();

    //生产者生产产品
    public void increase() throws InterruptedException {
        synchronized (object) {
            //如果产品个数大于最大个数 等待
            while (number >= size) {
                object.wait();
            }
            Thread.sleep(500);
            number++;
            System.out.println("++" + Thread.currentThread().getName() + ":生产了一个,总共有" + number);
            object.notifyAll();
        }
    }

    //消费者消费产品
    public void decrease() throws InterruptedException {
        synchronized (object) {
            //如果产品个数小于等于0 等待
            while (number <= 0) {
                object.wait();
            }
            Thread.sleep(1000);
            number--;
            System.out.println("--" + Thread.currentThread().getName() + ":消费了一个,总共有" + number);
            object.notifyAll();
        }
    }
}
           

执行结果:

++Thread-0:生产了一个,总共有1
++Thread-0:生产了一个,总共有2
++Thread-0:生产了一个,总共有3
++Thread-0:生产了一个,总共有4
++Thread-0:生产了一个,总共有5
--Thread-1:消费了一个,总共有4
--Thread-1:消费了一个,总共有3
--Thread-1:消费了一个,总共有2
--Thread-1:消费了一个,总共有1
--Thread-1:消费了一个,总共有0
++Thread-0:生产了一个,总共有1
++Thread-0:生产了一个,总共有2
++Thread-0:生产了一个,总共有3
......
           

方式三:ReentrantLock+Condition方式

private static final String THREAD_PRODUCE = "生产者";
private static final String THREAD_CONSUME = "消费者";
private static TaskResource resource = new TaskResource();

public static void main(String[] args) {
    for (int i = 0; i < 2; i++) {
        new Thread(new TaskRunnable(THREAD_PRODUCE + i)).start();
    }
    for (int i = 0; i < 3; i++) {
        new Thread(new TaskRunnable(THREAD_CONSUME + i)).start();
    }

}

static class TaskRunnable implements Runnable {
    private String threadName;

    TaskRunnable(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (threadName.startsWith(THREAD_PRODUCE)) {
                //生产者
                resource.produce();
            } else if (threadName.startsWith(THREAD_CONSUME)) {
                //消费者
                resource.consume();
            }
        }
    }
}

static class TaskResource {
    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();
    private final int maxSize = 10;
    AtomicInteger resourceNum = new AtomicInteger(0);

    public void produce() {
        try {
            reentrantLock.lock();
            while (resourceNum.get() == maxSize) {
                condition.await();
            }
            resourceNum.getAndIncrement();
            System.out.println("++" + Thread.currentThread().getName() + "生产一个,当前总数为:" + resourceNum.get());
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
    }

    public void consume() {
        try {
            reentrantLock.lock();
            while (resourceNum.get() <= 0) {
                condition.await();
            }
            resourceNum.getAndDecrement();
            System.out.println("--" + Thread.currentThread().getName() + "消费一个,当前总数为:" + resourceNum.get());
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
    }
}
           

执行结果:

++Thread-1生产一个,当前总数为:1
--Thread-4消费一个,当前总数为:0
++Thread-0生产一个,当前总数为:1
--Thread-3消费一个,当前总数为:0
++Thread-1生产一个,当前总数为:1
--Thread-4消费一个,当前总数为:0
++Thread-0生产一个,当前总数为:1
--Thread-2消费一个,当前总数为:0
++Thread-1生产一个,当前总数为:1
......
           

几种方式对比

  • BlockingQueue方式是最优实现方式,使用ArrayBlockingQueue或LinkedBlockingQueue都可以实现,内部是通过两个ReentrantLock+Condition实现的生产者线程之间的同步(进队)、消费者线程之间的同步(出队);
  • 方式二、方式三效率相对于方式一来说,效率会差一些,因为当操作数据的时候,同一时间只能有一个生产者或消费者线程去操作他,而方式一中的阻塞队列方式在条件允许的条件下是允许多个生产者和消费者线程同时去操作数据的。

参考

【1】https://monkeysayhi.github.io/2017/10/08/Java实现生产者-消费者模型/

【2】https://www.cnblogs.com/xrq730/p/4855663.html

【3】https://juejin.im/entry/596343686fb9a06bbd6f888c

【4】https://zhuanlan.zhihu.com/p/20300609

继续阅读