天天看點

用阻塞隊列LinkedBlockingQueue實作生産者消費者先進先出

LinkedBlockingQueue是一個基于已連結節點的、範圍任意的blocking queue的實作。 由于LinkedBlockingQueue實作是線程安全的,實作了先進先出等特性,是作為生産者消費者的首選,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,預設最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。

    此隊列按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。 

    新元素插入到隊列的尾部,并且隊列檢索操作會獲得位于隊列頭部的元素。連結隊列的吞吐量通常要高于基于數組的隊列, 

    但是在大多數并發應用程式中,其可預知的性能要低。 

    可選的容量範圍構造方法參數作為防止隊列過度擴充的一種方法。 

    如果未指定容量,則它等于 Integer.MAX_VALUE。除非插入節點會使隊列超出容量,否則每次插入後會動态地建立連結節點。

    1:如果未指定容量,預設容量為Integer.MAX_VALUE ,容量範圍可以在構造方法參數中指定作為防止隊列過度擴充。

    2:此對象是 線程阻塞-安全的 

   3:不接受 null 元素 

    4:它實作了BlockingQueue接口。

    5:實作了 Collection 和 Iterator 接口的所有可選 方法。

    6:在JDK5/6中,LinkedBlockingQueue和ArrayBlocingQueue等對象的poll(long timeout, TimeUnit unit)存在記憶體洩露Leak的對象AbstractQueuedSynchronizer.Node,據稱JDK5會在Update12裡Fix,JDK6會在Update2裡Fix

示例代碼如下:

package com.jh.sms.test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TestBlockingQueue {
	static BlockingQueue <Hamburger> queue=new LinkedBlockingQueue<Hamburger>(10);	  
    public static void main(String[] args) throws InterruptedException {  
        Producer t1 = new Producer();  
        Consumer t2 = new Consumer();  
        t1.start();  
        t2.start();   
        System.out.println("Thread.sleep(1000)");  
        Thread.sleep(1000);  
        t2.interrupt();  
    }  
}  
class Hamburger{  
    int id;  
    public Hamburger(int id) {  
        this.id=id;  
    }  
    @Override  
    public String toString() {  
        return "Hamburger: "+id;  
    }  
}  
class Producer extends Thread{  
    @Override  
    public void run() {  
        int i=0;  
        while(i<10){  
        	Hamburger e = new Hamburger(i);  
            try {  
                System.out.println("Produce Hamburger: "+i);  
                TestBlockingQueue.queue.put(e);  
            } catch (InterruptedException e1) {  
                System.out.println("Hamburger so many, it was closed.");  
                return;  
            }  
            i++;  
        }  
    }  
} 
class Consumer extends Thread{  
    @Override  
    public void run() {  
        while(true){  
            try {  
                System.out.println("Eat Hamburger: "+TestBlockingQueue.queue.take());   
            } catch (InterruptedException e1) {  
                System.out.println("Hamburger so less, It was stopped.");  
                return;  
            }  
        }    
    }  
}  
           

由于阻塞隊列LinkedBlockingQueue,FIFO,使用它的put(),take()會判斷目前隊列是有值,即等待生産再消費,即便是兩個線程并行執行,很簡單友善的解決了生産者消費者問題,但是在這裡需要注意的是在兩個run()方法中,列印目前生産或消費Hamburger 的時候,最好把put()和take()方法放在相應的列印語句中一起執行,否則會發生先消費後生産的後果。 因為列印語句和方法的執行時兩段代碼,由于雙線程同時執行,無法保證執行的相應代碼塊的順序性!!由于最後互相等待會造成死鎖,是以在主線程睡眠1秒後打斷消費者,讓它别等了,抛異常後return結束消費線程,最後整個main方法調用結束。

ConcurrentLinkedQueue

ConcurrentLinkedQueue是Queue的一個安全實作.Queue中元素按FIFO原則進行排序.采用CAS操作,來保證元素的一緻性。

LinkedBlockingQueue是一個線程安全的阻塞隊列,它實作了BlockingQueue接口,BlockingQueue接口繼承自java.util.Queue接口,并在這個接口的基礎上增加了take和put方法,這兩個方法正是隊列操作的阻塞版本。

package com.jh.sms.test;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentLinkedQueueTest {
	private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
    private static int count = 2; // 線程個數
    //CountDownLatch,一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
    private static CountDownLatch latch = new CountDownLatch(count);

    public static void main(String[] args) throws InterruptedException {
        long timeStart = System.currentTimeMillis();
        ExecutorService es = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueueTest.offer();
        for (int i = 0; i < count; i++) {
            es.submit(new Poll());
        }
        latch.await(); //使得主線程(main)阻塞直到latch.countDown()為零才繼續執行
        System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
        es.shutdown();
    }
    
    /**
     * 生産
     */
    public static void offer() {
        for (int i = 0; i < 100000; i++) {
            queue.offer(i);
        }
    }

    /**
     * 消費
     */
    static class Poll implements Runnable {
        public void run() {
            while (queue.size()>0) {
           // while (!queue.isEmpty()) {
                System.out.println(queue.poll());
            }
            latch.countDown();
        }
    }
}
           

運作結果:

costtime 1415ms

改用while (queue.size()>0)後

運作結果:

cost time 38214ms

結果居然相差那麼大,看了下ConcurrentLinkedQueue的API原來.size()是要周遊一遍集合的,難怪那麼慢,是以盡量要避免用size而改用isEmpty().

總結了下, 在缺乏性能測試下,對自己的程式設計要求更加要嚴格,特别是在生産環境下更是要小心謹慎。