【重難點】【Java集合 04】ArrayDeque 的使用場景、ArrayBlockingQueue 的源碼實作
文章目錄
- 【重難點】【Java集合 04】ArrayDeque 的使用場景、ArrayBlockingQueue 的源碼實作
- 一、ArrayDeque 的使用場景
- 二、ArrayBlockingQueue
-
- 1.阻塞隊列概要
- 2.基本介紹
- 3.使用示例
- 4.源碼分析
一、ArrayDeque 的使用場景
我們可以看到,ArrayDeque 實作了 Deque,Deque 代表一個雙端隊列,該隊列允許從兩端來操作隊列中的元素,Deque 不僅可以當成雙端隊列使用,而且可以當成棧來使用
ArrayDeque 非線程安全,不可以存儲 NULL,因為 ArrayDeque 需要根據某個位置是否為 NULL 來判斷元素的存在
ArrayDeque 作為隊列使用時性能比 LinkedList 好,作為棧使用時 性能比 Stack 好
二、ArrayBlockingQueue
1.阻塞隊列概要
阻塞隊列與我們平常接觸的普通隊列(LinkedList 或 ArrayList 等)的最大不同點,在于阻塞隊列的阻塞添加和阻塞删除方法
- 阻塞添加:當阻塞隊列元素已滿時,隊列會阻塞加入元素的線程,直至隊列不滿時才重新喚醒線程執行元素加入操作
- 阻塞删除:在隊列元素為空時,删除隊列元素的線程将被阻塞,直至隊列不為空再執行删除操作
阻塞隊列接口 BlockingQueue 繼承自 Queue 接口,下面是它的主要方法:
public interface BlockingQueue<E> extends Queue<E>{
//将指定的元素插入到隊列的尾部
//在成功時傳回 true,如果此隊列已滿,則抛出 IllegalStateException
boolean add(E e);
//将指定的元素插入到此隊列的尾部
//将指定的元素插入此隊列的尾部,如果該隊列已滿,則在到達指定的等待時間之前等待可用的空間
//該方法可中斷
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//将指定的元素插入此隊列的尾部,如果該隊列已滿,則一直等待
void put(E e) throws InterruptedException;
//擷取并移除此隊列的頭部,如果沒有元素則等待
//知道有元素将喚醒等待線程執行該操作
E take() throws InterruptedException;
//擷取并移除此隊列的頭部,在指定的等待時間前一直等待擷取元素,超過時間方法将結束
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//從此隊列中移除指定元素的單個執行個體(如果存在)
boolean remove(Object o);
插入方法:
- add(E e):添加成功傳回 true,失敗抛出 IllegalStateException 異常
- offer(E e):成功傳回 true,如果此隊列已滿,則傳回 false
- put(E e):将元素插入此隊列的尾部,如果該隊列已滿,則一直阻塞
删除方法:
- remove(Object o):移除指定元素,成功傳回 true,失敗傳回 false
- poll():擷取并移除此隊列的頭元素,若隊列為空,則傳回 null
- take():擷取并移除此隊列頭元素,若沒有元素則一直阻塞
阻塞隊列對元素的增删改查操作主要就是上述方法,通常情況下我們都是通過這些方法操作阻塞隊列
2.基本介紹
ArrayBlockingQueue 的特點:
- 一個由數組支援的遊街隊列,此隊列按 FIFO(先進先出)原則對元素進行排序
- 新元素插入到隊列的尾部,隊列擷取操作則是從隊列頭部開始獲得元素
- 這是一個簡單的 ”有界緩存區“,一旦建立,就不能再增加其容量
- 在向已滿隊列中添加元素會導緻操作阻塞,從空隊列中提取元素也将導緻阻塞
- 此類支援對等待的生産者線程和使用者線程進行排序的可選公平政策。預設情況下,不保證是這種排序的。然而通過将公平性(fairness)設定為 true,而構造的隊列允許按照 FIFO 順序通路線程。公平性通常會降低吞吐量,但也減少了可變性和避免了 ”不平衡性“
簡單來說,ArrayBlockingQueue 是一個用數組實作的有界阻塞隊列,其内部按先進先出的原則對元素進行排序,其中 put 方法和 take 方法為添加和删除的阻塞方法,下面我們通過 ArrayBlockingQueue 隊列實作一個生産者消費者的案例
3.使用示例
Consumer 消費者,Producer 生産者,通過 ArrayBlockingQueue 隊列擷取和添加元素。其中消費者調用了 take() 方法擷取元素,當隊列沒有元素就阻塞;生産者調用 put() 方法添加元素,當隊列滿時就阻塞。通過這種方式實作生産者消費者模式,比直接使用等待喚醒機制或則和 Condition 條件隊列來得更加簡單
package com.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ArrayBlockingQueueDemo {
private final static ArrayBlockingQueue<Apple> queue = new ArrayBlockingQueue<>(1);
public static void main(String[] args){
new Thread(new Producer(queue)).start();
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Apple{
public Apple(){
}
}
//生産者
class Producer implements Runnable{
private final ArrayBlockingQueue<Apple> mAbq;
Producer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
this.mAbq = arrayBlockingQueue;
}
@Override
public void run(){
while(true){
Produce();
}
}
private void Produce(){
Apple apple = new Apple();
try {
mAbq.put(apple);
System.out.println("生産:" + apple);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//消費者
class Consumer implements Runnable{
private ArrayBlockingQueue<Apple> mAbq;
Consumer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
this.mAbq = arrayBlockingQueue;
}
@Override
public void run(){
while(true){
try {
TimeUnit.MILLISECONDS.sleep(1000);
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void consume() throws InterruptedException{
Apple apple = mAbq.take();
System.out.println("消費 Apple:" + apple);
}
}
運作結果
生産:com.BlockingQueue.Apple@1fee0706
消費 Apple:com.BlockingQueue.Apple@1fee0706
生産:com.BlockingQueue.Apple@4552d520
消費 Apple:com.BlockingQueue.Apple@15166c76
生産:com.BlockingQueue.Apple@15166c76
消費 Apple:com.BlockingQueue.Apple@4b9533fa
生産:com.BlockingQueue.Apple@5014e0b0
消費 Apple:com.BlockingQueue.Apple@4552d520
生産:com.BlockingQueue.Apple@4b9533fa
消費 Apple:com.BlockingQueue.Apple@5014e0b0
生産:com.BlockingQueue.Apple@4faabe50
消費 Apple:com.BlockingQueue.Apple@3f2d0b83
生産:com.BlockingQueue.Apple@3f2d0b83
消費 Apple:com.BlockingQueue.Apple@4faabe50
生産:com.BlockingQueue.Apple@aa839c1
生産:com.BlockingQueue.Apple@610ed35f
消費 Apple:com.BlockingQueue.Apple@aa839c1
消費 Apple:com.BlockingQueue.Apple@610ed35f
消費 Apple:com.BlockingQueue.Apple@59001661
生産:com.BlockingQueue.Apple@47dd66d4
生産:com.BlockingQueue.Apple@59001661
消費 Apple:com.BlockingQueue.Apple@47dd66d4
生産:com.BlockingQueue.Apple@3be8f458
消費 Apple:com.BlockingQueue.Apple@303b1fff
生産:com.BlockingQueue.Apple@303b1fff
消費 Apple:com.BlockingQueue.Apple@3be8f458
生産:com.BlockingQueue.Apple@21aa854b
消費 Apple:com.BlockingQueue.Apple@21aa854b
……
……
……
4.源碼分析
ArrayBlockingQueue 内部的阻塞隊列是通過重入鎖 ReenterLock 和 Condition 條件隊列實作的,是以 ArrayBlockingQueue 中的元素存在公平通路與非公平通路的差別,對于公平通路隊列,被阻塞的線程可以按照阻塞的先後順序通路隊列,即先阻塞的線程先通路隊列。而對于非公平隊列,當隊列可用時,阻塞的線程将進入争奪通路資源的競争中,也就是說誰先搶到誰就執行,沒有固定的先後順序。建立公平與非公平阻塞隊列代碼如下:
//預設非公平阻塞隊列
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
//公平阻塞隊列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);
//構造方法源碼
public ArrayBlockingQueue(int capacity){
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean){
if(capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ArrayBlockingQueue 的内部通過一個可重入鎖 ReentrantLock 和兩個 Condition條件對象來實作阻塞,這裡先看看其内部成員變量
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable{
//存儲資料的數組
final Object[] items;
//擷取資料的索引,主要用于 take,poll,peek,remove 方法
int takeIndex;
//添加資料的索引,主要用于 put,offer,add 方法
int putIndex;
//隊列元素的個數
int count;
//控制非通路的鎖
final ReentrantLock lock;
//notEmpty 條件對象,用于通知 take 方法隊列已有元素,可執行擷取操作
private final Condition notEmpty;
//notFull 條件對象,用于通知 put 方法隊列未滿,可執行添加操作
private fianl Condition notFull;
//疊代器
transient Itrs itrs = null;
}
從成員變量可以看出,ArrayBlockingQueue 内部确實是通過數組對象 items 來存儲所有的資料,值得注意的是 ArrayBlockingQueue 通過一個 ReentrantLock 來同時控制添加線程與移除線程的并發通路,這點與 LinkedBlockingQueue 差別很大。而對于 notEmpty 條件對象則是存放等待或喚醒調用 take 方法的線程,告訴他們隊列已有元素,可以執行擷取操作。同理 notFull 條件對象是用于等待或喚醒調用 put 方法的線程,告訴它們,隊列未滿,可以執行添加元素的操作。takeIndex 代表的是下一個方法(take、poll、peek、remove)被調用時擷取數組元素的索引,putIndex 則代表下一個方法(put、offer、add)被調用時元素添加到數組中的索引
添加方法
//offer 方法
public boolean offer(E e){
checkNotNull(e); //檢查元素是否為 null
final ReentrantLock lock = this.lock;
lock.lock(); //加鎖
try{
if(count == items.length) //判斷隊列是否滿
return false;
else{
enqueue(e); //添加元素到隊列
return true;
}
} finally {
lock.unlock();
}
}
//add 方法實作,間接調用了 offer(e)
public boolean add(E e){
if(offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//入隊操作
private void enqueue(E e){
//擷取目前數組
final Object[] items = this.items;
//通過 putIndex 索引對數組進行指派
items[putIndex] = x;
//索引自增,如果已是最後一個位置,重新設定 putIndex = 0;
if(++putIndex == items.length)
putIndex = 0;
count++; //隊列中元素數量加 1
//喚醒調用 take() 方法的線程,執行元素擷取操作
notEmpty.signal();
}
這裡的 offer 方法和 ad 方法實作比較簡單,其中需要注意的是 enqueue(E x) 方法,當 putIndex 索引大小等于數組長度時,需要将 putIndex 重新設定為 0,因為後面講到的取值也是從數組中第一個開始一次往後面去,取了之後會将原位置的值設定為 null,友善循環 put 操作,這裡需要注意并不是每次取數組中的第一個值,takeIndex 也會增加,因為做了添加操作,數組中肯定不會空,則 notEmpty 條件會 喚醒 take() 方法取值
接着看 put 方法,它是一個阻塞添加的方法:
//put 方法,阻塞時可中斷
public void put(E e) throws InterruptedException{
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //該方法可中斷
try{
//當隊列元素個數與數組長度相等時,無法添加元素
while(count == items.length)
//将目前調用線程挂起,添加到 notFull 條件隊列中等待喚醒
notFull.await();
enqueue(e); //如果隊列沒有滿直接添加
} finally {
lock.unlock();
}
}
put 方法是一個阻塞的方法,如果隊列元素已滿,那麼目前線程将會被 notFull 條件對象挂起加到等待隊列中,直到隊列有空檔才會喚醒執行添加操作。但如果隊列沒有滿,那麼就直接調用 enqueue(e) 方法将元素加入到數組隊列中。至此我們對 offer、add、put 三個添加方法都分析完畢
擷取元素
peek 方法非常簡單,直接傳回目前隊列的頭元素但不删除任何元素
public E peek(){
final ReentrantLock lock = this.lock;
lock.lock();
try{
//直接傳回目前隊列的頭元素,但不删除
return itemAt(takeIndex); //null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i){
return (E) items[i];
}
删除元素
首先看 poll 方法,該方法擷取并移除此隊列的頭元素,若隊列為空,則傳回 null
public E poll(){
final ReentrantLock lock = this.lock;
lock.lock();
try{
//判斷隊列是否為 null,不為 null 執行 dequeue() 方法,否則傳回 null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//删除隊列頭元素傳回
private E dequeue(){
//拿到目前數組的資料
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//擷取要删除的對象
E x = (E) items[takeIndex];
//将數組中 takeIndex 索引位置設定為 null
items[takeIndex] = null;
//takeIndex 索引加 1 并判斷是否與數組長度相等,
//如果相等說明已到盡頭,恢複為 0
if(++takeIndex == items.length)
takeIndex = 0;
count--; //隊列個數減 1
if(itrs != null)
itrs.elementDequeued(); //同時更新疊代器中的元素資料
//删除了元素說明隊列有空位,喚醒 notFull 條件對象添加線程,執行添加操作
notFull.signal();
return x;
}
接着看 take() 方法,是一個阻塞方法,擷取隊列頭元素并删除
//從隊列頭部删除,隊列沒有元素就阻塞,可中斷
public E take() throws InterruptedException{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //中斷
try{
//如果隊列沒有元素
while(count == 0)
//執行阻塞操作
notEmpty.await();
return dequeue(); //執行删除操作
} finally {
lock.unlock();
}
}
take 和 poll 的差別是,隊列為空時,poll 傳回 null,take 則被挂起阻塞,直到有元素添加進來,take 線程被喚醒,然後擷取第一個元素并删除