package me.grass.demo.concuronte;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
/**
* 阻塞隊列 java.util.concurrent.LinkedBlockingQueue 使用
* 阻塞隊列提供兩組入隊、出隊方法,分别為:
* 無阻塞:add();poll();
* 阻塞:put();take();
*/
public class LinkedBlockingQueueDemo {
/**
* 生産慢,消費快,導緻隊裡阻塞
* @param args
* @throws InterruptedException
* @author xxj
*/
public static void main(String[] args) throws InterruptedException {
/*測試邏輯:
* 1.阻塞隊列固定大小50,100個線程寫入将産生寫入阻塞
* 2.消費者比生産者快,消費時會産生讀取阻塞
* */
Date before=new Date();
int pTotalThread =100; //最大線程數(生産者)
int pActivities=5; //最大線程數(生産者)
int cTotalThread =100; //活動線程數(消費者)
int cActivities=5; //活動線程數(消費者)
_lCountDownLatch = new CountDownLatch(pTotalThread+cTotalThread);
startProducer(pActivities,pTotalThread);
startConsumer(cActivities,cTotalThread);
_lCountDownLatch.await();//等待所有線程完成
Date after = new Date();
System.out.println("隊列為空:"+_blockingQueue.isEmpty());
System.out.println("耗時:"+((after.getTime()-before.getTime())/1000));
System.out.println("同步隊列:"+_lCountDownLatch.getCount());
}
static java.util.concurrent.CountDownLatch _lCountDownLatch;
/**
* 阻塞隊列,隊列固定大小50,100個線程寫入将産生寫入阻塞
*/
static java.util.concurrent.LinkedBlockingQueue<Integer> _blockingQueue =
new java.util.concurrent.LinkedBlockingQueue<Integer>(50);
static void startProducer(int active,int totalThread) throws InterruptedException{
java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(active);
int size =1024*1024*10;//産生 3 M 資料
Thread thread ;
for(int i=0;i<totalThread;i++){
thread = new Thread(new producer(i,size));
pool.execute(thread);
}
}
static void startConsumer(int active,int totalThread) throws InterruptedException{
java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(active);
Thread thread ;
//啟動x個消費者
for(int i=0;i<totalThread;i++){
thread = new Thread(new consumer());
pool.execute(thread);
}
}
/**
* 生産者
* @author xxj
*
*/
static class producer implements Runnable{
public producer(int key,int size){
_size=size;
_key=key;
}
int _key;
int _size;
public void run() {
try {
//阻塞隊列,使用 put 加入隊列,如果隊列滿了将阻塞
LinkedBlockingQueueDemo._blockingQueue.put(_key);//生産
System.out.println("已建立:"+_key);
Thread.sleep(1000);//讓生産慢點兒
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LinkedBlockingQueueDemo._lCountDownLatch.countDown();//線程同步遞減
}
}
/**
* 消費者
* @author xxj
*
*/
static class consumer implements Runnable{
public consumer(){
}
public void run() {
//循環消費,直到隊列内容為空
try {
//阻塞隊列使用 take() 産生阻塞 ,并發隊列使用 poll()
Integer nInteger = LinkedBlockingQueueDemo._blockingQueue.take();//消費
System.err.println("消費:"+nInteger);
Thread.sleep(100);//每次消費等一會兒
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LinkedBlockingQueueDemo._lCountDownLatch.countDown();//線程同步遞減
}
}
}