天天看點

生産者消費者模型生産者消費者模型

生産者消費者模型

  • 如果沒有生産者消費者模型?

    我們先來看如果沒有生産着消費者模型會怎麼樣, 假如生産者生産機關資料需要耗時

    10t

    的時間;消費者消費資料需要

    1t

    的時間, 那麼如果我們是生産者消費者一對一的話, 消費者消費的節奏嚴重依賴了生産者的節奏. 消費者将有

    9t

    的時間處于等待狀态 . 這個時候就很僵硬了 , 明明消費者在剩餘的

    9t

    時間裡可以去處理别的生産者産出的資料 , 那我們要怎麼來設計來達到這個效果呢?
  • 生産者消費者模型大體結構

    其實我們可以把生産者跟消費者解耦開來, 讓生産者去依賴一個隊列 , 生産者産出的消息放到隊列裡面,消費者再去隊列裡面取 . 這樣我們可以對生産者和消費者的數量比進行拓展, 想要多少個消費者對應多少個生産者都可以; 比如生産者産出相對消費者消費速度慢,我們可以相對減少消費者的數量; 這樣大大平衡了生産者群體和消費者群體的處理能力 .

  • 舉個惡心點的例子

    生産者線程的任務:拉屎; 消費者線程的任務:吃屎

    生産者拉機關屎的時間是1000ms;消費者吃掉機關屎的時間是100ms;

    如果消費者直接依賴生産者, 那麼消費者隻能等生産者把屎拉出來再吃掉,這個時候相當于消費者吃掉機關屎的時間是1100ms ; 如果消費者間接依賴生産者, 比如消費者依賴公共廁所, 生産者也依賴公共廁所 . 生産者把屎拉在廁所裡, 消費者去廁所裡面取. 這個時候我們可以增加生産者的個數. 比如 我們設定 5個生産者同時在拉屎, 然後消費者去廁所裡面吃掉這些屎. 這樣就能讓一個消費者去處理五個生産者産出的資料, 就業是上面所說的平衡處理能力. 也達到了耦合的作用.

  • 生産者消費者模型
    • 隊列的設計
      • 資料結構
    • 生産者跟消費者的設計
    • 編碼
      • MyBlockingQueue
      • MyConsumer
      • MyProducer

隊列的設計

資料結構

public class MyBlockingQueue<E> {
    MyBlockingQueue(int ca) {
        this.ca = ca;
        Entry<E> e = new Entry<>();
        head = e;
        tail = e;
    }
}
           

1.采用單向連結清單來實作隊列,則至少具有出隊入隊兩個public的函數;

出隊(take)針對消費者,入隊(put)針對生産者;

private static class Entry<E> {
      E datas;
      Entry<E> next;
}
private Entry<E> head;
private Entry<E> tail;
public void put(E e) throws InterruptedException;
public E take() throws InterruptedException;
private Entry<E> head;
private Entry<E> tail
           

2.需要有個固定的容量來規定隊列的長度,這裡采用AtomicInteger

當隊列長度等于這個容量 讓生産者停止生産;

同理,當隊列長度等于0,讓消費者停止消費;

private AtomicInteger size = new AtomicInteger(0);

3.消費鎖 takeLock

串行化多個消費者線程在該隊列展現的消費動作

private final Lock takeLock = new ReentrantLock();

4.産出鎖 putLock

串行化多個消費者線程在該隊列展現的産出動作

private final Lock putLock = new ReentrantLock();

5.消費狀态 notEmpty

提醒消費者進行消費

這個狀态在消費者發現隊列已經空了的時候,會執行await方法進入對象的等待池

在生産者生産完一個數資料是, 會執行signal方法,通知進入對象等待池的消費者線程進入對象的鎖池進行搶鎖消費

private final Condition notEmpty = takeLock.newCondition();

6.産出狀态 notFull

提醒生産者進行産出

這個狀态在生産者發現隊列已經滿了的時候,會執行await方法進入對象的等待池

在消費者消費完一個資料時,會執行signal方法,通知進入對象等待池的生産者線程進入對象的鎖池進行搶鎖産出

private final Condition notFull = putLock.newCondition();

