某個線程或子產品的代碼負責生産資料(工廠),而生産出來的資料卻不得不交給另一子產品(消費者)來對其進行處理,在這之間使用了隊列、棧等類似超市的東西來存儲資料(超市),這就抽象除了我們的生産者/消費者模型。
其中,産生資料的子產品,就形象地稱為生産者;而處理資料的子產品,就稱為消費者;生産者和消費者之間的中介就叫做緩沖區。
為什麼要使用生産者-消費者模型
生産者消費者模型通過一個容器解決生産者和消費者的強耦合(強度互相依賴)問題。生産者消費者彼此間不直接通訊,而通過阻塞隊列進行通訊,即生産者生産完資料,不用等待消費者消費資料,直接扔給阻塞隊列,消費者不找生産者要資料,而是從阻塞隊列裡取,阻塞隊列相當于一個緩沖區,平衡生産者和消費者的處理能力。這個阻塞隊列就是用給生産者和消費者解耦的。
生産者-消費者模型的優點
1 解耦:降低生産者和消費之間的依賴關系
如果不使用郵筒(緩沖區)需要把信件交給郵差,但是前提是你得認識快遞員(相當于生産者消費者的強耦合),萬一郵差換人了你還得重新認識一下(相當于消費者變化導緻修改生産者代碼)。而對郵筒來說比較固定,你依賴它的成本比較低(相當于和緩沖區之間的弱耦合)。
2 支援并發
即生産者和消費者是兩個可以獨立的并發主體,互不幹擾的運作,從寄信的例子看,如果沒有郵筒就需要在路口等待郵差過來收(相當于生産者阻塞);又或者郵差挨家挨戶的詢問誰要寄信(相當于消費者輪詢)。不管是那種方法效率都比較低下。
3 支援忙閑不均
如果生産資料的速度時快時慢,緩沖區可以對其進行适當緩沖。當生産的資料太塊時,消費者來不及處理,未處理的資料可以暫時存在緩沖區。等生産者的生産速度慢下來,消費者再慢慢處理掉。
例如寄信的例子,假設郵差一次隻能帶1000封信,萬一某次碰到了中秋節送賀卡,需要郵遞的信封數量超過1000封,這個時候郵筒(緩沖區)就派上用場了,郵差吧來不及帶走的信封暫存在郵筒中,等下次再過來拿。
實作方式
1、使用synchronized(wait()和notify())
public class ProducerConsumer {
public static void main(String[] args) {
Resource resource = new Resource();
Producer p1 = new Producer(resource);
Producer p2 = new Producer(resource);
Producer p3 = new Producer(resource);
Consumer c1 = new Consumer(resource);
p1.start();
p2.start();
p3.start();
c1.start();
}
}
//公共資源類
class Resource {
private int number = 0;
private int size = 10;
/**
* 取資源
*/
public synchronized void remove() {
if (number > 0) {
number--;
System.out.println("消費者" + Thread.currentThread().getName() + ":" + number);
notifyAll();
} else {
try {
wait();
System.out.println("消費者" + Thread.currentThread().getName() + "進入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 添加資源
*/
public synchronized void add() {
if (number < size) {
number++;
System.out.println("生産者" + Thread.currentThread().getName() + ":" + number);
notifyAll();
} else {
try {
wait();
System.out.println("生産者" + Thread.currentThread().getName() + "進入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//生産者
class Producer extends Thread {
private Resource resource;
public Producer(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.add();
}
}
}
//消費者
class Consumer extends Thread {
private Resource resource;
public Consumer(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.remove();
}
}
}
2、使用Lock實作(await()和signal())
public class ProducerConsumer {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition producerCondition = lock.newCondition();
Condition consumerCondition = lock.newCondition();
Resource resource = new Resource(lock,producerCondition,consumerCondition);
Producer p1 = new Producer(resource);
Consumer c1 = new Consumer(resource);
Consumer c2 = new Consumer(resource);
Consumer c3 = new Consumer(resource);
p1.start();;
c1.start();
c2.start();
c3.start();
}
}
class Resource{
private int number = 0;
private int size = 10;
private Lock lock;
private Condition producerCondition;
private Condition consumerCondition;
public Resource(Lock lock, Condition producerCondition, Condition consumerCondition) {
this.lock = lock;
this.producerCondition = producerCondition;
this.consumerCondition = consumerCondition;
}
public void add(){
lock.lock();
try {
if (number < size){
number++;
System.out.println(Thread.currentThread().getName()+":"+number);
//喚醒等待的消費者
consumerCondition.signalAll();
}else {
try {
producerCondition.await();
System.out.println(Thread.currentThread().getName()+"線程進入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
lock.unlock();
}
}
//消費者取資源
public void remove(){
lock.lock();
try {
if (number > 0){
number--;
System.out.println(Thread.currentThread().getName()+":"+number);
//喚醒等待的生産者
producerCondition.signalAll();
}else {
try {
consumerCondition.await();
System.out.println(Thread.currentThread().getName()+"線程進入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
lock.unlock();
}
}
}
class Producer extends Thread{
private Resource resource;
public Producer(Resource resource){
this.resource = resource;
setName("生産者");
}
public void run(){
while (true){
try {
Thread.sleep((long) (1000+Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.add();
}
}
}
class Consumer extends Thread{
private Resource resource;
public Consumer(Resource resource){
this.resource = resource;
setName("消費者");
}
public void run(){
while (true){
try {
Thread.sleep((long) (1000+Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.remove();
}
}
}
3、阻塞隊列實作
/**
* 使用阻塞隊列實作
*/
public class ProducerConsumer {
public static void main(String[] args) {
Resource resource = new Resource();
Producer p1 = new Producer(resource);
Producer p2 = new Producer(resource);
Consumer c1 = new Consumer(resource);
p1.start();
p2.start();
c1.start();
}
}
class Resource{
BlockingQueue queue = new LinkedBlockingQueue(10);
//添加資源
public void add() {
try {
queue.put(1);
System.out.println("生産者"+Thread.currentThread().getName()+":"+queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消費資源
public void remove(){
try {
queue.take();
System.out.println("消費者"+Thread.currentThread().getName()+":"+queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Producer extends Thread{
private Resource resource;
public Producer(Resource resource){
this.resource = resource;
}
public void run(){
while (true){
try {
Thread.sleep((long) (1000+Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.add();
}
}
}
class Consumer extends Thread{
private Resource resource;
public Consumer(Resource resource){
this.resource = resource;
}
public void run(){
while (true){
try {
Thread.sleep((long) (1000+Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.remove();
}
}
}
參考:
https://www.cnblogs.com/shiqi17/p/9550678.html
https://blog.csdn.net/yu876876/article/details/81776879