天天看點

淺析PriorityBlockingQueue優先級隊列原理

PriorityBlockingQueue是一個***的基于數組的優先級阻塞隊列,數組的預設長度是11,雖然指定了數組的長度,但是可以無限的擴充,直到資源消耗盡為止,每次出隊都傳回優先級别最高的或者最低的元素。其實内部是由平衡二叉樹堆來進行排序的,先進行構造二叉樹堆,二叉樹堆排序出來的數每次第一個元素和最後一個元素進行交換,這樣最大的或最小的數就到了最後面,然後最後一個不變,重新構造前面的數組元素以此類推進行堆排序。預設比較器是**null**,也就是使用隊列中元素的**compareTo**方法進行比較,意味着隊列元素要實作Comparable接口。

介紹

當你看本文時,需要具備以下知識點

二叉樹、完全二叉樹、二叉堆、二叉樹的表示方法

如果上述内容不懂也沒關系可以先看概念。

PriorityBlockingQueue

是一個***的基于數組的優先級阻塞隊列,數組的預設長度是11,雖然指定了數組的長度,但是可以無限的擴充,直到資源消耗盡為止,每次出隊都傳回優先級别最高的或者最低的元素。其實内部是由平衡二叉樹堆來進行排序的,先進行構造二叉樹堆,二叉樹堆排序出來的數每次第一個元素和最後一個元素進行交換,這樣最大的或最小的數就到了最後面,然後最後一個不變,重新構造前面的數組元素以此類推進行堆排序。預設比較器是null,也就是使用隊列中元素的compareTo方法進行比較,意味着隊列元素要實作Comparable接口。

  • PriorityBlockingQueue是一個***隊列,隊列滿時沒有進行阻塞限制,也就是沒有notFull進行阻塞,是以put是非阻塞的。
  • lock鎖獨占鎖控制隻有一個線程進行入隊、出隊操作。

平衡二叉樹堆

二叉堆的本質其實是一個完全二叉樹,它分為兩種類型:

  • 最大堆:最大堆的任何一個父節點的值都大于或等于它的左、右孩子節點的值。
  • 最小堆:最小堆的任何一個父節點的值都小于或等于它的左、右孩子節點的值。

二叉堆的根節點叫做堆頂。

最大堆和最小堆的特點:最大堆的堆頂是整個堆中最大元素,最小堆的堆頂是整個堆中最小元素。

我們知道二叉堆内部實作其實是基于數組來實作的,為什麼二叉堆又能使用數組來實作呢?因為二叉堆是完全二叉樹,并不會浪費空間資源,對于稀疏二叉樹如果使用數組來實作會有很多左右結點為空的情況,數組中需要進行占位處理,占位處理就會浪費很多空間,得不償失,但是二叉堆是一個完全二叉樹,是以不會有資源的浪費。

數組下标表示方法:

左節點:2*parent+1

右節點:2*parent+2

n坐标節點父節點:n/2

PriorityBlockingQueue類圖結構

淺析PriorityBlockingQueue優先級隊列原理

源碼分析

通過類圖可以清晰的發現它其實也是繼承自BlockingQueue接口以及Queue接口,說明也是阻塞隊列。在構造函數中預設隊列的容量是11,由于上面我們已經提到了,優先隊列使用的是二叉堆來實作的,二叉堆實作是根據數組來實作的,是以預設構造器中初始化容量為11,如下代碼所示:

/**
 * 預設數組長度。
 */
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
 * 最大數組允許的長度。
 * The maximum size of array to allocate.
 * Some VMs reserve some header words in an array.
 * Attempts to allocate larger arrays may result in
 * OutOfMemoryError: Requested array size exceeds VM limit
 */
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
           

上面講述了二叉堆的原理,二叉堆原理肯定是要進行比較大小,預設比較器是null,也就是使用元素的compareTo方法進行比較來确定元素的優先級,就意味着隊列元素必須實作Comparable接口,如下是構造函數:

