通常情況下,生產者-消費者模式有 5 種實作方式:
- synchronized + wait() + notify() 方式
- 可重入鎖ReentrantLock (配合Condition)
- BlockingQueue 阻塞隊列方式
- 信號量Semaphore 方式
- 管道輸入輸出流PipedInputStream和PipedOutputStream 方式
下面的代碼採用第二種方式(可重入鎖 ReentrantLock 配合 Condition):
package interview.producer_consumer;
import lombok.SneakyThrows;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Demo entry point: wires two producers and two consumers to one shared
 * {@link MyDeque} with capacity 10 and runs each on its own thread.
 */
public class Test {
    public static void main(String[] args) {
        MyDeque sharedQueue = new MyDeque(10);

        // Order matters: threads are created and started in exactly this sequence
        // (producer, consumer, producer, consumer).
        Runnable[] workers = {
            new Producer_(sharedQueue),
            new Consumer_(sharedQueue),
            new Producer_(sharedQueue),
            new Consumer_(sharedQueue),
        };

        for (Runnable worker : workers) {
            new Thread(worker).start();
        }
    }
}
/**
 * Bounded FIFO buffer of integers shared between producer and consumer threads.
 *
 * <p>All queue access is guarded by one {@link ReentrantLock}. Producers wait on
 * {@code producer_condition} while the queue is full; consumers wait on
 * {@code consumer_condition} while it is empty.
 */
class MyDeque {
    /** Maximum number of elements the queue may hold. */
    int max_size;
    /** Backing storage; elements are appended at the tail and removed from the head. */
    LinkedList<Integer> linkedList = new LinkedList<Integer>();
    /** Lock guarding {@code linkedList} and {@code data}. */
    ReentrantLock reentrantLock = new ReentrantLock();
    /** Producer condition: producers block here while the queue is full. */
    Condition producer_condition = reentrantLock.newCondition();
    /** Consumer condition: consumers block here while the queue is empty. */
    Condition consumer_condition = reentrantLock.newCondition();
    /** Next value to enqueue; incremented after each put, only while the lock is held. */
    volatile int data = 1;

    /**
     * Creates an empty queue.
     *
     * @param max_size maximum number of elements the queue may hold
     */
    public MyDeque(int max_size) {
        this.max_size = max_size;
    }

    /**
     * Appends the next sequence value, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is added, the
     * thread's interrupt flag is restored, and the method returns.
     */
    public void put() {
        reentrantLock.lock();
        try {
            // Re-check in a loop: signalAll() wakes every waiting producer and await()
            // may return spuriously, so the queue can be full again once we hold the lock.
            while (linkedList.size() >= this.max_size) {
                System.out.println(" 隊列已滿" + Thread.currentThread().getName() + "無法添加資料");
                producer_condition.await();
            }
            System.out.print(Thread.currentThread().getName() + "生産者添加資料" + data + "\t");
            linkedList.add(this.data);
            data++;
            System.out.println("隊列資料" + linkedList);
            // The queue is now non-empty: wake any waiting consumers.
            consumer_condition.signalAll();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            reentrantLock.unlock();
        }
    }

    /**
     * Removes and prints the head element, blocking while the queue is empty.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is removed, the
     * thread's interrupt flag is restored, and the method returns.
     */
    public void take() {
        reentrantLock.lock();
        try {
            // Loop for the same reason as in put(): another consumer may have emptied
            // the queue between the signal and this thread re-acquiring the lock.
            while (linkedList.isEmpty()) {
                System.out.println("隊列中無資料, 消費者等待");
                consumer_condition.await();
            }
            Integer integer = linkedList.removeFirst();
            System.out.println(Thread.currentThread().getName() + "消費者 消費資料" + integer);
            // A slot was just freed: wake producers that are blocked on a full queue.
            producer_condition.signalAll();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            reentrantLock.unlock();
        }
    }
}
/**
 * Runnable that keeps adding items to a shared {@link MyDeque}, pausing after
 * each addition.
 */
class Producer_ implements Runnable {
    /** Pause after each {@code put()} call, in milliseconds. */
    private static final long PAUSE_MILLIS = 1000L;

    MyDeque myDeque;

    /**
     * @param myDeque the shared queue this producer writes to
     */
    public Producer_(MyDeque myDeque) {
        this.myDeque = myDeque;
    }

    /**
     * Loops forever: one {@code put()} followed by a sleep.
     * {@code @SneakyThrows} covers the checked {@code InterruptedException}
     * declared by {@code Thread.sleep}.
     */
    @SneakyThrows
    @Override
    public void run() {
        for (;;) {
            myDeque.put();
            Thread.sleep(PAUSE_MILLIS);
        }
    }
}
/**
 * Runnable that keeps taking items from a shared {@link MyDeque}, pausing after
 * each attempt.
 */
class Consumer_ implements Runnable {
    /** Pause after each {@code take()} call, in milliseconds. */
    private static final long PAUSE_MILLIS = 1500L;

    MyDeque myDeque;

    /**
     * @param myDeque the shared queue this consumer reads from
     */
    public Consumer_(MyDeque myDeque) {
        this.myDeque = myDeque;
    }

    /**
     * Loops forever: one {@code take()} followed by a sleep.
     * {@code @SneakyThrows} covers the checked {@code InterruptedException}
     * declared by {@code Thread.sleep}.
     */
    @SneakyThrows
    @Override
    public void run() {
        for (;;) {
            myDeque.take();
            Thread.sleep(PAUSE_MILLIS);
        }
    }
}