原文:https://blog.csdn.net/javazejian/article/details/72772461
深入了解(1)Java注解類型(@Annotation)
深入了解(2)Java枚舉類型(enum)
深入了解(3)Java類加載器(ClassLoader)
深入了解(4)Java類型資訊(Class對象)與反射機制
深入了解(5)Java記憶體模型(JMM)及volatile關鍵字
深入了解(6)Java并發AQS的共享鎖的實作(基于信号量Semaphore)
深入了解(7)Java無鎖CAS與Unsafe類及其并發包Atomic
深入了解(8)Java并發之synchronized實作原理
深入了解(9)Java基于并發AQS的(獨占鎖)重入鎖(ReetrantLock)及其Condition實作原理
深入了解(10)java并發之阻塞隊列LinkedBlockingQueue與ArrayBlockingQueue
文章目錄
- 阻塞隊列概要
- ArrayBlockingQueue的基本使用
- ArrayBlockingQueue的實作原理剖析
-
- ArrayBlockingQueue原理概要
- ArrayBlockingQueue的(阻塞)添加的實作原理
- ArrayBlockingQueue的(阻塞)移除實作原理
- LinkedBlockingQueue的基本概要
- LinkedBlockingQueue的實作原理剖析
-
- 原理概論
- 添加方法的實作原理
- 移除方法的實作原理
- LinkedBlockingQueue和ArrayBlockingQueue迥異
阻塞隊列概要
阻塞隊列與我們平常接觸的普通隊列(LinkedList或ArrayList等)的最大不同點,在于阻塞隊列支出阻塞添加和阻塞删除方法。
-
阻塞添加
所謂的阻塞添加是指當阻塞隊列元素已滿時,隊列會阻塞加入元素的線程,直隊列元素不滿時才重新喚醒線程執行元素加入操作。
-
阻塞删除
阻塞删除是指在隊列元素為空時,删除隊列元素的線程将被阻塞,直到隊列不為空再執行删除操作(一般都會傳回被删除的元素)
由于Java中的阻塞隊列接口BlockingQueue繼承自Queue接口,是以先來看看阻塞隊列接口為我們提供的主要方法
public interface BlockingQueue<E> extends Queue<E> {
//将指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量)
//在成功時傳回 true,如果此隊列已滿,則抛IllegalStateException。
boolean add(E e);
//将指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量)
// 将指定的元素插入此隊列的尾部,如果該隊列已滿,
//則在到達指定的等待時間之前等待可用的空間,該方法可中斷
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//将指定的元素插入此隊列的尾部,如果該隊列已滿,則一直等到(阻塞)。
void put(E e) throws InterruptedException;
//擷取并移除此隊列的頭部,如果沒有元素則等待(阻塞),
//直到有元素将喚醒等待線程執行該操作
E take() throws InterruptedException;
//擷取并移除此隊列的頭部,在指定的等待時間前一直等到擷取元素, //超過時間方法将結束
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//從此隊列中移除指定元素的單個執行個體(如果存在)。
boolean remove(Object o);
}
//除了上述方法還有繼承自Queue接口的方法
//擷取但不移除此隊列的頭元素,沒有則跑異常NoSuchElementException
E element();
//擷取但不移除此隊列的頭;如果此隊列為空,則傳回 null。
E peek();
//擷取并移除此隊列的頭,如果此隊列為空,則傳回 null。
E poll();12345678910111213141516171819202122232425262728293031323334
這裡我們把上述操作進行分類
- 插入方法:
- add(E e) : 添加成功傳回true,失敗抛IllegalStateException異常
- offer(E e) : 成功傳回 true,如果此隊列已滿,則傳回 false。
- put(E e) :将元素插入此隊列的尾部,如果該隊列已滿,則一直阻塞
- 删除方法:
- remove(Object o) :移除指定元素,成功傳回true,失敗傳回false
- poll() : 擷取并移除此隊列的頭元素,若隊列為空,則傳回 null
- take():擷取并移除此隊列頭元素,若沒有元素則一直阻塞。
- 檢查方法
- element() :擷取但不移除此隊列的頭元素,沒有元素則抛異常
- peek() :擷取但不移除此隊列的頭;若隊列為空,則傳回 null。
阻塞隊列的對元素的增删查操作主要就是上述的三類方法,通常情況下我們都是通過這3類方法操作阻塞隊列,了解完阻塞隊列的基本方法後,下面我們将分析阻塞隊列中的兩個實作類ArrayBlockingQueue和LinkedBlockingQueue的簡單使用和實作原理,其中實作原理是這篇文章重點分析的内容。
ArrayBlockingQueue的基本使用
ArrayBlockingQueue 是一個用數組實作的有界阻塞隊列,其内部按先進先出的原則對元素進行排序,其中put方法和take方法為添加和删除的阻塞方法,下面我們通過ArrayBlockingQueue隊列實作一個生産者消費者的案例,通過該案例簡單了解其使用方式
這裡寫代碼片1
代碼比較簡單, Consumer 消費者和 Producer 生産者,通過ArrayBlockingQueue 隊列擷取和添加元素,其中消費者調用了take()方法擷取元素當隊列沒有元素就阻塞,生産者調用put()方法添加元素,當隊列滿時就阻塞,通過這種方式便實作生産者消費者模式。比直接使用等待喚醒機制或者Condition條件隊列來得更加簡單。執行代碼,列印部分Log如下
package com.zejian.concurrencys.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by wuzejian on 2017/8/13
*/
public class ArrayBlockingQueueDemo {
private final static ArrayBlockingQueue<Apple> queue= new ArrayBlockingQueue<>(1);
public static void main(String[] args){
new Thread(new Producer(queue)).start();
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Apple {
public Apple(){
}
}
/**
* 生産者線程
*/
class Producer implements Runnable{
private final ArrayBlockingQueue<Apple> mAbq;
Producer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
this.mAbq = arrayBlockingQueue;
}
@Override
public void run() {
while (true) {
Produce();
}
}
private void Produce(){
try {
Apple apple = new Apple();
mAbq.put(apple);
System.out.println("生産:"+apple);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 消費者線程
*/
class Consumer implements Runnable{
private ArrayBlockingQueue<Apple> mAbq;
Consumer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
this.mAbq = arrayBlockingQueue;
}
@Override
public void run() {
while (true){
try {
TimeUnit.MILLISECONDS.sleep(1000);
comsume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void comsume() throws InterruptedException {
Apple apple = mAbq.take();
System.out.println("消費Apple="+apple);
}
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
生産:[email protected]
消費[email protected]
生産:[email protected]
生産:[email protected]
消費[email protected]
消費Ap[email protected]
........1234567
有點需要注意的是ArrayBlockingQueue内部的阻塞隊列是通過重入鎖ReenterLock和Condition條件隊列實作的,是以ArrayBlockingQueue中的元素存在公平通路與非公平通路的差別,對于公平通路隊列,被阻塞的線程可以按照阻塞的先後順序通路隊列,即先阻塞的線程先通路隊列。而非公平隊列,當隊列可用時,阻塞的線程将進入争奪通路資源的競争中,也就是說誰先搶到誰就執行,沒有固定的先後順序。建立公平與非公平阻塞隊列代碼如下:
//預設非公平阻塞隊列
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
//公平阻塞隊列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);
//構造方法源碼
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}123456789101112131415161718
其他方法如下:
//自動移除此隊列中的所有元素。
void clear()
//如果此隊列包含指定的元素,則傳回 true。
boolean contains(Object o)
//移除此隊列中所有可用的元素,并将它們添加到給定collection中。
int drainTo(Collection<? super E> c)
//最多從此隊列中移除給定數量的可用元素,并将這些元素添加到給定collection 中。
int drainTo(Collection<? super E> c, int maxElements)
//傳回在此隊列中的元素上按适當順序進行疊代的疊代器。
Iterator<E> iterator()
//傳回隊列還能添加元素的數量
int remainingCapacity()
//傳回此隊列中元素的數量。
int size()
//傳回一個按适當順序包含此隊列中所有元素的數組。
Object[] toArray()
//傳回一個按适當順序包含此隊列中所有元素的數組;傳回數組的運作時類型是指定數組的運作時類型。
<T> T[] toArray(T[] a)1234567891011121314151617181920212223242526
ArrayBlockingQueue的實作原理剖析
ArrayBlockingQueue原理概要
ArrayBlockingQueue的内部是通過一個可重入鎖ReentrantLock和兩個Condition條件對象來實作阻塞,這裡先看看其内部成員變量
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存儲資料的數組 */
final Object[] items;
/**擷取資料的索引,主要用于take,poll,peek,remove方法 */
int takeIndex;
/**添加資料的索引,主要用于 put, offer, or add 方法*/
int putIndex;
/** 隊列元素的個數 */
int count;
/** 控制并非通路的鎖 */
final ReentrantLock lock;
/**notEmpty條件對象,用于通知take方法隊列已有元素,可執行擷取操作 */
private final Condition notEmpty;
/**notFull條件對象,用于通知put方法隊列未滿,可執行添加操作 */
private final Condition notFull;
/**
疊代器
*/
transient Itrs itrs = null;
}12345678910111213141516171819202122232425262728293031
從成員變量可看出,ArrayBlockingQueue内部确實是通過數組對象items來存儲所有的資料,值得注意的是ArrayBlockingQueue通過一個ReentrantLock來同時控制添加線程與移除線程的并非通路,這點與LinkedBlockingQueue差別很大(稍後會分析)。而對于notEmpty條件對象則是用于存放等待或喚醒調用take方法的線程,告訴他們隊列已有元素,可以執行擷取操作。同理notFull條件對象是用于等待或喚醒調用put方法的線程,告訴它們,隊列未滿,可以執行添加元素的操作。takeIndex代表的是下一個方法(take,poll,peek,remove)被調用時擷取數組元素的索引,putIndex則代表下一個方法(put, offer, or add)被調用時元素添加到數組中的索引。圖示如下
ArrayBlockingQueue的(阻塞)添加的實作原理
//add方法實作,間接調用了offer(e)
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//offer方法
public boolean offer(E e) {
checkNotNull(e);//檢查元素是否為null
final ReentrantLock lock = this.lock;
lock.lock();//加鎖
try {
if (count == items.length)//判斷隊列是否滿
return false;
else {
enqueue(e);//添加元素到隊列
return true;
}
} finally {
lock.unlock();
}
}
//入隊操作
private void enqueue(E x) {
//擷取目前數組
final Object[] items = this.items;
//通過putIndex索引對數組進行指派
items[putIndex] = x;
//索引自增,如果已是最後一個位置,重新設定 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;//隊列中元素數量加1
//喚醒調用take()方法的線程,執行元素擷取操作。
notEmpty.signal();
}1234567891011121314151617181920212223242526272829303132333435363738
這裡的add方法和offer方法實作比較簡單,其中需要注意的是enqueue(E x)方法,其方法内部通過putIndex索引直接将元素添加到數組items中,這裡可能會疑惑的是當putIndex索引大小等于數組長度時,需要将putIndex重新設定為0,這是因為目前隊列執行元素擷取時總是從隊列頭部擷取,而添加元素從中從隊列尾部擷取是以當隊列索引(從0開始)與數組長度相等時,下次我們就需要從數組頭部開始添加了,如下圖示範
ok~,接着看put方法,它是一個阻塞添加的方法,
//put方法,阻塞時可中斷
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//該方法可中斷
try {
//當隊列元素個數與數組長度相等時,無法添加元素
while (count == items.length)
//将目前調用線程挂起,添加到notFull條件隊列中等待喚醒
notFull.await();
enqueue(e);//如果隊列沒有滿直接添加。。
} finally {
lock.unlock();
}
}123456789101112131415
put方法是一個阻塞的方法,如果隊列元素已滿,那麼目前線程将會被notFull條件對象挂起加到等待隊列中,直到隊列有空檔才會喚醒執行添加操作。但如果隊列沒有滿,那麼就直接調用enqueue(e)方法将元素加入到數組隊列中。到此我們對三個添加方法即put,offer,add都分析完畢,其中offer,add在正常情況下都是無阻塞的添加,而put方法是阻塞添加。這就是阻塞隊列的添加過程。說白了就是當隊列滿時通過條件對象Condtion來阻塞目前調用put方法的線程,直到線程又再次被喚醒執行。總得來說添加線程的執行存在以下兩種情況,一是,隊列已滿,那麼新到來的put線程将添加到notFull的條件隊列中等待,二是,有移除線程執行移除操作,移除成功同時喚醒put線程,如下圖所示
ArrayBlockingQueue的(阻塞)移除實作原理
關于删除先看poll方法,該方法擷取并移除此隊列的頭元素,若隊列為空,則傳回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判斷隊列是否為null,不為null執行dequeue()方法,否則傳回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//删除隊列頭元素并傳回
private E dequeue() {
//拿到目前數組的資料
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//擷取要删除的對象
E x = (E) items[takeIndex];
将數組中takeIndex索引位置設定為null
items[takeIndex] = null;
//takeIndex索引加1并判斷是否與數組長度相等,
//如果相等說明已到盡頭,恢複為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//隊列個數減1
if (itrs != null)
itrs.elementDequeued();//同時更新疊代器中的元素資料
//删除了元素說明隊列有空位,喚醒notFull條件對象添加線程,執行添加操作
notFull.signal();
return x;
}123456789101112131415161718192021222324252627282930
poll(),擷取并删除隊列頭元素,隊列沒有資料就傳回null,内部通過dequeue()方法删除頭元素,注釋很清晰,這裡不重複了。接着看remove(Object o)方法
public boolean remove(Object o) {
if (o == null) return false;
//擷取數組資料
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();//加鎖
try {
//如果此時隊列不為null,這裡是為了防止并發情況
if (count > 0) {
//擷取下一個要添加元素時的索引
final int putIndex = this.putIndex;
//擷取目前要被删除元素的索引
int i = takeIndex;
//執行循環查找要删除的元素
do {
//找到要删除的元素
if (o.equals(items[i])) {
removeAt(i);//執行删除
return true;//删除成功傳回true
}
//目前删除索引執行加1後判斷是否與數組長度相等
//若為true,說明索引已到數組盡頭,将i設定為0
if (++i == items.length)
i = 0;
} while (i != putIndex);//繼承查找
}
return false;
} finally {
lock.unlock();
}
}
//根據索引删除元素,實際上是把删除索引之後的元素往前移動一個位置
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//先判斷要删除的元素是否為目前隊列頭元素
if (removeIndex == takeIndex) {
//如果是直接删除
items[takeIndex] = null;
//目前隊列頭元素加1并判斷是否與數組長度相等,若為true設定為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//隊列元素減1
if (itrs != null)
itrs.elementDequeued();//更新疊代器中的資料
} else {
//如果要删除的元素不在隊列頭部,
//那麼隻需循環疊代把删除元素後面的所有元素往前移動一個位置
//擷取下一個要被添加的元素的索引,作為循環判斷結束條件
final int putIndex = this.putIndex;
//執行循環
for (int i = removeIndex;;) {
//擷取要删除節點索引的下一個索引
int next = i + 1;
//判斷是否已為數組長度,如果是從數組頭部(索引為0)開始找
if (next == items.length)
next = 0;
//如果查找的索引不等于要添加元素的索引,說明元素可以再移動
if (next != putIndex) {
items[i] = items[next];//把後一個元素前移覆寫要删除的元
i = next;
} else {
//在removeIndex索引之後的元素都往前移動完畢後清空最後一個元素
items[i] = null;
this.putIndex = i;
break;//結束循環
}
}
count--;//隊列元素減1
if (itrs != null)
itrs.removedAt(removeIndex);//更新疊代器資料
}
notFull.signal();//喚醒添加線程
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
remove(Object o)方法的删除過程相對複雜些,因為該方法并不是直接從隊列頭部删除元素。首先線程先擷取鎖,再一步判斷隊列count>0,這點是保證并發情況下删除操作安全執行。接着擷取下一個要添加源的索引putIndex以及takeIndex索引 ,作為後續循環的結束判斷,因為隻要putIndex與takeIndex不相等就說明隊列沒有結束。然後通過while循環找到要删除的元素索引,執行removeAt(i)方法删除,在removeAt(i)方法中實際上做了兩件事,一是首先判斷隊列頭部元素是否為删除元素,如果是直接删除,并喚醒添加線程,二是如果要删除的元素并不是隊列頭元素,那麼執行循環操作,從要删除元素的索引removeIndex之後的元素都往前移動一個位置,那麼要删除的元素就被removeIndex之後的元素替換,進而也就完成了删除操作。接着看take()方法,是一個阻塞方法,直接擷取隊列頭元素并删除。
//從隊列頭部删除,隊列沒有元素就阻塞,可中斷
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//中斷
try {
//如果隊列沒有元素
while (count == 0)
//執行阻塞操作
notEmpty.await();
return dequeue();//如果隊列有元素執行删除操作
} finally {
lock.unlock();
}
}1234567891011121314
take方法其實很簡單,有就删除沒有就阻塞,注意這個阻塞是可以中斷的,如果隊列沒有資料那麼就加入notEmpty條件隊列等待(有資料就直接取走,方法結束),如果有新的put線程添加了資料,那麼put操作将會喚醒take線程,執行take操作。圖示如下
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直接傳回目前隊列的頭元素,但不删除
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}1234567891011121314
peek方法非常簡單,直接傳回目前隊列的頭元素但不删除任何元素。ok~,到此對于ArrayBlockingQueue的主要方法就分析完了。
LinkedBlockingQueue的基本概要
LinkedBlockingQueue是一個由連結清單實作的有界隊列阻塞隊列,但大小預設值為Integer.MAX_VALUE,是以我們在使用LinkedBlockingQueue時建議手動傳值,為其提供我們所需的大小,避免隊列過大造成機器負載或者記憶體爆滿等情況。其構造函數如下
//預設大小為Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//建立指定大小為capacity的阻塞隊列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//建立大小預設值為Integer.MAX_VALUE的阻塞隊列并添加c中的元素到阻塞隊列
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}1234567891011121314151617181920212223242526272829303132
從源碼看,有三種方式可以構造LinkedBlockingQueue,通常情況下,我們建議建立指定大小的LinkedBlockingQueue阻塞隊列。LinkedBlockingQueue隊列也是按 FIFO(先進先出)排序元素。隊列的頭部是在隊列中時間最長的元素,隊列的尾部 是在隊列中時間最短的元素,新元素插入到隊列的尾部,而隊列執行擷取操作會獲得位于隊列頭部的元素。在正常情況下,連結隊列的吞吐量要高于基于數組的隊列(ArrayBlockingQueue),因為其内部實作添加和删除操作使用的兩個ReenterLock來控制并發執行,而ArrayBlockingQueue内部隻是使用一個ReenterLock控制并發,是以LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。注意LinkedBlockingQueue和ArrayBlockingQueue的API幾乎是一樣的,但它們的内部實作原理不太相同,這點稍後會分析。使用LinkedBlockingQueue,我們同樣也能實作生産者消費者模式。隻需把前面ArrayBlockingQueue案例中的阻塞隊列對象換成LinkedBlockingQueue即可。這裡限于篇幅就不貼重複代碼了。接下來我們重點分析LinkedBlockingQueue的内部實作原理,最後我們将對ArrayBlockingQueue和LinkedBlockingQueue 做總結,闡明它們間的不同之處。
LinkedBlockingQueue的實作原理剖析
原理概論
LinkedBlockingQueue是一個基于連結清單的阻塞隊列,其内部維持一個基于連結清單的資料隊列,實際上我們對LinkedBlockingQueue的API操作都是間接操作該資料隊列,這裡我們先看看LinkedBlockingQueue的内部成員變量
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 節點類,用于存儲資料
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
/** 阻塞隊列的大小,預設為Integer.MAX_VALUE */
private final int capacity;
/** 目前阻塞隊列中的元素個數 */
private final AtomicInteger count = new AtomicInteger();
/**
* 阻塞隊列的頭結點
*/
transient Node<E> head;
/**
* 阻塞隊列的尾節點
*/
private transient Node<E> last;
/** 擷取并移除元素時使用的鎖,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty條件對象,當隊列沒有資料時用于挂起執行删除的線程 */
private final Condition notEmpty = takeLock.newCondition();
/** 添加元素時使用的鎖如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull條件對象,當隊列資料已滿時用于挂起執行添加的線程 */
private final Condition notFull = putLock.newCondition();
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
從上述可看成,每個添加到LinkedBlockingQueue隊列中的資料都将被封裝成Node節點,添加的連結清單隊列中,其中head和last分别指向隊列的頭結點和尾結點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock 和 putLock 對并發進行控制,也就是說,添加和删除操作并不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。這裡再次強調如果沒有給LinkedBlockingQueue指定容量大小,其預設值将是Integer.MAX_VALUE,如果存在添加速度大于删除速度時候,有可能會記憶體溢出,這點在使用前希望慎重考慮。至于LinkedBlockingQueue的實作原理圖與ArrayBlockingQueue是類似的,除了對添加和移除方法使用單獨的鎖控制外,兩者都使用了不同的Condition條件對象作為等待隊列,用于挂起take線程和put線程。
ok~,下面我們看看其其内部添加過程和删除過程是如何實作的。
添加方法的實作原理
對于添加方法,主要指的是add,offer以及put,這裡先看看add方法和offer方法的實作
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}123456
從源碼可以看出,add方法間接調用的是offer方法,如果add方法添加失敗将抛出IllegalStateException異常,添加成功則傳回true,那麼下面我們直接看看offer的相關方法實作
public boolean offer(E e) {
//添加元素為null直接抛出異常
if (e == null) throw new NullPointerException();
//擷取隊列的個數
final AtomicInteger count = this.count;
//判斷隊列是否已滿
if (count.get() == capacity)
return false;
int c = -1;
//建構節點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//再次判斷隊列是否已滿,考慮并發情況
if (count.get() < capacity) {
enqueue(node);//添加元素
c = count.getAndIncrement();//拿到目前未添加新元素時的隊列長度
//如果容量還沒滿
if (c + 1 < capacity)
notFull.signal();//喚醒下一個添加線程,執行添加操作
}
} finally {
putLock.unlock();
}
// 由于存在添加鎖和消費鎖,而消費鎖和添加鎖都會持續喚醒等到線程,是以count肯定會變化。
//這裡的if條件表示如果隊列中還有1條資料
if (c == 0)
signalNotEmpty();//如果還存在資料那麼就喚醒消費鎖
return c >= 0; // 添加成功傳回true,否則傳回false
}
//入隊操作
private void enqueue(Node<E> node) {
//隊列尾節點指向新的node節點
last = last.next = node;
}
//signalNotEmpty方法
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
//喚醒擷取并删除元素的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
這裡的Offer()方法做了兩件事,第一件事是判斷隊列是否滿,滿了就直接釋放鎖,沒滿就将節點封裝成Node入隊,然後再次判斷隊列添加完成後是否已滿,不滿就繼續喚醒等到在條件對象notFull上的添加線程。第二件事是,判斷是否需要喚醒等到在notEmpty條件對象上的消費線程。這裡我們可能會有點疑惑,為什麼添加完成後是繼續喚醒在條件對象notFull上的添加線程而不是像ArrayBlockingQueue那樣直接喚醒notEmpty條件對象上的消費線程?而又為什麼要當
if (c == 0)
時才去喚醒消費線程呢?
- 喚醒添加線程的原因,在添加新元素完成後,會判斷隊列是否已滿,不滿就繼續喚醒在條件對象notFull上的添加線程,這點與前面分析的ArrayBlockingQueue很不相同,在ArrayBlockingQueue内部完成添加操作後,會直接喚醒消費線程對元素進行擷取,這是因為ArrayBlockingQueue隻用了一個ReenterLock同時對添加線程和消費線程進行控制,這樣如果在添加完成後再次喚醒添加線程的話,消費線程可能永遠無法執行,而對于LinkedBlockingQueue來說就不一樣了,其内部對添加線程和消費線程分别使用了各自的ReenterLock鎖對并發進行控制,也就是說添加線程和消費線程是不會互斥的,是以添加鎖隻要管好自己的添加線程即可,添加線程自己直接喚醒自己的其他添加線程,如果沒有等待的添加線程,直接結束了。如果有就直到隊列元素已滿才結束挂起,當然offer方法并不會挂起,而是直接結束,隻有put方法才會當隊列滿時才執行挂起操作。注意消費線程的執行過程也是如此。這也是為什麼LinkedBlockingQueue的吞吐量要相對大些的原因。
- 為什麼要判斷
時才去喚醒消費線程呢,這是因為消費線程一旦被喚醒是一直在消費的(前提是有資料),是以c值是一直在變化的,c值是添加完元素前隊列的大小,此時c隻可能是0或if (c == 0)
,如果是c>0
,那麼說明之前消費線程已停止,條件對象上可能存在等待的消費線程,添加完資料後應該是c=0
,那麼有資料就直接喚醒等待消費線程,如果沒有就結束啦,等待下一次的消費操作。如果c+1
那麼消費線程就不會被喚醒,隻能等待下一個消費操作(poll、take、remove)的調用,那為什麼不是條件c>0
才去喚醒呢?我們要明白的是消費線程一旦被喚醒會和添加線程一樣,一直不斷喚醒其他消費線程,如果添加前c>0
,那麼很可能上一次調用的消費線程後,資料并沒有被消費完,條件隊列上也就不存在等待的消費線程了,是以c>0
喚醒消費線程得意義不是很大,當然如果添加線程一直添加元素,那麼一直c>0
,消費線程執行的換就要等待下一次調用消費操作了(poll、take、remove)。c>0
移除方法的實作原理
關于移除的方法主要是指remove和poll以及take方法,下面一一分析
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();//同時對putLock和takeLock加鎖
try {
//循環查找要删除的元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {//找到要删除的節點
unlink(p, trail);//直接删除
return true;
}
}
return false;
} finally {
fullyUnlock();//解鎖
}
}
//兩個同時加鎖
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}1234567891011121314151617181920212223242526272829
remove方法删除指定的對象,這裡我們可能會詫異,為什麼同時對putLock和takeLock加鎖?這是因為remove方法删除的資料的位置不确定,為了避免造成并非安全問題,是以需要對2個鎖同時加鎖。
public E poll() {
//擷取目前隊列的大小
final AtomicInteger count = this.count;
if (count.get() == 0)//如果沒有元素直接傳回null
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//判斷隊列是否有資料
if (count.get() > 0) {
//如果有,直接删除并擷取該元素值
x = dequeue();
//目前隊列大小減一
c = count.getAndDecrement();
//如果隊列未空,繼續喚醒等待在條件對象notEmpty上的消費線程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//判斷c是否等于capacity,這是因為如果滿說明NotFull條件對象上
//可能存在等待的添加線程
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;//擷取頭結點
Node<E> first = h.next; 擷取頭結的下一個節點(要删除的節點)
h.next = h; // help GC//自己next指向自己,即被删除
head = first;//更新頭結點
E x = first.item;//擷取删除節點的值
first.item = null;//清空資料,因為first變成頭結點是不能帶資料的,這樣也就删除隊列的帶資料的第一個節點
return x;
}123456789101112131415161718192021222324252627282930313233343536373839
poll方法也比較簡單,如果隊列沒有資料就傳回null,如果隊列有資料,那麼就取出來,如果隊列還有資料那麼喚醒等待在條件對象notEmpty上的消費線程。然後判斷if (c == capacity)為true就喚醒添加線程,這點與前面分析if(c==0)是一樣的道理。因為隻有可能隊列滿了,notFull條件對象上才可能存在等待的添加線程。
public E take() throws InterruptedException {
E x;
int c = -1;
//擷取目前隊列大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中斷
try {
//如果隊列沒有資料,挂機目前線程到條件對象的等待隊列中
while (count.get() == 0) {
notEmpty.await();
}
//如果存在資料直接删除并傳回該資料
x = dequeue();
c = count.getAndDecrement();//隊列大小減1
if (c > 1)
notEmpty.signal();//還有資料就喚醒後續的消費線程
} finally {
takeLock.unlock();
}
//滿足條件,喚醒條件對象上等待隊列中的添加線程
if (c == capacity)
signalNotFull();
return x;
}12345678910111213141516171819202122232425
take方法是一個可阻塞可中斷的移除方法,主要做了兩件事,一是,如果隊列沒有資料就挂起目前線程到 notEmpty條件對象的等待隊列中一直等待,如果有資料就删除節點并傳回資料項,同時喚醒後續消費線程,二是嘗試喚醒條件對象notFull上等待隊列中的添加線程。 到此關于remove、poll、take的實作也分析完了,其中隻有take方法具備阻塞功能。remove方法則是成功傳回true失敗傳回false,poll方法成功傳回被移除的值,失敗或沒資料傳回null。下面再看看兩個檢查方法,即peek和element
//構造方法,head 節點不存放資料
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public E element() {
E x = peek();//直接調用peek
if (x != null)
return x;
else
throw new NoSuchElementException();//沒資料抛異常
}
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//擷取頭結節點的下一個節點
Node<E> first = head.next;
if (first == null)
return null;//為null就傳回null
else
return first.item;//傳回值
} finally {
takeLock.unlock();
}
}12345678910111213141516171819202122232425262728293031
從代碼來看,head頭結節點在初始化時是本身不帶資料的,僅僅作為頭部head友善我們執行連結清單的相關操作。peek傳回直接擷取頭結點的下一個節點傳回其值,如果沒有值就傳回null,有值就傳回節點對應的值。element方法内部調用的是peek,有資料就傳回,沒資料就抛異常。下面我們最後來看兩個根據時間阻塞的方法,比較有意思,利用的Conditin來實作的。
//在指定時間内阻塞添加的方法,逾時就結束
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
//将時間轉換成納秒
long nanos = unit.toNanos(timeout);
int c = -1;
//擷取鎖
final ReentrantLock putLock = this.putLock;
//擷取目前隊列大小
final AtomicInteger count = this.count;
//鎖中斷(如果需要)
putLock.lockInterruptibly();
try {
//判斷隊列是否滿
while (count.get() == capacity) {
if (nanos <= 0)
return false;
//如果隊列滿根據阻塞的等待
nanos = notFull.awaitNanos(nanos);
}
//隊列沒滿直接入隊
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//喚醒條件對象上等待的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//喚醒消費線程
if (c == 0)
signalNotEmpty();
return true;
}123456789101112131415161718192021222324252627282930313233343536
對于這個offer方法,我們重點來看看阻塞的這段代碼
//判斷隊列是否滿
while (count.get() == capacity) {
if (nanos <= 0)
return false;
//如果隊列滿根據阻塞的等待
nanos = notFull.awaitNanos(nanos);
}
//CoditionObject(Codition的實作類)中的awaitNanos方法
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//這裡是将目前添加線程封裝成NODE節點加入Condition的等待隊列中
//注意這裡的NODE是AQS的内部類Node
Node node = addConditionWaiter();
//加入等待,那麼就釋放目前線程持有的鎖
int savedState = fullyRelease(node);
//計算過期時間
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//主要看這裡!!由于是while 循環,這裡會不斷判斷等待時間
//nanosTimeout 是否逾時
//static final long spinForTimeoutThreshold = 1000L;
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);//挂起線程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//重新計算剩餘等待時間,while循環中繼續判斷下列公式
//nanosTimeout >= spinForTimeoutThreshold
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546
awaitNanos方法中,根據傳遞進來的時間計算逾時阻塞nanosTimeout,然後通過while循環中判斷
nanosTimeout >= spinForTimeoutThreshold
該公式是否成立,當其為true時則說明逾時時間nanosTimeout 還未到期,再次計算
nanosTimeout = deadline - System.nanoTime();
即nanosTimeout ,持續判斷,直到nanosTimeout 小于spinForTimeoutThreshold結束逾時阻塞操作,方法也就結束。這裡的spinForTimeoutThreshold其實更像一個經驗值,因為非常短的逾時等待無法做到十分精确,是以采用了spinForTimeoutThreshold這樣一個臨界值。
offer(E e, long timeout, TimeUnit unit)
方法内部正是利用這樣的Codition的逾時等待awaitNanos方法實作添加方法的逾時阻塞操作。同樣對于
poll(long timeout, TimeUnit unit)
方法也是一樣的道理。
LinkedBlockingQueue和ArrayBlockingQueue迥異
通過上述的分析,對于LinkedBlockingQueue和ArrayBlockingQueue的基本使用以及内部實作原理我們已較為熟悉了,這裡我們就對它們兩間的差別來個小結
1.隊列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對于後者而言,當添加速度大于移除速度時,在無界的情況下,可能會造成記憶體溢出等問題。
2.資料存儲容器不同,ArrayBlockingQueue采用的是數組作為資料存儲容器,而LinkedBlockingQueue采用的則是以Node節點作為連接配接對象的連結清單。
3.由于ArrayBlockingQueue采用的是數組的存儲容器,是以在插入或删除元素時不會産生或銷毀任何額外的對象執行個體,而LinkedBlockingQueue則會生成一個額外的Node對象。這可能在長時間内需要高效并發地處理大批量資料的時,對于GC可能存在較大影響。
4.兩者的實作隊列添加或移除的鎖不一樣,ArrayBlockingQueue實作的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,而LinkedBlockingQueue實作的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,也意味着在高并發的情況下生産者和消費者可以并行地操作隊列中的資料,以此來提高整個隊列的并發性能。