/**
 * 建立一個預設長度為11的隊列,預設比較器為null。
 * Creates a {@code PriorityBlockingQueue} with the default
 * initial capacity (11) that orders its elements according to
 * their {@linkplain Comparable natural ordering}.
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

/**
 * 建立一個指定數組初始化長度的隊列,預設比較器為null。
 * Creates a {@code PriorityBlockingQueue} with the specified
 * initial capacity that orders its elements according to their
 * {@linkplain Comparable natural ordering}.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

/**
 * 建立一個指定數組初始化長度的隊列,比較器可以自己指定。
 * Creates a {@code PriorityBlockingQueue} with the specified initial
 * capacity that orders its elements according to the specified
 * comparator.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @param  comparator the comparator that will be used to order this
 *         priority queue.  If {@code null}, the {@linkplain Comparable
 *         natural ordering} of the elements will be used.
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

/**
 * 傳入一個集合,如果集合是SortedSet或PriorityBlockingQueue的話不需要進行堆化,直接使用原有的排序即可,如果不是則需要調用
 * heapify方法進行堆初始化操作。
 * Creates a {@code PriorityBlockingQueue} containing the elements
 * in the specified collection.  If the specified collection is a
 * {@link SortedSet} or a {@link PriorityQueue}, this
 * priority queue will be ordered according to the same ordering.
 * Otherwise, this priority queue will be ordered according to the
 * {@linkplain Comparable natural ordering} of its elements.
 *
 * @param  c the collection whose elements are to be placed
 *         into this priority queue
 * @throws ClassCastException if elements of the specified collection
 *         cannot be compared to one another according to the priority
 *         queue's ordering
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true表示需要進行堆化也就是初始化基于現有集合初始化一個二叉堆,使用下沉方式。
    boolean screen = true;  // true表示需要篩選空值
    if (c instanceof Sort3edSet<?>) {
      	// 如果是SortedSet不需要進行堆初始化操作。
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
      	// 不需要進行堆初始化。
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue<?>) {
      	// 如果是PriorityBlockingQueue不需要進行堆初始化操作。
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
      	// 不需要篩選空值。
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
          	// 不需要進行堆初始化操作。
            heapify = false;
    }
  	// 将集合轉換成數組類型。
    Object[] a = c.toArray();
  	// 擷取數組的長度大小
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
  	// 将轉化數組的對象指派給隊列,以及實際存儲的長度。
    this.queue = a;
    this.size = n;
    if (heapify)
      	// 調整堆大小。
        heapify();
}
           

構造函數中第四個構造函數傳遞的是一個集合,集合如果是SortSet和PriorityBlockingQueue是不需要進行堆初始化操作,如果是其他集合類型則需要進行堆排序。

private void heapify() {
    Object[] array = queue;
    int n = size;
  	// 這裡其實就是尋找完全二叉樹中最後一個非葉子節點值,由于數組元素是從0開始的下标,長度是從1開始的是以需要減掉1相等于是數組元素下标最後一個元素下标/2得到的值,也就是最後一個元素的父節點。
    int half = (n >>> 1) - 1;
    Comparator<? super E> cmp = comparator;
    if (cmp == null) {
      	// 循環周遊非葉子節點的值。
        for (int i = half; i >= 0; i--)
            siftDownComparable(i, (E) array[i], array, n);
    }
    else {
        for (int i = half; i >= 0; i--)
            siftDownUsingComparator(i, (E) array[i], array, n, cmp);
    }
}
           

例如我們初始化數組為a=[7,1,3,10,5,2,8,9,6],完全二叉樹表示為:

淺析PriorityBlockingQueue優先級隊列原理

首先half=9/2-1=4-1=3,a[3]=10,剛好是完全二叉樹中最後一個非葉子節點,再往上非葉子節點是3,1,7剛好數組下标是遞減的,然後針對元素10進行下沉操作,發現左右子樹中最小元素是6,則6和10元素進行交換。

淺析PriorityBlockingQueue優先級隊列原理

然後再下沉元素3,i--操作,得到如下完全二叉樹

淺析PriorityBlockingQueue優先級隊列原理

繼續周遊下一個非葉子節點,i=2減少1得到i=1,此時a[1]=1左右子節點6和5都比1大是以不需要進行變動,然後i進行減少,則到了i=0,此時a[7]=0,發現左右節點中元素1是最小的,是以先下沉到元素1的位置。

淺析PriorityBlockingQueue優先級隊列原理

下沉後發現還可以繼續下沉,因為左右節點中右節點元素5是最小的是以還需要進行下沉操作。

淺析PriorityBlockingQueue優先級隊列原理

至此下沉結束,二叉堆建構完成。

offer操作

接下來看一下隊列是如何進行建構成一個二叉堆的,其實這裡面建構二叉堆以及二叉堆擷取資料時,會采用上浮和下沉的操作來進行處理整個二叉堆的平衡,詳細來看一下offer方法,代碼如下所示:

/**
 * Inserts the specified element into this priority queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws ClassCastException if the specified element cannot be compared
 *         with elements currently in the priority queue according to the
 *         priority queue's ordering
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
  	// 首先擷取鎖對象。
    final ReentrantLock lock = this.lock;
  	// 隻有一個線程操作入隊和出隊動作。
    lock.lock();
  	// n代表數組的實際存儲内容的大小
  	// cap代表隊列的整體大小,也就是數組的長度。
    int n, cap;
    Object[] array;
  	// 如果數組實際長度大于等于數組的長度時,需要進行擴容操作。
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
      	// 如果使用者指定比較器,則使用使用者指定的比較器來進行比較,如果沒有則使用預設比較器。
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
          	// 進行上浮操作。
            siftUpComparable(n, e, array);
        else
          	// 進行上浮操作。
            siftUpUsingComparator(n, e, array, cmp);
      	// 實際長度增加1,由于有且僅有一個線程操作隊列,是以這裡并沒有使用原子性操作。
        size = n + 1;
      	// 通知等待的線程,隊列已經有資料,可以擷取資料。
        notEmpty.signal();
    } finally {
      	// 解鎖操作。
        lock.unlock();
    }
  	// 傳回操作成功。
    return true;
}
           
  1. 首先擷取鎖對象,控制有且僅有一個線程操作隊列。
  2. 如果數組實際存儲的内容大小大于等于數組長度時進行擴容操作,調用

    tryGrow

    方法進行擴容。
  3. 使用比較器進行比較,進行上浮操作來建立二叉堆。

tryGrow

是如何進行擴容的呢?

/**
 * Tries to grow array to accommodate at least one more element
 * (but normally expand by about 50%), giving up (allowing retry)
 * on contention (which we expect to be rare). Call only while
 * holding lock.
 *
 * @param array the heap array
 * @param oldCap the length of the array
 */
