天天看點

java-并發集合-阻塞隊列 LinkedBlockingQueue 示範

package me.grass.demo.concuronte;


import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;


/**
 * 阻塞隊列 java.util.concurrent.LinkedBlockingQueue 使用
 * 阻塞隊列提供兩組入隊、出隊方法,分别為:
 * 無阻塞:add();poll();
 * 阻塞:put();take();
 */
public class LinkedBlockingQueueDemo {
  /**
   * 生産慢,消費快,導緻隊裡阻塞
   * @param args
   * @throws InterruptedException
   *  @author xxj  
   */
  public static void main(String[] args) throws InterruptedException {
    /*測試邏輯:
     * 1.阻塞隊列固定大小50,100個線程寫入将産生寫入阻塞
     * 2.消費者比生産者快,消費時會産生讀取阻塞
     * */
    Date before=new Date();
    int pTotalThread =100; //最大線程數(生産者)
    int pActivities=5; //最大線程數(生産者)
    int cTotalThread =100; //活動線程數(消費者)
    int cActivities=5; //活動線程數(消費者)
    _lCountDownLatch = new CountDownLatch(pTotalThread+cTotalThread);


    startProducer(pActivities,pTotalThread);
    startConsumer(cActivities,cTotalThread);
    
    _lCountDownLatch.await();//等待所有線程完成
    Date after = new Date();


    System.out.println("隊列為空:"+_blockingQueue.isEmpty());
    System.out.println("耗時:"+((after.getTime()-before.getTime())/1000));
    System.out.println("同步隊列:"+_lCountDownLatch.getCount());
  }
  static java.util.concurrent.CountDownLatch _lCountDownLatch;
  /**
   * 阻塞隊列,隊列固定大小50,100個線程寫入将産生寫入阻塞
   */
  static java.util.concurrent.LinkedBlockingQueue<Integer> _blockingQueue = 
      new java.util.concurrent.LinkedBlockingQueue<Integer>(50);
      
  static void startProducer(int active,int totalThread) throws InterruptedException{
    java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(active);
    int size =1024*1024*10;//産生 3 M 資料
    Thread thread ;
    for(int i=0;i<totalThread;i++){
      thread = new Thread(new producer(i,size));
      pool.execute(thread);
    }
  }
  static void startConsumer(int active,int totalThread) throws InterruptedException{
    java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(active);    
    Thread thread ;
    //啟動x個消費者
    for(int i=0;i<totalThread;i++){
      thread = new Thread(new consumer());
      pool.execute(thread);
    }
  }
  /**
   * 生産者
   * @author xxj 
   *
   */
  static class producer implements Runnable{
    public producer(int key,int size){
      _size=size;
      _key=key;
    }
    int _key;
    int _size;
    public void run() {
      try {
        //阻塞隊列,使用 put 加入隊列,如果隊列滿了将阻塞
        LinkedBlockingQueueDemo._blockingQueue.put(_key);//生産
        System.out.println("已建立:"+_key);
        Thread.sleep(1000);//讓生産慢點兒
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      LinkedBlockingQueueDemo._lCountDownLatch.countDown();//線程同步遞減
    }
  }
  /**
   * 消費者
   * @author xxj  
   *
   */
  static class consumer implements Runnable{
    public consumer(){
    }
    public void run() {
      //循環消費,直到隊列内容為空
      try {
        //阻塞隊列使用 take() 産生阻塞 ,并發隊列使用 poll()
        Integer nInteger = LinkedBlockingQueueDemo._blockingQueue.take();//消費         
        System.err.println("消費:"+nInteger);
        Thread.sleep(100);//每次消費等一會兒
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      LinkedBlockingQueueDemo._lCountDownLatch.countDown();//線程同步遞減
    }
  }
}