天天看点

用wait、notifyAll实现生产者消费者模式

学习多线程,生产者消费者模式是一个绕不开的话题。所谓生产者消费者模式就是两组线程共享同一内存区域,生产者不停地从中放置数据,而消费者不停地从中取走数据。上面说到的内存区域应该是一个阻塞队列,如果不是,那就会出现线程安全问题了。阻塞队列可由BlockingQueue实现,也可用wait、notifyAll实现,这两个方法继承自Object类。

下面的示例代码中,Product类表示生产者,Consumer类消费者,SyncStack就表示我们自己实现的一个简单的阻塞队列,SyncStack中的push和pop方法分别用于放置数据到队列中和从队列中取数据。先看push方法,进入方法后会先判断队列的容量是否达到最大值,若是达到最大值,那么当前线程就会调用wait方法,释放锁,进入等待状态。另外,注意上面的判断没有用if语句,而是用while,要是用if的话,那么只会判断一次。要是队列容量已是最大,即便后来队列中的数据已被消费者线程消费,生产者线程也不会再把数据放进队列了。当while中的判别式为false时,程序继续执行,调用notifyAll方法其他在等待的所有线程,这里一定要用notifyAll,而非notify。因为后者只会唤醒等待在该资源上的任意一个线程,要是唤醒的还是生产者线程,那就线程死锁了。队列中数据是满的,没有消费者来消费,而生产者又在一直等消费者来消费后,队列中的数据数量少于其最大容量,这样自己才能往队列中放数据。同样地,pop方法也是类似的道理。

不足之处,欢迎批评指正。

package com.unbrella;

import java.util.concurrent.TimeUnit;

/**
 * @author jiangYue
 */
public class ProcederConsumer {
    public static void main(String[] args) {
        SyncStack ss = new SyncStack();
        Producer producer = new Producer(ss);
        Consumer consumer = new Consumer(ss);
        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(consumer).start();
    }

}
class Product {
    int id;

    Product(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Product{" +
                "id=" + id +
                '}';
    }
}
class SyncStack {
    int index;
    Product[] arrWT = new Product[6];

    public synchronized void push(Product product) {
        while (index == arrWT.length) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.notifyAll();
        arrWT[index] = product;
        index++;
    }

    public synchronized Product pop() {
        while (index == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.notifyAll();
        index--;
        return arrWT[index];
    }
}
class Producer implements Runnable {
    SyncStack ss = null;

    public Producer(SyncStack ss) {
        this.ss = ss;
    }
    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            Product product = new Product(i);
            ss.push(product);
            System.out.println("生产了产品 " + product);
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(1));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
class Consumer implements Runnable {
    SyncStack ss = null;

    public Consumer(SyncStack ss) {
        this.ss = ss;
    }
    @Override
    public void run() {
        for (int i = 0; i < 60; i++) {
            Product product = ss.pop();
            System.out.println("消费了产品 " + product);
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(1));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}