private void tryGrow(Object[] array, int oldCap) {
  	// 這裡先釋放了鎖,為什麼要釋放鎖呢?詳細請見下面。
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
  	// 這裡allocationSpinLock預設是0,通過CAS來講該值修改為1,也就是同時隻有一個線程進行擴容操作。
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
          	// 如果原容量小于64,就執行原容量+2,如果原容量大于64,擴大一倍容量。
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
          	// 新容量超過了最大容量,将容量調整為最大值。
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
          	// 建立新的數組對象。
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
          	// 擴容成功,則将值修改為0。
            allocationSpinLock = 0;
        }
    }
  	// 這裡是為了其他線程進入後優先讓擴容的線程進行操作。
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
  	// 加鎖操作。
    lock.lock();
  	// 将原資料拷貝到新數組中。
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
           

看似擴容還是很容易了解的,開始的時候為什麼要先釋放鎖呢,然後用CAS控制隻有一個線程可以擴容呢?其實可以不釋放鎖,也能控制隻允許一個線程進行擴容操作,但是會産生性能問題,因為當我擴容的時候,一直獲得鎖會不允許其他線程進行入隊和出隊操作,擴容的時候先釋放鎖,也就是可以其他線程進行處理出隊和入隊操作,例如第一個線程先CAS成功,并且進行擴容中,此時第二個線程進入後allocationSpinLock此時已經等于1了,也就是不會進行擴容操作,而是會直接進入到判斷newArray==null,此時線程二發現newArray為null,線程而會調用Thread.yield來讓出CPU,目的是讓擴容的線程擴容後優先調用lock.lock重新獲得鎖,但是這又不能完全保證,有可能yield退出了擴容還沒有結束,此時線程二獲得鎖,如果目前數組擴容沒有完畢,則線程二會再次調用offer的tryGrow進行擴容操作,再次給擴容線程讓出了鎖,再次調用yield讓出CPU。當擴容線程進行擴容時,其他線程自旋的檢測目前線程是否成功,成功了才會進行入隊的操作。

