《Java源碼分析》:LinkedBlockingQueue
LinkedBlockingQueue是大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。
1、LinkedBlockingQueue的繼承體系結構
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
LinkedBlockingQueue的繼承體系結構如下,和ArrayBlockingQueue一模一樣。
2、LinkedBlockingQueue的相關屬性介紹
1、連結清單節點
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
2、private final int capacity;容量邊界,如果沒有指定,則為Integer.MAX_VALUE
3、private final AtomicInteger count = new AtomicInteger(); 目前隊列中存儲元素的數量
4、transient Node head;隊列的頭結點,且始終head.item=null,即此頭結點不存儲任何元素。
5、private transient Node last;隊列的尾結點,且始終last.next=null.
6、private final ReentrantLock takeLock = new ReentrantLock();take操作時所需要加的鎖
7、當take操作時如果隊列中存儲的元素為空,則調用此Condition的await方法。
private final Condition notEmpty = takeLock.newCondition();
8、put操作時所需要加的鎖和Condition
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
從上面可以看到,LinkedBlockingQueue阻塞隊列的實作采用了兩把鎖,一個是take鎖,一個是put鎖,這就說明,put和take操作是可以同時進行的。
3、LinkedBlockingQueue的構造方法
創造一個容量為Integer.MAX_VALUE的隊列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
創造一個指定大小(必須大于零)的隊列,如果參數小于等于零,則抛異常。
public LinkedBlockingQueue(int capacity) {
if (capacity <= ) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//頭結點和尾結點中存儲的元素為null
}
4、LinkedBlockingQueue的put方法介紹
函數功能:插入一個元素到隊列的末尾,如果隊列已滿則等待直到有空間進行存儲
源碼如下:(有詳細的注釋)
public void put(E e) throws InterruptedException {
//檢查是否為null,如果為null,則抛異常
if (e == null) throw new NullPointerException();
//注意:用此局部變量持有一個負數來訓示CAS操作是否操作成功。 c = count.getAndIncrement();//利用原子性加一
int c = -;
Node<E> node = new Node<E>(e);//根據目前的值構造一個節點
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//加鎖
try {
//如果已到達最大容量,則等待直到有空間來進行存儲才會被喚醒
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);//将節點加入到連結清單的末尾
c = count.getAndIncrement();//利用原子性加一
//如果目前存儲的元素個數小于容量,則喚醒正在等待的生産者的線程。
if (c + < capacity)
notFull.signal();
} finally {
putLock.unlock();//釋放鎖
}
//隊列不為空,剛剛添加了一個元素,是以需要喚醒一個消費者線程
if (c == )
signalNotEmpty();
}
/*
* 将節點加入到連結清單的末尾
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
//喚醒一個消費者線程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
put方法實作的思想也比較簡單,叙述如下:
1、首先檢查元素是否為null,如果是,則抛異常,如果不是,則繼續進行
2、加鎖
3、檢查隊列是否已滿,如果已滿,則調用put所對應的Condition的await方法進行等待,直到有signal來将其喚醒。如果沒有滿,則進行 4.
4、調用enqueue方法将此節點加入到隊列中,并将計數器進行加一操作。
本來到這裡就差不多結束了,但是還需要處理兩種喚醒操作。
5、如果此時隊列中存儲的元素小于最大容量,則喚醒其他正在等待生産的線程。
6、由于剛剛生産了元素,是以最後還需要喚醒消費者線程。
7、釋放鎖
put(e)方法的思想相當簡單哈,下面來看take方法
LinkedBlockingQueue的take方法介紹
函數功能:擷取隊列中隊首的元素。
源碼如下:(有詳細的注釋)
public E take() throws InterruptedException {
E x;
int c = -;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//加take鎖
try {
//如果目前隊列為空,則一直等待直到隊列非空
while (count.get() == ) {
notEmpty.await();
}
x = dequeue();//将頭結點取出來
c = count.getAndDecrement();//利用原子性進行進行減一操作
//如果此時的容量是大于1的,則說明還有其它元素可以被消費線程擷取,是以喚醒其他消費者線程
if (c > )
notEmpty.signal();
} finally {
takeLock.unlock();//釋放鎖
}
//如果c的容量是等于capacity,又被消費了一個,是以可以通知生産者線程來進行生産
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
//頭結點指向下一個節點,并将頭結點中的元素置為null。即頭結點的item始終為null。
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* 喚醒一個生産者線程
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
此方法實作的思想也特别簡單,叙述如下:
1、加鎖
2、判斷隊列中是否有元素,如果沒有,則阻塞等待直至隊列非空被喚醒。如果有,則進行 3
3、取出隊列中頭結點的下一個節點(頭結點為傀儡節點)。并将計數器進行減一操作。
4、如果此時的容量是大于1的,則說明還有其它元素可以被消費線程擷取,是以喚醒一個正在等待的消費者線程
5、如果此時的容量小于隊列的最大容量,則喚醒正在等待的生産者線程進行生産。
6、釋放鎖
以上就是關于LinkedBlockingQueue的相關分析,比較簡單哈。
小結
關于LinkedBlockingQueue我們隻需要記住以下幾點
1、是基于連結清單來實作的
2、使用了兩個鎖,take、put操作各一把鎖。以及借用了兩個Condition來進行阻塞和喚醒操作。