一.概述
LinkedBlockingQueue是單向連結清單實作的可選的有界的阻塞隊列
1. 隊列元素是先進先出FIFO (first-in-first-out)
2. 隊列的頭元素head是在隊列中時間最長的元素,因為最先入隊列的,擷取操作(poll、peek、take)傳回頭元素head
3. 隊列的尾元素last是在隊列中時間最短的元素,因為最後入隊列的,新元素插入(offer、put)隊列的尾部
4. 可選容量範圍capacity的構造方法是防止隊列過度擴充的一種方法,如果沒有指定最大容量capacity,那麼最預設的最大容量為Integer.MAX_VALUE
5. 連結隊列的節點都是動态建立的,出隊列的節點可以被GC所回收,是以其具有靈活的伸縮性
6. LinkedBlockingQueue可以同時進行入隊和出隊列,它的入隊列和出隊列使用的是兩個不同的lock對象,是以無論是在入隊列還是出隊列,都會涉及對元素數量count的并發修改,是以這裡使用了一個原子操作類來解決對同一個變量進行并發修改的線程安全問題
二.LinkedBlockingQueue對象結構
三.節點元素
所有的元素都通過Node這個靜态内部類來進行存儲,這與LinkedList的處理方式完全一樣
元素節點Node對象的next是下面3種之一:
6. 指向下一個元素
7. 指向目前元素, 即是頭元素head的下一個元素head.next
8. null, 意味着是最後的元素
static class Node<E> {
E item;
Node<E> next;
Node(E x) {
item = x;
}
}
四.建立對象執行個體
- 可以在建立時手動指定最大容量capacity,如果沒有指定最大容量,那麼最預設的最大容量為Integer.MAX_VALUE.
- 變量capacity被final修飾,表示初始化後不可變
- 隊列頭節點head有一個不變性:頭節點item始終為null -head.item=null
- 隊列尾節點last也有一個不變性:尾節點next始終為null -last.next=null
// 阻塞隊列所能存儲的最大容量
private final int capacity;
/**
* 連結清單隊列的頭節點
* 連結清單的頭部元素item始終為null:head.item = null
* 建立執行個體時:last = head = new Node<E>(null);
*/
transient Node<E> head;
/**
* 連結清單隊列的尾節點
* 連結清單尾元素下一個元素next始終為null:tail.next = null
* 建立執行個體時:last = head = new Node<E>(null);
*/
private transient Node<E> last;
/**
* 建立一個容量為 Integer.MAX_VALUE 的 LinkedBlockingQueue
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 建立一個具有給定(固定)容量的 LinkedBlockingQueue
*
* @param capacity 隊列容量大小
* @throws IllegalArgumentException 如果隊列容量小于0抛出異常
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= ) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/**
* 建立一個容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue
* 最初包含給定 collection 的元素,元素按該 collection 疊代器的周遊順序添加。
*
* @param c 包含初始元素所屬的 collection
* @throws NullPointerException 如果指定 collection 或其所有元素均為 null抛出異常
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
// 擷取鎖
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = ;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
建立隊列執行個體
// 最大容量10
LinkedBlockingQueue<Integer> linkedQueue1 = new LinkedBlockingQueue<>();
// 最大容量Integer.MAX_VALUE
LinkedBlockingQueue<Integer> linkedQueue2 = new LinkedBlockingQueue<>();
// 最大容量Integer.MAX_VALUE,用集合list初始化
List<Integer> list = new ArrayList<>();
LinkedBlockingQueue linkedQueue3 = new LinkedBlockingQueue(list);
五.入隊列
- 在入隊列和出隊列時使用的不是同一個Lock,入隊列使用ReentrantLock putLock
- 入隊列使用Condition notFull對象螢幕-當隊列的元素已經達到最大容量capactiy,通過該notFull讓入隊列的線程處于等待狀态
- 目前阻塞隊列中的元素數量AtomicInteger count,入隊列和出隊列使用的是兩個不同的lock對象,但無論是在入隊列還是出隊列,都會涉及對元素數量count的并發修改,是以這裡使用了一個原子操作類來解決對同一個變量進行并發修改的線程安全問題
- 入隊列操作之前需要先擷取putLock鎖,執行完釋放鎖putLock.unlock();
- 當隊列達到最大容量count.get() = capacity,notFull對象螢幕notFull.await()等待,即生産線程停止生産,等待隊列未滿
5.1 定義變量
/**
* 目前阻塞隊列中的元素數量
* PS:如果看過ArrayBlockingQueue的源碼,會發現ArrayBlockingQueue底層儲存元素數量使用的是一個普通的int類型變量。
* 其原因是在ArrayBlockingQueue底層對于元素的入隊列和出隊列使用的是同一個lock對象。而數量的修改都是在處于線程擷取鎖的情況下進行操作, 是以不會有線程安全問題。
* 而LinkedBlockingQueue卻不是,它的入隊列和出隊列使用的是兩個不同的lock對象,是以無論是在入隊列還是出隊列,都會涉及對元素數量count的并發修改,是以這裡使用了一個原子操作類來解決對同一個變量進行并發修改的線程安全問題。*/
private final AtomicInteger count = new AtomicInteger();
/**
* 1. 元素入隊列時線程所擷取的鎖
* 2. 當執行add、put、offer等操作時線程需要擷取鎖
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
* 當隊列的元素已經達到capactiy,通過該Condition讓元素入隊列的線程處于等待狀态
*/
private final Condition notFull = putLock.newCondition();
5.2 入隊列 - enqueue
/**
* 在隊列的尾部插入node
* @param node the node
*/
private void enqueue(Node<E> node) {
last = last.next = node;
}
上面的代碼其實沒什麼花樣,指派操作符“=”是從右往左的,是以上面代碼等價于:
last.next = node
last=node
5.2.1 空隊列
建立隊列或出隊列為空時,頭結點head和尾節點last都指向同一節點
last = head = new Node<E>(null);
5.2.2 入隊列1
last = last.next = node1
5.2.3 入隊列2
last = last.next = node2
5.3 入隊列方法
5.3.1 add(E e)
- 如果隊列未滿,立即執行在隊列的尾部插入指定的元素e,成功傳回true
- 如果隊列已滿,抛出異常IllegalStateException
/**
* add(e)測試:
* @throws NullPointerException 待插入元素為null
* @throws IllegalStateException 如果隊列已滿時執行插入
*/
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
5.3.2 offer(E e)
- offer方法是非阻塞的, 當隊列已經滿了,它不會繼續等待,而是直接傳回
- 當入隊列擷取到鎖時,需要進行二次的檢查,因為可能當隊列的大小為capacity-1時,兩個線程同時去搶占鎖,而隻有一個線程搶占成功,那麼此時當線程将元素入隊列後,釋放鎖,後面的線程搶占鎖之後,此時隊列大小已經達到capacity,是以将它無法讓元素入隊列
/**
* offer(e)測試:
* 1. 如果隊列未滿,立即執行在隊列的尾部插入指定的元素e,成功傳回true
* 2. 如果隊列已滿,傳回false
*
* offer(e)通常要優于add(e),因為add(e)在隊列滿了時會抛出IllegalStateException異常
*
* @throws NullPointerException 待插入元素為null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {// 二次檢查
enqueue(node);
c = count.getAndIncrement();
if (c + < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == )
signalNotEmpty();
return c >= ;
}
5.3.3 offer(E e, long timeout, TimeUnit unit)
- 一個限時等待插入操作,即在等待一定的時間内,如果隊列有空間可以插入,那麼就将元素入隊列,然後傳回true,如果在過完指定的時間後依舊沒有空間可以插入,那麼就傳回false
- 通過timeout和TimeUnit來指定等待的時長,timeout為時間的長度,TimeUnit為時間的機關
/**
* offer(E e, long timeout, TimeUnit unit)測試:
* 1. 如果隊列未滿,立即執行在隊列的尾部插入指定的元素e,成功傳回true
* 2. 如果隊列已滿,等待指定的時間timeout(機關為unit)以使空間變為可用。
*
* @throws NullPointerException 待插入元素為null
* @throws InterruptedException 在等待時被中斷
* */
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= )
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == )
signalNotEmpty();
return true;
}
5.3.4 入隊列 - put
put為阻塞方法,直到隊列有空餘時,才能為隊列加入新元素
/**
* put(e)測試:
* 1. 如果隊列未滿,立即執行在隊列的尾部插入指定的元素e
* 2. 如果隊列已滿,等待隊列空間可用後喚醒再插入
*
* @throws NullPointerException 待插入元素為null
* @throws InterruptedException 如果在等待時被中斷
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == )
signalNotEmpty();
}
六.出隊列
- 在入隊列和出隊列時使用的不是同一個Lock,出隊列使用ReentrantLock takeLock
- 出隊列使用Condition notEmpty對象螢幕-當隊列的為空時,通過該notEmpty讓出隊列的線程處于等待狀态
- 目前阻塞隊列中的元素數量AtomicInteger count,入隊列和出隊列使用的是兩個不同的lock對象,但無論是在入隊列還是出隊列,都會涉及對元素數量count的并發修改,是以這裡使用了一個原子操作類來解決對同一個變量進行并發修改的線程安全問題
- 出隊列操作之前需要先擷取takeLock鎖,執行完釋放鎖takeLock.unlock();
- 當隊列為空count.get() = 0,notEmpty對象螢幕notEmpty.await()等待,即消費線程停止消費,等待隊列不為空
6.1 定義變量
private final AtomicInteger count = new AtomicInteger();
/**
* 元素出隊列時線程所擷取的鎖
* 當執行take、poll等操作時線程需要擷取的鎖
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
* 當隊列為空時,通過該Condition讓從隊列中擷取元素的線程處于等待狀态
*/
private final Condition notEmpty = takeLock.newCondition();
6.2 出隊列流程
/**
* 從隊列的頭部删除節點node
* 頭部元素出隊列的過程,其最終的目的是讓原來的head被GC回收,讓其next成為head,并且新的head的item為null.
* 因為LinkedBlockingQueue的頭部具有一緻性:即元素為null。
*
* @return the node
*/
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
6.2.1 滿隊列
6.2.2 出隊列1
6.2.3 出隊列2
6.2.4 出隊列節點GC
/**
* 從隊列的頭部删除節點node
* 頭部元素出隊列的過程,其最終的目的是讓原來的head被GC回收,讓其next成為head,并且新的head的item為null.
* 因為LinkedBlockingQueue的頭部具有一緻性:即元素為null
*/
6.3 出隊列方法
6.3.1 poll()
- 擷取并移除此隊列的頭,如果此隊列為空,則傳回 null
- poll()為非阻塞方法
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == )
return null;
E x = null;
int c = -;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > ) {
x = dequeue();
c = count.getAndDecrement();
if (c > )
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
6.3.2 poll(timeout, unit)
- 擷取并移除此隊列的頭部,在指定的等待時間前等待可用的元素
- pool()為非阻塞方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == ) {
if (nanos <= )
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > )
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
6.3.3 peek()
- 擷取但不移除此隊列的頭;如果此隊列為空,則傳回 null
- peek()為非阻塞方法
public E peek() {
if (count.get() == )
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
6.3.4 take()
- 擷取并移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)
- take()為阻塞方法
public E take() throws InterruptedException {
E x;
int c = -;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == ) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > )
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
七.疊代器
LinkedBlockingQueue的疊代器,是多線程安全的,在擷取元素之前,會對讀鎖和寫鎖同時加鎖,同時,為了防止死鎖,讀鎖和寫鎖的加解鎖順序,也是經過設計的:
void fullyLock() {
putLock.lock();// 先加寫鎖
takeLock.lock();// 再加讀鎖
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
LinkedBlockingQueue的疊代器中,儲存了以下内容:
private Node<E> current; // 指向下一個節點
private Node<E> lastRet; // 記錄目前節點
private E currentElement; // 目前節點内容
首先,儲存了目前需要傳回的内容,可以保證在目前節點移除的情況下,疊代器的next()方法,也能傳回目前指向的内容,即使先調用hasNext()方法,其他線程删除了目前對象,那麼next()方法也可以保證傳回正确對象
其次,如果在疊代器中,調用remove()方法,删除了目前對象,那麼 lastRet方法就用上了,可以通過再次周遊清單,找到需要删除的對象,并将其删除,同時為了防止remove()方法被調用兩次,在删除時,會将 lastRet設定為null,如果隻有這一個指針,那麼remove()之後,這個疊代器就啥也幹不了了
最後,current儲存了疊代器的下一個指向的位置,調用hasNext()時,可以立即直到是否還有空餘對象,更重要的是,如果在疊代器建立後,其他線程多次調用了出隊的方法,可能導緻 lastRet和current都變成懸挂的指針了,這時,隻要判斷current的next是否為自己,就可以知道自己是否已經被出隊,是否需要重定向current的位置
八.LinkedBlockingQueue與ArrayBlockingQueue的比較
- ArrayBlockingQueue由于其底層基于數組,并且在建立時指定存儲的大小,在完成後就會立即在記憶體配置設定固定大小容量的數組元素,是以其存儲通常有限,故其是一個“有界“的阻塞隊列
- LinkedBlockingQueue可以由使用者指定最大存儲容量,也可以無需指定,如果不指定則最大存儲容量将是Integer.MAX_VALUE,即可以看作是一個“無界”的阻塞隊列,由于其節點的建立都是動态建立,并且在節點出隊列後可以被GC所回收,是以其具有靈活的伸縮性
- 由于ArrayBlockingQueue的有界性,是以其能夠更好的對于性能進行預測,LinkedBlockingQueue由于沒有限制大小,當任務非常多的時候,不停地向隊列中存儲,就有可能導緻記憶體溢出的情況發生
- ArrayBlockingQueue中在入隊列和出隊列操作過程中,使用的是同一個lock,是以即使在多核CPU的情況下,其讀取和操作的都無法做到并行
- LinkedBlockingQueue的讀取和插入操作所使用的鎖是兩個不同的lock,它們之間的操作互相不受幹擾,是以兩種操作可以并行完成,故LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue
九.總結
JDK中選用LinkedBlockingQueue作為阻塞隊列的原因:
就在于其無界性。因為線程大小固定的線程池,其線程的數量是不具備伸縮性的,當任務非常繁忙的時候,就勢必會導緻所有的線程都處于工作狀态
1. 使用一個有界的阻塞隊列來進行處理,那麼就非常有可能很快導緻隊列滿的情況發生,進而導緻任務無法送出而抛出RejectedExecutionException
2. 使用一個無界隊列由于其良好的存儲容量的伸縮性,可以很好的去緩沖任務繁忙情況下場景,即使任務非常多,也可以進行動态擴容,當任務被處理完成之後,隊列中的節點也會被随之被GC回收,非常靈活
十.參考
LinkedBlockingQueue源碼分析
https://zhidao.baidu.com/question/393766722974951645.html