擴容成功後就需要建構二叉堆,建構二叉堆調用的是

siftUpComparable

方法,也就是上浮操作,接下來詳細講解一下上浮操作是如何進行的?

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
      	// 找到節點k的父節點。
        int parent = (k - 1) >>> 1;
      	// 擷取父節點的值。
        Object e = array[parent];
      	// 比較父節點和插入值的大小,如果插入值大于父節點直接插入到末尾。
        if (key.compareTo((T) e) >= 0)
            break;
      	// 将父節點設定到子節點,小資料進行上浮操作。
        array[k] = e;
      	// 将父節點的下标設定給k。
        k = parent;
    }
  	// 最後key上浮到k的位置。
    array[k] = key;
}

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                   Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}
           

為了能夠示範算法的整個過程,這裡舉例來說明一下:

為了示範擴容的過程我們先初始化隊列長度為2,後面會進行擴容操作。

  1. 當我們調用offer(3)時,此時k=n=size=0,x是我們要插入的内容,array是二叉堆數組,當我們插入第一個元素是,發現k>0是不成立的,是以直接運作 array[k] = key,此時二叉堆數組隻有一個元素3,如下圖所示:
    淺析PriorityBlockingQueue優先級隊列原理
  2. 第二次調用offer(5),此時k=n=size=1實際存儲長度為1,運作(k-1)>>>1尋找父節點,k-1=0,無符号向右側移動一位,就相當于是$(k-1)/2$,和開始介紹二叉堆特點的時候尋找父節點是一樣的,此時要插入的節點5的父節點是3,需要進行比較3和5的大小,發現父節點小于要插入的節點,是以執行break退出循環,執行array[k] = key操作,将節點5插入到父節點3下面,并且size進行增加1,此時size=2,如下所示:
    淺析PriorityBlockingQueue優先級隊列原理
  3. 當第三次調用offer(9)時,此時k=n=size=2,發現

    (n = size) >= (cap = (array = queue).length)

    實際長度與數組的長度相等,此時進行擴容操作,通過上述源碼分析得到,oldCap+(oldCap+2)=2+(2+2)=6,數組的長度從長度為2擴容到長度為6,将原有數組内容指派到新的數組中,擴容之後進入到

    上浮

    操作進行入隊操作,其實怎麼了解呢?可以了解為我們将元素9插入到數組最後一個位置,也就是隊列的最後一個位置。
    淺析PriorityBlockingQueue優先級隊列原理
    然後9這個元素需要找到它的父節點,那就是3,也即是(k-1)>>>1=(2-1)>>>1得到下标為0,array[0]=3,元素9的父節點是3,比較父節點和子節點大小,發現3<9位置不需要進行變動,則二叉堆就變成如下内容,size進行加1,size=3。
    淺析PriorityBlockingQueue優先級隊列原理
  4. 第四次調用offer(2)時,此時k=n=size=3,還是按照上面意思将元素2暫時插入到數組的末尾,然後進行上浮操作,虛線的意思就是告訴你我現在還不一定在不在這裡,我要和我的父親比較下到底誰大,如果父節點大那我隻能乖乖在這裡喽,如果父節點小,不好意思我得往上浮動了,接下來看一上浮動的過程。
    淺析PriorityBlockingQueue優先級隊列原理
    k=3,元素2的父節點=(k-1)>>>1=2>>>1等于1,也就是array[1]的元素5是元素2的父節點,通過上面的二叉圖也能夠清晰看到元素5是父節點,比較發現2<5,執行array[k] = e,array[3]=5,也就是說将5的位置進行下沉操作,這裡源代碼并沒發現2上浮操作,但是在下一次比較中又用到了我們的元素2進行比較,其實作在樹的結構相當于如下所示:
    淺析PriorityBlockingQueue優先級隊列原理
    發現剛開始元素2在最後節點,現在被替換成了元素5,這就意味着元素2的位置變到了之前元素5的位置,源碼中的k = parent,此時元素2的下标已經變到k=1,但是數組裡面的内容并沒有變,隻是元素下标上浮操作,上浮到父節點,相當于元素2就在元素5的位置,隻是下标1的位置元素内容并沒有直接替換成元素2僅此而已。接下來還會進行循環,數組下标1的元素的父節點是元素3,這裡就不計算了,因為上面第二步計算過,此時發現2<3,需要将元素3的内容進行下沉操作,元素2的下标進行上浮操作。
    淺析PriorityBlockingQueue優先級隊列原理
    此時下标2的元素移動到父節點下标0的位置,發現k<0,則本次循環結束,将array[0]替換成元素2,本次上浮操作結束。
    淺析PriorityBlockingQueue優先級隊列原理
    由此可見,次堆為最小二叉堆,當出隊操作時彈出的是最小的元素。

