PriorityBlockingQueue是一個***的基于數組的優先級阻塞隊列,數組的預設長度是11,雖然指定了數組的長度,但是可以無限的擴充,直到資源消耗盡為止,每次出隊都傳回優先級别最高的或者最低的元素。其實内部是由平衡二叉樹堆來進行排序的,先進行構造二叉樹堆,二叉樹堆排序出來的數每次第一個元素和最後一個元素進行交換,這樣最大的或最小的數就到了最後面,然後最後一個不變,重新構造前面的數組元素以此類推進行堆排序。預設比較器是**null**,也就是使用隊列中元素的**compareTo**方法進行比較,意味着隊列元素要實作Comparable接口。
介紹
當你看本文時,需要具備以下知識點
二叉樹、完全二叉樹、二叉堆、二叉樹的表示方法
如果上述内容不懂也沒關系可以先看概念。
PriorityBlockingQueue
是一個***的基于數組的優先級阻塞隊列,數組的預設長度是11,雖然指定了數組的長度,但是可以無限的擴充,直到資源消耗盡為止,每次出隊都傳回優先級别最高的或者最低的元素。其實内部是由平衡二叉樹堆來進行排序的,先進行構造二叉樹堆,二叉樹堆排序出來的數每次第一個元素和最後一個元素進行交換,這樣最大的或最小的數就到了最後面,然後最後一個不變,重新構造前面的數組元素以此類推進行堆排序。預設比較器是null,也就是使用隊列中元素的compareTo方法進行比較,意味着隊列元素要實作Comparable接口。
- PriorityBlockingQueue是一個***隊列,隊列滿時沒有進行阻塞限制,也就是沒有notFull進行阻塞,是以put是非阻塞的。
- lock鎖獨占鎖控制隻有一個線程進行入隊、出隊操作。
平衡二叉樹堆
二叉堆的本質其實是一個完全二叉樹,它分為兩種類型:
- 最大堆:最大堆的任何一個父節點的值都大于或等于它的左、右孩子節點的值。
- 最小堆:最小堆的任何一個父節點的值都小于或等于它的左、右孩子節點的值。
二叉堆的根節點叫做堆頂。
最大堆和最小堆的特點:最大堆的堆頂是整個堆中最大元素,最小堆的堆頂是整個堆中最小元素。
我們知道二叉堆内部實作其實是基于數組來實作的,為什麼二叉堆又能使用數組來實作呢?因為二叉堆是完全二叉樹,并不會浪費空間資源,對于稀疏二叉樹如果使用數組來實作會有很多左右結點為空的情況,數組中需要進行占位處理,占位處理就會浪費很多空間,得不償失,但是二叉堆是一個完全二叉樹,是以不會有資源的浪費。
數組下标表示方法:
左節點:2*parent+1
右節點:2*parent+2
n坐标節點父節點:n/2
PriorityBlockingQueue類圖結構

