生産者消費者模型
-
如果沒有生産者消費者模型?
我們先來看如果沒有生産着消費者模型會怎麼樣, 假如生産者生産機關資料需要耗時
的時間;消費者消費資料需要10t
的時間, 那麼如果我們是生産者消費者一對一的話, 消費者消費的節奏嚴重依賴了生産者的節奏. 消費者将有1t
的時間處于等待狀态 . 這個時候就很僵硬了 , 明明消費者在剩餘的9t
時間裡可以去處理别的生産者産出的資料 , 那我們要怎麼來設計來達到這個效果呢?9t
-
生産者消費者模型大體結構
其實我們可以把生産者跟消費者解耦開來, 讓生産者去依賴一個隊列 , 生産者産出的消息放到隊列裡面,消費者再去隊列裡面取 . 這樣我們可以對生産者和消費者的數量比進行拓展, 想要多少個消費者對應多少個生産者都可以; 比如生産者産出相對消費者消費速度慢,我們可以相對減少消費者的數量; 這樣大大平衡了生産者群體和消費者群體的處理能力 .
-
舉個惡心點的例子
生産者線程的任務:拉屎; 消費者線程的任務:吃屎
生産者拉機關屎的時間是1000ms;消費者吃掉機關屎的時間是100ms;
如果消費者直接依賴生産者, 那麼消費者隻能等生産者把屎拉出來再吃掉,這個時候相當于消費者吃掉機關屎的時間是1100ms ; 如果消費者間接依賴生産者, 比如消費者依賴公共廁所, 生産者也依賴公共廁所 . 生産者把屎拉在廁所裡, 消費者去廁所裡面取. 這個時候我們可以增加生産者的個數. 比如 我們設定 5個生産者同時在拉屎, 然後消費者去廁所裡面吃掉這些屎. 這樣就能讓一個消費者去處理五個生産者産出的資料, 就業是上面所說的平衡處理能力. 也達到了耦合的作用.
- 生産者消費者模型
- 隊列的設計
- 資料結構
- 生産者跟消費者的設計
- 編碼
- MyBlockingQueue
- MyConsumer
- MyProducer
- 隊列的設計
隊列的設計
資料結構
public class MyBlockingQueue<E> {
MyBlockingQueue(int ca) {
this.ca = ca;
Entry<E> e = new Entry<>();
head = e;
tail = e;
}
}
1.采用單向連結清單來實作隊列,則至少具有出隊入隊兩個public的函數;
出隊(take)針對消費者,入隊(put)針對生産者;
private static class Entry<E> {
E datas;
Entry<E> next;
}
private Entry<E> head;
private Entry<E> tail;
public void put(E e) throws InterruptedException;
public E take() throws InterruptedException;
private Entry<E> head;
private Entry<E> tail
2.需要有個固定的容量來規定隊列的長度,這裡采用AtomicInteger
當隊列長度等于這個容量 讓生産者停止生産;
同理,當隊列長度等于0,讓消費者停止消費;
private AtomicInteger size = new AtomicInteger(0);
3.消費鎖 takeLock
串行化多個消費者線程在該隊列展現的消費動作
private final Lock takeLock = new ReentrantLock();
4.産出鎖 putLock
串行化多個消費者線程在該隊列展現的産出動作
private final Lock putLock = new ReentrantLock();
5.消費狀态 notEmpty
提醒消費者進行消費
這個狀态在消費者發現隊列已經空了的時候,會執行await方法進入對象的等待池
在生産者生産完一個數資料是, 會執行signal方法,通知進入對象等待池的消費者線程進入對象的鎖池進行搶鎖消費
private final Condition notEmpty = takeLock.newCondition();
6.産出狀态 notFull
提醒生産者進行産出
這個狀态在生産者發現隊列已經滿了的時候,會執行await方法進入對象的等待池
在消費者消費完一個資料時,會執行signal方法,通知進入對象等待池的生産者線程進入對象的鎖池進行搶鎖産出
private final Condition notFull = putLock.newCondition();
生産者跟消費者的設計
生産者消費者本身是個線程, 是以繼承Thread或者實作Runable接口就好了, 然後run方法隻需對隊列(構造函數傳入)進行不斷的put/take 操作就好了
編碼
MyBlockingQueue
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Jiangjiaze
* @version Id: MyBlockingQueue.java, v 0.1 2017/3/8 0:21 FancyKong Exp $$
*/
public class MyBlockingQueue<E> {
private final Lock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final Lock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private Entry<E> head;
private Entry<E> tail;
private AtomicInteger size = new AtomicInteger(0);
private int ca;
MyBlockingQueue(int ca) {
this.ca = ca;
Entry<E> e = new Entry<>();
head = e;
tail = e;
}
private static class Entry<E> {
E datas;
Entry<E> next;
}
public void put(E e) throws InterruptedException {
putLock.lock();
try {
while (size.intValue() == ca) {
System.out.println("廁所滿了");
notFull.await();
}//之是以要用while是因為可能我醒過來之後,廁所有可能又滿了.
tail.datas = e;
tail.next = new Entry<>();
tail = tail.next;
if(size.getAndIncrement() +1 <= ca){
notFull.signal();
}
} finally {
putLock.unlock();
}
//拉完要叫人來吃
signalConsumers();
}
private void signalConsumers() {
takeLock.lock();
try {
notEmpty.signal();
}finally {
takeLock.unlock();
}
}
public E take() throws InterruptedException {
takeLock.lock();
E toReturn;
try{
while(size.intValue()==0){
System.out.println("都吃完了");
notEmpty.await();//沒有元素,不能吃
}
toReturn = head.datas;
head = head.next;
if(size.getAndDecrement() -1 > 0){ //我吃完都還有剩下
notEmpty.signal();
}
}finally {
takeLock.unlock();
}
//吃完之後呢,我得順便通知一下生産者繼續拉
if(toReturn != null) signalProducer();
return toReturn;
}
private void signalProducer() {
this.putLock.lock();
try {
notFull.signal(); // 不空啦,快來拉
}finally {
this.putLock.unlock();
}
}
}
MyConsumer
import com.fancy.prodcons.Message;
/**
* @author Jiangjiaze
* @version Id: MyConsumer.java, v 0.1 2017/3/8 0:57 FancyKong Exp $$
*/
public class MyConsumer extends Thread{
public static void main(String[] args) {
//測試的用戶端
MyBlockingQueue<Message> queue = new MyBlockingQueue<>(10);
new MyConsumer("消費者1",queue).start();
new MyProducer("生産者1",queue).start();
new MyProducer("生産者2",queue).start();
new MyProducer("生産者3",queue).start();
/*new MyConsumer("消費者2",queue).start();
new MyConsumer("消費者3",queue).start();
new MyConsumer("消費者4",queue).start();
new MyConsumer("消費者5",queue).start();
new MyConsumer("消費者6",queue).start();
new MyConsumer("消費者7",queue).start();*/
}
private MyBlockingQueue<Message> queue;
MyConsumer(String name,MyBlockingQueue<Message> queue){
setName(name);
this.queue = queue;
}
@Override
public void run() {
while (true)
try {
sleep(100); //消費需要時間100ms
Message m = queue.take();
System.out.println(getName()+"吃掉了"+m);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
MyProducer
import com.fancy.prodcons.Message;
/**
* @author Jiangjiaze
* @version Id: Producer.java, v 0.1 2017/3/1 17:32 FancyKong Exp $$
*/
public class MyProducer extends Thread{
MyBlockingQueue<Message> blockingQueue;
public MyProducer(String name,MyBlockingQueue<Message> blockingQueue) {
this.blockingQueue = blockingQueue;
setName(name);
}
@Override
public void run() {
while (true){
try {
sleep(1000); //生産需要時間1000s
Message e = new Message((int)(Math.random()*100),"屎");
blockingQueue.put(e);
System.out.println(getName()+"拉了"+e);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}