poll操作

poll操作就是将隊列内部二叉堆的堆頂元素出隊操作,如果隊列為空則傳回null。如下是poll的源碼:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}
           
  1. 首先擷取鎖對象,進行上鎖操作,有且僅有一個線程出隊和入隊操作
  2. 獲得到鎖對象之後進行出隊操作,調用

    dequeue

    方法
  3. 最後釋放鎖對象

接下來看一下出隊操作是如何進行的,

/**
 * Mechanics for poll().  Call only while holding lock.
 */
private E dequeue() {
  	// 數組的元素的個數。
    int n = size - 1;
  	// 如果數組中不存在元素則直接傳回null。
    if (n < 0)
        return null;
    else {
      	// 擷取隊列數組。
        Object[] array = queue;
      	// 将第一個元素也就是二叉堆的根結點堆頂元素作為傳回結果。
        E result = (E) array[0];
      	// 擷取數組中最後一個元素。
        E x = (E) array[n];
      	// 将最後一個元素設定為null。
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
          	// 進行下沉操作。
            siftDownComparable(0, x, array, n);
        else
          	// 進行下沉操作。
            siftDownUsingComparator(0, x, array, n, cmp);
      	// 實際元素大小減少1.
        size = n;
      	// 傳回結果。
        return result;
    }
}
           

通過源碼可以看到,出隊的内容就是二叉堆的堆頂元素arrra[0],而後面它有取到的完全二叉樹的最後一個節點,将最後一個節點設定為null,然後在調用了siftDownComparable對堆進行了調整動作,看一下這個方法的具體實作内容,然後再結合上面入隊的内容進行講解出隊是如何保證二叉堆平衡的。