源碼分析
通過類圖可以清晰的發現它其實也是繼承自BlockingQueue接口以及Queue接口,說明也是阻塞隊列。在構造函數中預設隊列的容量是11,由于上面我們已經提到了,優先隊列使用的是二叉堆來實作的,二叉堆實作是根據數組來實作的,是以預設構造器中初始化容量為11,如下代碼所示:
/**
* 預設數組長度。
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* 最大數組允許的長度。
* The maximum size of array to allocate.
* Some VMs reserve some header words in an array.
* Attempts to allocate larger arrays may result in
* OutOfMemoryError: Requested array size exceeds VM limit
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
上面講述了二叉堆的原理,二叉堆原理肯定是要進行比較大小,預設比較器是null,也就是使用元素的compareTo方法進行比較來确定元素的優先級,就意味着隊列元素必須實作Comparable接口,如下是構造函數:
/**
* 建立一個預設長度為11的隊列,預設比較器為null。
* Creates a {@code PriorityBlockingQueue} with the default
* initial capacity (11) that orders its elements according to
* their {@linkplain Comparable natural ordering}.
*/
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* 建立一個指定數組初始化長度的隊列,預設比較器為null。
* Creates a {@code PriorityBlockingQueue} with the specified
* initial capacity that orders its elements according to their
* {@linkplain Comparable natural ordering}.
*
* @param initialCapacity the initial capacity for this priority queue
* @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
/**
* 建立一個指定數組初始化長度的隊列,比較器可以自己指定。
* Creates a {@code PriorityBlockingQueue} with the specified initial
* capacity that orders its elements according to the specified
* comparator.
*
* @param initialCapacity the initial capacity for this priority queue
* @param comparator the comparator that will be used to order this
* priority queue. If {@code null}, the {@linkplain Comparable
* natural ordering} of the elements will be used.
* @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
/**
* 傳入一個集合,如果集合是SortedSet或PriorityBlockingQueue的話不需要進行堆化,直接使用原有的排序即可,如果不是則需要調用
* heapify方法進行堆初始化操作。
* Creates a {@code PriorityBlockingQueue} containing the elements
* in the specified collection. If the specified collection is a
* {@link SortedSet} or a {@link PriorityQueue}, this
* priority queue will be ordered according to the same ordering.
* Otherwise, this priority queue will be ordered according to the
* {@linkplain Comparable natural ordering} of its elements.
*
* @param c the collection whose elements are to be placed
* into this priority queue
* @throws ClassCastException if elements of the specified collection
* cannot be compared to one another according to the priority
* queue's ordering
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true表示需要進行堆化也就是初始化基于現有集合初始化一個二叉堆,使用下沉方式。
boolean screen = true; // true表示需要篩選空值
if (c instanceof Sort3edSet<?>) {
// 如果是SortedSet不需要進行堆初始化操作。
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
// 不需要進行堆初始化。
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
// 如果是PriorityBlockingQueue不需要進行堆初始化操作。
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
// 不需要篩選空值。
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
// 不需要進行堆初始化操作。
heapify = false;
}
// 将集合轉換成數組類型。
Object[] a = c.toArray();
// 擷取數組的長度大小
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
// 将轉化數組的對象指派給隊列,以及實際存儲的長度。
this.queue = a;
this.size = n;
if (heapify)
// 調整堆大小。
heapify();
}
構造函數中第四個構造函數傳遞的是一個集合,集合如果是SortSet和PriorityBlockingQueue是不需要進行堆初始化操作,如果是其他集合類型則需要進行堆排序。
private void heapify() {
Object[] array = queue;
int n = size;
// 這裡其實就是尋找完全二叉樹中最後一個非葉子節點值,由于數組元素是從0開始的下标,長度是從1開始的是以需要減掉1相等于是數組元素下标最後一個元素下标/2得到的值,也就是最後一個元素的父節點。
int half = (n >>> 1) - 1;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
// 循環周遊非葉子節點的值。
for (int i = half; i >= 0; i--)
siftDownComparable(i, (E) array[i], array, n);
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
例如我們初始化數組為a=[7,1,3,10,5,2,8,9,6],完全二叉樹表示為:
首先half=9/2-1=4-1=3,a[3]=10,剛好是完全二叉樹中最後一個非葉子節點,再往上非葉子節點是3,1,7剛好數組下标是遞減的,然後針對元素10進行下沉操作,發現左右子樹中最小元素是6,則6和10元素進行交換。
然後再下沉元素3,i--操作,得到如下完全二叉樹
繼續周遊下一個非葉子節點,i=2減少1得到i=1,此時a[1]=1左右子節點6和5都比1大是以不需要進行變動,然後i進行減少,則到了i=0,此時a[7]=0,發現左右節點中元素1是最小的,是以先下沉到元素1的位置。
下沉後發現還可以繼續下沉,因為左右節點中右節點元素5是最小的是以還需要進行下沉操作。
至此下沉結束,二叉堆建構完成。
offer操作
接下來看一下隊列是如何進行建構成一個二叉堆的,其實這裡面建構二叉堆以及二叉堆擷取資料時,會采用上浮和下沉的操作來進行處理整個二叉堆的平衡,詳細來看一下offer方法,代碼如下所示:
/**
* Inserts the specified element into this priority queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Queue#offer})
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
// 首先擷取鎖對象。
final ReentrantLock lock = this.lock;
// 隻有一個線程操作入隊和出隊動作。
lock.lock();
// n代表數組的實際存儲内容的大小
// cap代表隊列的整體大小,也就是數組的長度。
int n, cap;
Object[] array;
// 如果數組實際長度大于等于數組的長度時,需要進行擴容操作。
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
// 如果使用者指定比較器,則使用使用者指定的比較器來進行比較,如果沒有則使用預設比較器。
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 進行上浮操作。
siftUpComparable(n, e, array);
else
// 進行上浮操作。
siftUpUsingComparator(n, e, array, cmp);
// 實際長度增加1,由于有且僅有一個線程操作隊列,是以這裡并沒有使用原子性操作。
size = n + 1;
// 通知等待的線程,隊列已經有資料,可以擷取資料。
notEmpty.signal();
} finally {
// 解鎖操作。
lock.unlock();
}
// 傳回操作成功。
return true;
}
- 首先擷取鎖對象,控制有且僅有一個線程操作隊列。
- 如果數組實際存儲的内容大小大于等于數組長度時進行擴容操作,調用
方法進行擴容。tryGrow
- 使用比較器進行比較,進行上浮操作來建立二叉堆。
tryGrow
是如何進行擴容的呢?
/**
* Tries to grow array to accommodate at least one more element
* (but normally expand by about 50%), giving up (allowing retry)
* on contention (which we expect to be rare). Call only while
* holding lock.
*
* @param array the heap array
* @param oldCap the length of the array
*/
private void tryGrow(Object[] array, int oldCap) {
// 這裡先釋放了鎖,為什麼要釋放鎖呢?詳細請見下面。
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 這裡allocationSpinLock預設是0,通過CAS來講該值修改為1,也就是同時隻有一個線程進行擴容操作。
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 如果原容量小于64,就執行原容量+2,如果原容量大于64,擴大一倍容量。
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 新容量超過了最大容量,将容量調整為最大值。
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 建立新的數組對象。
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 擴容成功,則将值修改為0。
allocationSpinLock = 0;
}
}
// 這裡是為了其他線程進入後優先讓擴容的線程進行操作。
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 加鎖操作。
lock.lock();
// 将原資料拷貝到新數組中。
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
看似擴容還是很容易了解的,開始的時候為什麼要先釋放鎖呢,然後用CAS控制隻有一個線程可以擴容呢?其實可以不釋放鎖,也能控制隻允許一個線程進行擴容操作,但是會産生性能問題,因為當我擴容的時候,一直獲得鎖會不允許其他線程進行入隊和出隊操作,擴容的時候先釋放鎖,也就是可以其他線程進行處理出隊和入隊操作,例如第一個線程先CAS成功,并且進行擴容中,此時第二個線程進入後allocationSpinLock此時已經等于1了,也就是不會進行擴容操作,而是會直接進入到判斷newArray==null,此時線程二發現newArray為null,線程而會調用Thread.yield來讓出CPU,目的是讓擴容的線程擴容後優先調用lock.lock重新獲得鎖,但是這又不能完全保證,有可能yield退出了擴容還沒有結束,此時線程二獲得鎖,如果目前數組擴容沒有完畢,則線程二會再次調用offer的tryGrow進行擴容操作,再次給擴容線程讓出了鎖,再次調用yield讓出CPU。當擴容線程進行擴容時,其他線程自旋的檢測目前線程是否成功,成功了才會進行入隊的操作。
擴容成功後就需要建構二叉堆,建構二叉堆調用的是
siftUpComparable
方法,也就是上浮操作,接下來詳細講解一下上浮操作是如何進行的?
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 找到節點k的父節點。
int parent = (k - 1) >>> 1;
// 擷取父節點的值。
Object e = array[parent];
// 比較父節點和插入值的大小,如果插入值大于父節點直接插入到末尾。
if (key.compareTo((T) e) >= 0)
break;
// 将父節點設定到子節點,小資料進行上浮操作。
array[k] = e;
// 将父節點的下标設定給k。
k = parent;
}
// 最後key上浮到k的位置。
array[k] = key;
}
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
為了能夠示範算法的整個過程,這裡舉例來說明一下:
為了示範擴容的過程我們先初始化隊列長度為2,後面會進行擴容操作。
- 當我們調用offer(3)時,此時k=n=size=0,x是我們要插入的内容,array是二叉堆數組,當我們插入第一個元素是,發現k>0是不成立的,是以直接運作 array[k] = key,此時二叉堆數組隻有一個元素3,如下圖所示:
淺析PriorityBlockingQueue優先級隊列原理 - 第二次調用offer(5),此時k=n=size=1實際存儲長度為1,運作(k-1)>>>1尋找父節點,k-1=0,無符号向右側移動一位,就相當于是$(k-1)/2$,和開始介紹二叉堆特點的時候尋找父節點是一樣的,此時要插入的節點5的父節點是3,需要進行比較3和5的大小,發現父節點小于要插入的節點,是以執行break退出循環,執行array[k] = key操作,将節點5插入到父節點3下面,并且size進行增加1,此時size=2,如下所示:
淺析PriorityBlockingQueue優先級隊列原理 - 當第三次調用offer(9)時,此時k=n=size=2,發現
實際長度與數組的長度相等,此時進行擴容操作,通過上述源碼分析得到,oldCap+(oldCap+2)=2+(2+2)=6,數組的長度從長度為2擴容到長度為6,将原有數組内容指派到新的數組中,擴容之後進入到(n = size) >= (cap = (array = queue).length)
操作進行入隊操作,其實怎麼了解呢?可以了解為我們将元素9插入到數組最後一個位置,也就是隊列的最後一個位置。上浮
然後9這個元素需要找到它的父節點,那就是3,也即是(k-1)>>>1=(2-1)>>>1得到下标為0,array[0]=3,元素9的父節點是3,比較父節點和子節點大小,發現3<9位置不需要進行變動,則二叉堆就變成如下内容,size進行加1,size=3。淺析PriorityBlockingQueue優先級隊列原理 淺析PriorityBlockingQueue優先級隊列原理 - 第四次調用offer(2)時,此時k=n=size=3,還是按照上面意思将元素2暫時插入到數組的末尾,然後進行上浮操作,虛線的意思就是告訴你我現在還不一定在不在這裡,我要和我的父親比較下到底誰大,如果父節點大那我隻能乖乖在這裡喽,如果父節點小,不好意思我得往上浮動了,接下來看一上浮動的過程。 k=3,元素2的父節點=(k-1)>>>1=2>>>1等于1,也就是array[1]的元素5是元素2的父節點,通過上面的二叉圖也能夠清晰看到元素5是父節點,比較發現2<5,執行array[k] = e,array[3]=5,也就是說将5的位置進行下沉操作,這裡源代碼并沒發現2上浮操作,但是在下一次比較中又用到了我們的元素2進行比較,其實作在樹的結構相當于如下所示:
淺析PriorityBlockingQueue優先級隊列原理 發現剛開始元素2在最後節點,現在被替換成了元素5,這就意味着元素2的位置變到了之前元素5的位置,源碼中的k = parent,此時元素2的下标已經變到k=1,但是數組裡面的内容并沒有變,隻是元素下标上浮操作,上浮到父節點,相當于元素2就在元素5的位置,隻是下标1的位置元素内容并沒有直接替換成元素2僅此而已。接下來還會進行循環,數組下标1的元素的父節點是元素3,這裡就不計算了,因為上面第二步計算過,此時發現2<3,需要将元素3的内容進行下沉操作,元素2的下标進行上浮操作。淺析PriorityBlockingQueue優先級隊列原理 此時下标2的元素移動到父節點下标0的位置,發現k<0,則本次循環結束,将array[0]替換成元素2,本次上浮操作結束。淺析PriorityBlockingQueue優先級隊列原理 由此可見,次堆為最小二叉堆,當出隊操作時彈出的是最小的元素。淺析PriorityBlockingQueue優先級隊列原理
poll操作
poll操作就是将隊列内部二叉堆的堆頂元素出隊操作,如果隊列為空則傳回null。如下是poll的源碼:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
- 首先擷取鎖對象,進行上鎖操作,有且僅有一個線程出隊和入隊操作
- 獲得到鎖對象之後進行出隊操作,調用
方法dequeue
- 最後釋放鎖對象
接下來看一下出隊操作是如何進行的,
/**
* Mechanics for poll(). Call only while holding lock.
*/
private E dequeue() {
// 數組的元素的個數。
int n = size - 1;
// 如果數組中不存在元素則直接傳回null。
if (n < 0)
return null;
else {
// 擷取隊列數組。
Object[] array = queue;
// 将第一個元素也就是二叉堆的根結點堆頂元素作為傳回結果。
E result = (E) array[0];
// 擷取數組中最後一個元素。
E x = (E) array[n];
// 将最後一個元素設定為null。
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 進行下沉操作。
siftDownComparable(0, x, array, n);
else
// 進行下沉操作。
siftDownUsingComparator(0, x, array, n, cmp);
// 實際元素大小減少1.
size = n;
// 傳回結果。
return result;
}
}
通過源碼可以看到,出隊的内容就是二叉堆的堆頂元素arrra[0],而後面它有取到的完全二叉樹的最後一個節點,将最後一個節點設定為null,然後在調用了siftDownComparable對堆進行了調整動作,看一下這個方法的具體實作内容,然後再結合上面入隊的内容進行講解出隊是如何保證二叉堆平衡的。
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; // 最後一個節點的父節點,也就是代表到這裡之後就結束了。 int half = n >>> 1; // loop while a non-leaf while (k < half) { // child=2n+1代表leftChild左節點。 int child = (k << 1) + 1; // assume left child is least // 擷取左節點的值。 Object c = array[child]; // 擷取右節點=2n+1+1=2n+2的坐标 int right = child + 1; // 如果右側節點坐标小于數組中最有一個元素,并且右節點值小于左側節點值,則将c設定為右側節點值。 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; // 對比c和目前最後一個元素的大小。 if (key.compareTo((T) c) <= 0) break; // 将坐标k位置設定為比較的後的值。 array[k] = c; // 将光标移動到替換的節點上。 k = child; } // 最後将元素最後一個值指派到k的位置。 array[k] = key; }}private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; }}
看了源碼其實還是有點蒙的,因為根本沒有辦法想象到是如何實作的,接下來跟着思路一步一步的分析,上面offer時最後二叉堆的情況如下圖所示:
- 先調用一次poll操作,此時size=4,result=array[0]=2,n=size-1=3,x=array[n]=array[3]=5,将array[3]位置設定為null,就變成如下的二叉堆 數組中最後一個元素相當于直接被設定為空了,從二叉堆中移除掉了,移除掉了放在那裡呢?其實這裡大家可以了解為放在堆頂,原因是什麼呢?我們想一下,堆頂元素相當于是要被出隊操作,元素2已經出隊了,但是沒有堆頂元素了,此時需要怎麼操作呢?直接從數組中找到完全二叉樹葉子節點最後一個節點,将最後一個節點設定到堆頂,然後将堆頂元素進行下沉操作,但這裡源碼中并沒有實際設定元素5到堆頂,而是經過比較的過程将元素5從堆頂進行下沉的操作,接下來一步一步分析,目前可以将堆看做如下:
淺析PriorityBlockingQueue優先級隊列原理 然後通過源碼可以看到它是現将堆頂也就是元素2的左右節點相比較,比較3<9是以最小節點c=3,然後再跟元素5(可以看做現在是在堆頂)進行比較3<5,發現元素5大于元素3,将元素3的位置替換原堆頂的元素2(此時我們可以完全可以看成元素5的位置也再堆頂,其實就是替換元素5的位置内容),然後元素5的下标從0調整到原來元素3下标1的位置,這個動作我們稱之為下沉操作,下沉過程中并沒有進行内容交換,隻是坐标進行下沉操作了,此時二叉堆内容如下所示:淺析PriorityBlockingQueue優先級隊列原理 half=3>>>1=1,k的下标在這個時候已經變為1,進行k<half時發現等于false,本次循環結束,将下标1位置替換真實下沉的内容 array[k] = key=array[1]=5,此時二叉堆内容如下所示:淺析PriorityBlockingQueue優先級隊列原理
最後将原調整前的result傳回。
put操作
put的操作内部其實就是調用了offer操作,由于是***數組可以進行擴充,是以不需要進行阻塞操作。
public void put(E e) {
offer(e); // never need to block
}
take操作
take操作當隊列為空時進行阻塞操作,當隊列不為空調用offer操作時會調用
notEmpty.signal();
通知等待的線程,隊列中已經有資料了,可以繼續擷取資料了,源碼如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 調用的事可以相應中斷。
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
// 如果隊列為空時等待。
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
總結
- PriorityBlockingQueue是一個阻塞隊列,繼承自BlockingQueue。
- PriorityBlockingQueue的元素必須實作Comparable接口。
- PriorityBlockingQueue内部實作原理是基于二叉堆來實作的,二叉堆的實作是基于數組來實作的。
- PriorityBlockingQueue是一個***數組,插入值時不需要進行阻塞操作,擷取值如果隊列為空時阻塞線程擷取值。