生産者跟消費者的設計

生産者消費者本身是個線程, 是以繼承Thread或者實作Runable接口就好了, 然後run方法隻需對隊列(構造函數傳入)進行不斷的put/take 操作就好了

編碼

MyBlockingQueue

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author Jiangjiaze
 * @version Id: MyBlockingQueue.java, v 0.1 2017/3/8 0:21 FancyKong Exp $$
 */
public class MyBlockingQueue<E> {
    private final Lock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final Lock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    private Entry<E> head;
    private Entry<E> tail;
    private AtomicInteger size = new AtomicInteger(0);
    private int ca;

    MyBlockingQueue(int ca) {
        this.ca = ca;
        Entry<E> e = new Entry<>();
        head = e;
        tail = e;
    }
    private static class Entry<E> {
        E datas;
        Entry<E> next;
    }

    public void put(E e) throws InterruptedException {
        putLock.lock();
        try {
            while (size.intValue() == ca) {
                System.out.println("廁所滿了");
                notFull.await();
            }//之是以要用while是因為可能我醒過來之後,廁所有可能又滿了.
            tail.datas = e;
            tail.next = new Entry<>();
            tail = tail.next;
            if(size.getAndIncrement() +1 <= ca){
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        //拉完要叫人來吃
        signalConsumers();

    }

    private void signalConsumers() {
        takeLock.lock();
        try {
            notEmpty.signal();
        }finally {
            takeLock.unlock();
        }
    }

    public E take() throws InterruptedException {
        takeLock.lock();
        E toReturn;
        try{
            while(size.intValue()==0){
                System.out.println("都吃完了");
                notEmpty.await();//沒有元素,不能吃
            }
            toReturn = head.datas;
            head = head.next;
            if(size.getAndDecrement() -1 > 0){ //我吃完都還有剩下
                notEmpty.signal();
            }
        }finally {
            takeLock.unlock();
        }
        //吃完之後呢,我得順便通知一下生産者繼續拉
        if(toReturn != null) signalProducer();
        return toReturn;
    }

    private void signalProducer() {
        this.putLock.lock();
        try {
            notFull.signal(); // 不空啦,快來拉
        }finally {
            this.putLock.unlock();
        }
    }
}
           

MyConsumer

import com.fancy.prodcons.Message;

/**
 * @author Jiangjiaze
 * @version Id: MyConsumer.java, v 0.1 2017/3/8 0:57 FancyKong Exp $$
 */
public class MyConsumer extends Thread{
    public static void main(String[] args) {
        //測試的用戶端
        MyBlockingQueue<Message> queue = new MyBlockingQueue<>(10);
        new MyConsumer("消費者1",queue).start();
        new MyProducer("生産者1",queue).start();
        new MyProducer("生産者2",queue).start();
        new MyProducer("生産者3",queue).start();
        /*new MyConsumer("消費者2",queue).start();
        new MyConsumer("消費者3",queue).start();
        new MyConsumer("消費者4",queue).start();
        new MyConsumer("消費者5",queue).start();
        new MyConsumer("消費者6",queue).start();
        new MyConsumer("消費者7",queue).start();*/
    }
    private MyBlockingQueue<Message> queue;
    MyConsumer(String name,MyBlockingQueue<Message> queue){
        setName(name);
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true)
            try {
                sleep(100); //消費需要時間100ms
                Message m  = queue.take();
                System.out.println(getName()+"吃掉了"+m);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    }
}
           

MyProducer

import com.fancy.prodcons.Message;
/**
 * @author Jiangjiaze
 * @version Id: Producer.java, v 0.1 2017/3/1 17:32 FancyKong Exp $$
 */
public class MyProducer extends Thread{
    MyBlockingQueue<Message> blockingQueue;

    public MyProducer(String name,MyBlockingQueue<Message> blockingQueue) {
        this.blockingQueue = blockingQueue;
        setName(name);
    }

    @Override
    public void run() {
        while (true){
            try {
                sleep(1000); //生産需要時間1000s
                Message e = new Message((int)(Math.random()*100),"屎");
                blockingQueue.put(e);
                System.out.println(getName()+"拉了"+e);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
           

繼續閱讀