private static <T> void siftDownComparable(int k, T x, Object[] array,                                           int n) {    if (n > 0) {        Comparable<? super T> key = (Comparable<? super T>)x;      	// 最後一個節點的父節點,也就是代表到這裡之後就結束了。        int half = n >>> 1;           // loop while a non-leaf        while (k < half) {          	// child=2n+1代表leftChild左節點。            int child = (k << 1) + 1; // assume left child is least          	// 擷取左節點的值。            Object c = array[child];          	// 擷取右節點=2n+1+1=2n+2的坐标            int right = child + 1;          	// 如果右側節點坐标小于數組中最有一個元素,并且右節點值小于左側節點值,則将c設定為右側節點值。            if (right < n &&                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)                c = array[child = right];          	// 對比c和目前最後一個元素的大小。            if (key.compareTo((T) c) <= 0)                break;          	// 将坐标k位置設定為比較的後的值。            array[k] = c;          	// 将光标移動到替換的節點上。            k = child;        }      	// 最後将元素最後一個值指派到k的位置。        array[k] = key;    }}private static <T> void siftDownUsingComparator(int k, T x, Object[] array,                                                int n,                                                Comparator<? super T> cmp) {    if (n > 0) {        int half = n >>> 1;        while (k < half) {            int child = (k << 1) + 1;            Object c = array[child];            int right = child + 1;            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)                c = array[child = right];            if (cmp.compare(x, (T) c) <= 0)                break;            array[k] = c;            k = child;        }        array[k] = x;    }}
           

看了源碼其實還是有點蒙的,因為根本沒有辦法想象到是如何實作的,接下來跟着思路一步一步的分析,上面offer時最後二叉堆的情況如下圖所示:

淺析PriorityBlockingQueue優先級隊列原理
  1. 先調用一次poll操作,此時size=4,result=array[0]=2,n=size-1=3,x=array[n]=array[3]=5,将array[3]位置設定為null,就變成如下的二叉堆
    淺析PriorityBlockingQueue優先級隊列原理
    數組中最後一個元素相當于直接被設定為空了,從二叉堆中移除掉了,移除掉了放在那裡呢?其實這裡大家可以了解為放在堆頂,原因是什麼呢?我們想一下,堆頂元素相當于是要被出隊操作,元素2已經出隊了,但是沒有堆頂元素了,此時需要怎麼操作呢?直接從數組中找到完全二叉樹葉子節點最後一個節點,将最後一個節點設定到堆頂,然後将堆頂元素進行下沉操作,但這裡源碼中并沒有實際設定元素5到堆頂,而是經過比較的過程将元素5從堆頂進行下沉的操作,接下來一步一步分析,目前可以将堆看做如下:
    淺析PriorityBlockingQueue優先級隊列原理
    然後通過源碼可以看到它是現将堆頂也就是元素2的左右節點相比較,比較3<9是以最小節點c=3,然後再跟元素5(可以看做現在是在堆頂)進行比較3<5,發現元素5大于元素3,将元素3的位置替換原堆頂的元素2(此時我們可以完全可以看成元素5的位置也再堆頂,其實就是替換元素5的位置内容),然後元素5的下标從0調整到原來元素3下标1的位置,這個動作我們稱之為下沉操作,下沉過程中并沒有進行内容交換,隻是坐标進行下沉操作了,此時二叉堆内容如下所示:
    淺析PriorityBlockingQueue優先級隊列原理
    half=3>>>1=1,k的下标在這個時候已經變為1,進行k<half時發現等于false,本次循環結束,将下标1位置替換真實下沉的内容 array[k] = key=array[1]=5,此時二叉堆内容如下所示:
淺析PriorityBlockingQueue優先級隊列原理

最後将原調整前的result傳回。

put操作

put的操作内部其實就是調用了offer操作,由于是***數組可以進行擴充,是以不需要進行阻塞操作。

public void put(E e) {   
  offer(e); // never need to block
}
           

take操作

take操作當隊列為空時進行阻塞操作,當隊列不為空調用offer操作時會調用

notEmpty.signal();

通知等待的線程,隊列中已經有資料了,可以繼續擷取資料了,源碼如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
  	// 調用的事可以相應中斷。
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
          	// 如果隊列為空時等待。
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
           

總結

  1. PriorityBlockingQueue是一個阻塞隊列,繼承自BlockingQueue。
  2. PriorityBlockingQueue的元素必須實作Comparable接口。
  3. PriorityBlockingQueue内部實作原理是基于二叉堆來實作的,二叉堆的實作是基于數組來實作的。
  4. PriorityBlockingQueue是一個***數組,插入值時不需要進行阻塞操作,擷取值如果隊列為空時阻塞線程擷取值。

繼續閱讀