LinkedBlockingQueue是一個基于已連結節點的、範圍任意的blocking queue的實作。 由于LinkedBlockingQueue實作是線程安全的,實作了先進先出等特性,是作為生産者消費者的首選,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,預設最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
此隊列按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。
新元素插入到隊列的尾部,并且隊列檢索操作會獲得位于隊列頭部的元素。連結隊列的吞吐量通常要高于基于數組的隊列,
但是在大多數并發應用程式中,其可預知的性能要低。
可選的容量範圍構造方法參數作為防止隊列過度擴充的一種方法。
如果未指定容量,則它等于 Integer.MAX_VALUE。除非插入節點會使隊列超出容量,否則每次插入後會動态地建立連結節點。
1:如果未指定容量,預設容量為Integer.MAX_VALUE ,容量範圍可以在構造方法參數中指定作為防止隊列過度擴充。
2:此對象是 線程阻塞-安全的
3:不接受 null 元素
4:它實作了BlockingQueue接口。
5:實作了 Collection 和 Iterator 接口的所有可選 方法。
6:在JDK5/6中,LinkedBlockingQueue和ArrayBlocingQueue等對象的poll(long timeout, TimeUnit unit)存在記憶體洩露Leak的對象AbstractQueuedSynchronizer.Node,據稱JDK5會在Update12裡Fix,JDK6會在Update2裡Fix
示例代碼如下:
package com.jh.sms.test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestBlockingQueue {
static BlockingQueue <Hamburger> queue=new LinkedBlockingQueue<Hamburger>(10);
public static void main(String[] args) throws InterruptedException {
Producer t1 = new Producer();
Consumer t2 = new Consumer();
t1.start();
t2.start();
System.out.println("Thread.sleep(1000)");
Thread.sleep(1000);
t2.interrupt();
}
}
class Hamburger{
int id;
public Hamburger(int id) {
this.id=id;
}
@Override
public String toString() {
return "Hamburger: "+id;
}
}
class Producer extends Thread{
@Override
public void run() {
int i=0;
while(i<10){
Hamburger e = new Hamburger(i);
try {
System.out.println("Produce Hamburger: "+i);
TestBlockingQueue.queue.put(e);
} catch (InterruptedException e1) {
System.out.println("Hamburger so many, it was closed.");
return;
}
i++;
}
}
}
class Consumer extends Thread{
@Override
public void run() {
while(true){
try {
System.out.println("Eat Hamburger: "+TestBlockingQueue.queue.take());
} catch (InterruptedException e1) {
System.out.println("Hamburger so less, It was stopped.");
return;
}
}
}
}
由于阻塞隊列LinkedBlockingQueue,FIFO,使用它的put(),take()會判斷目前隊列是有值,即等待生産再消費,即便是兩個線程并行執行,很簡單友善的解決了生産者消費者問題,但是在這裡需要注意的是在兩個run()方法中,列印目前生産或消費Hamburger 的時候,最好把put()和take()方法放在相應的列印語句中一起執行,否則會發生先消費後生産的後果。 因為列印語句和方法的執行時兩段代碼,由于雙線程同時執行,無法保證執行的相應代碼塊的順序性!!由于最後互相等待會造成死鎖,是以在主線程睡眠1秒後打斷消費者,讓它别等了,抛異常後return結束消費線程,最後整個main方法調用結束。
ConcurrentLinkedQueue
ConcurrentLinkedQueue是Queue的一個安全實作.Queue中元素按FIFO原則進行排序.采用CAS操作,來保證元素的一緻性。
LinkedBlockingQueue是一個線程安全的阻塞隊列,它實作了BlockingQueue接口,BlockingQueue接口繼承自java.util.Queue接口,并在這個接口的基礎上增加了take和put方法,這兩個方法正是隊列操作的阻塞版本。
package com.jh.sms.test;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
private static int count = 2; // 線程個數
//CountDownLatch,一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
private static CountDownLatch latch = new CountDownLatch(count);
public static void main(String[] args) throws InterruptedException {
long timeStart = System.currentTimeMillis();
ExecutorService es = Executors.newFixedThreadPool(4);
ConcurrentLinkedQueueTest.offer();
for (int i = 0; i < count; i++) {
es.submit(new Poll());
}
latch.await(); //使得主線程(main)阻塞直到latch.countDown()為零才繼續執行
System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
es.shutdown();
}
/**
* 生産
*/
public static void offer() {
for (int i = 0; i < 100000; i++) {
queue.offer(i);
}
}
/**
* 消費
*/
static class Poll implements Runnable {
public void run() {
while (queue.size()>0) {
// while (!queue.isEmpty()) {
System.out.println(queue.poll());
}
latch.countDown();
}
}
}
運作結果:
costtime 1415ms
改用while (queue.size()>0)後
運作結果:
cost time 38214ms
結果居然相差那麼大,看了下ConcurrentLinkedQueue的API原來.size()是要周遊一遍集合的,難怪那麼慢,是以盡量要避免用size而改用isEmpty().
總結了下, 在缺乏性能測試下,對自己的程式設計要求更加要嚴格,特别是在生産環境下更是要小心謹慎。