天天看點

PriorityQueue PriorityBlockingQueue 優先級隊列與優先級阻塞隊列 源碼解析

文章目錄

    • 先看結構
    • 繼承關系
    • 基本使用
    • 核心屬性
    • 構造方法
    • 核心方法 boolean offer(E e) 插入元素
    • 核心方法 siftUp(int k, E x)
    • 核心方法 E poll()
    • 核心方法 oid siftDown(int k, E x)
    • 核心方法擴容 boolean offer(E e)
    • 常見使用場景
    • PriorityBlockingQueue 阻塞的優先隊列
    • 繼承結構
    • 核心屬性
    • 核心方法
    • 核心方法E take()
    • 總結

試想一下,如果要在一堆資料中找出最大的或者最小的是不要進行一次周遊,時間複雜度為O(N)

還能不能優化,比如O(1)的時間複雜度就能擷取到。PriorityQueue優先級隊列實作了這個功能

先看結構

完全二叉樹:一棵有n個結點的二叉樹,對樹中的結點按從上至下、從左到右的順序進行編号,節點得插入永遠在下一個位置。

下面得紫色的那排小字就是節點的序号。那麼N号元素的左孩子queue [2 * n + 1]右孩子就是queue [2 *(n + 1)](右孩子比左孩子多1 )。完全二叉樹可以使用數組來進行元素存儲。這個結構也叫小頂堆,最小的元素在堆頂。

PriorityQueue PriorityBlockingQueue 優先級隊列與優先級阻塞隊列 源碼解析

繼承關系

PriorityQueue PriorityBlockingQueue 優先級隊列與優先級阻塞隊列 源碼解析

很明顯是個隊列那麼必然有add,remove,offer,poll,peek等方法。

基本使用

public static void main(String[] args) {
    Queue<Integer> pq = new PriorityQueue<>();
    for (int i = 0; i < 20; i++) {
        pq.offer(i);
    }
    System.out.println(pq.peek());
    System.out.println(pq.poll());
    System.out.println(pq.peek());
}
           

控制台輸出,peek查詢堆頂上的元素,poll擷取堆頂上的元素,并且移除這個元素。可以看到堆頂的元素是0。

1

核心屬性

  • transient Object[] queue;

    表示為平衡二進制堆的優先級隊列:queue [n]的兩個子級是queue [2 * n + 1]和queue [2 *(n + 1)]。

    如果比較器為null,則優先級隊列元素的自然排序來排序,不為Null按比較器的結果存放

    隊列非空,則優先級最高的在queue [0]中

  • private int size = 0;

    優先級隊列中得元素個數

  • final Comparator<? super E> comparator;

    比較器,基于比較器去比較元素之間得優先級

  • transient int modCount = 0;

    用于快速失敗得一個操作計數器,說明該集合不使用于多線程情景

構造方法

構造方法挺多的,一個個看

//預設構造方法,數組的容量預設為11,比較器為空。使用自然排序
public PriorityQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
//指定容量的的構造方法,當可能存在較多元素時,避免多次擴容可以指定相近的元素
public PriorityQueue(int initialCapacity) {
    this(initialCapacity, null);
}
//預設容量,指定比較器
public PriorityQueue(Comparator<? super E> comparator) {
    this(DEFAULT_INITIAL_CAPACITY, comparator);
}
//指定比較器,又指定容量。
public PriorityQueue(int initialCapacity, Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        //建立存放元素的數組
        this.queue = new Object[initialCapacity];
        this.comparator = comparator;
}
/**
* 包含給定元素的優先級隊列。如果指定的集合原先是存在排序關系的(SortedSet,PriorityQueue)則根據原順序進行排序。 
* 否則根據元素的自然排序對該優先級隊列進行排序。
* 如果指定集合的​​元素不能互相比較,則抛出ClassCastException 
* 如果指定集合根據優先級不能互相比較隊列的順序,則抛出NullPointerException或
*/
```java
public PriorityQueue(Collection<? extends E> c) {
    //SortedSet PriorityQueue 這兩個類型的處理
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        initElementsFromCollection(ss);
    } else if (c instanceof PriorityQueue<?>) {
        PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        initFromPriorityQueue(pq);
    } else {
        //按自然排序處理
        this.comparator = null;
        initFromCollection(c);
    }
}
           

SortedSet的情況:先指派比較器,SortedSet内部也維護了一個比較器

做些正常校驗,并且清單中不能有空的元素,直接指派數組以及比較器。

private void initElementsFromCollection(Collection<? extends E> c) {
    Object[] a = c.toArray();
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, a.length, Object[].class);
    int len = a.length;
    if (len == 1 || this.comparator != null)
        for (int i = 0; i < len; i++)
            if (a[i] == null)
                throw new NullPointerException();
    this.queue = a;
    this.size = a.length;
}
           

優先級隊列的情況,也是指派數組,比較器

private void initFromPriorityQueue(PriorityQueue<? extends E> c) {
    if (c.getClass() == PriorityQueue.class) {
        this.queue = c.toArray();
        this.size = c.size();
    } else {
        initFromCollection(c);
    }
}
           

按自然順序排序處理,調用了initElementsFromCollection(上文的方法),因為是Collection清單不一定有序。是以需要heapify調整一下元素位置。siftDown後續單獨看。

private void initFromCollection(Collection<? extends E> c) {
    initElementsFromCollection(c);
    heapify();
}
private void heapify() {
    for (int i = (size >>> 1) - 1; i >= 0; i--)
        siftDown(i, (E) queue[i]);
}
           

核心方法 boolean offer(E e) 插入元素

優先隊列在插入元素時就會将各元素與父節點進行比較,使得父節點的優先級是永遠大于子節點的。

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    //大于存放元素得數組時,進行擴容
    if (i >= queue.length)
        grow(i + 1);
    //已存元素個數加1
    size = i + 1;
    //如果此時堆中不存在元素,将元素放第一個位置
    if (i == 0)
        queue[0] = e;
    else
        //往數組插入元素
        siftUp(i, e);
    return true;
}
           

核心方法 siftUp(int k, E x)

在位置k插入項x,通過将樹x提升到大于或等于其父節點或成為根,進而保持堆不變。

說的通俗一點就是你在某個位置要插入元素了,那麼我會把這個元素跟他的父節點去比較,如果大于父節點就和父節點交換,以此類推,直到不大于父節點。

PriorityQueue PriorityBlockingQueue 優先級隊列與優先級阻塞隊列 源碼解析
//K是存放在數組中的位置,e是存放的元素
private void siftUp(int k, E x) {
    if (comparator != null)
        //有自定義得比較器
        siftUpUsingComparator(k, x);
    else
        //預設比較器
        siftUpComparable(k, x);
}
           

預設比較器

先計算出父節點的位置,

private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    //一直比較直到,找到優先級比他大的父節點
    //或者一直到根節點 
    while (k > 0) {
        //定位父節點的位置:queue [n]的兩個子級是queue [2 * n + 1]和queue [2 *(n + 1)]
        //定位到父節點的
        int parent = (k - 1) >>> 1;
        //擷取父節點
        Object e = queue[parent];
        //如果父節點大于等于該節點退出循環
        if (key.compareTo((E) e) >= 0)
            break;
        //如果父節點小于該節點則交換
        queue[k] = e;
        //新插入的節點的下标的索引變為父節點的
        k = parent;
    }
    //根據記錄的下标節點存放元素
    queue[k] = key;
}
           

自定義比較器

private void siftUpUsingComparator(int k, E x) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        //與上述代碼唯一不同點,使用了自定義的比較器
        if (comparator.compare(x, (E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = x;
}
           

核心方法 E poll()

擷取優先級最大的元素,并且移除該元素。這個元素就是堆頂元素,但是移除完元素後還要進一步調整樹的結構

PriorityQueue PriorityBlockingQueue 優先級隊列與優先級阻塞隊列 源碼解析
public E poll() {
    if (size == 0)
        return null;
    int s = --size;
    modCount++;
    //擷取堆頂元素
    E result = (E) queue[0];
    //擷取數組最後一個元素,用于從上至下調整結構
    E x = (E) queue[s];
    //最後一個元素的位置置為Null
    queue[s] = null;
    if (s != 0)
        //開始調整樹結構,将最末尾元素,插入到堆頂,然後開始從上至下調整
        siftDown(0, x);
    return result;
}
           

核心方法 oid siftDown(int k, E x)

在位置k插入項x,通過反複将x降級到樹上小于或等于的節點的子節點,進而保持堆不變。

将節點和它的子節點進行比較調整,保證它比它所有的子節點都要小。這個調整的順序是從目前節點向下,一直調整到葉節點。

private void siftDown(int k, E x) {
    //兩者的差別知識比較器的使用不同。
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}
           
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    // 容量的一半
    int half = size >>> 1;
    // 葉子節點不用比較,因為優先級必定小于父節點
    // 每次比較完K都等于優先級最高的孩子的下标
    while (k < half) {
        //左孩子的下标
        int child = (k << 1) + 1; // assume left child is least
        //假設左孩子優先級最高
        Object c = queue[child];
        //右孩子的下标
        int right = child + 1;
        // 如果右孩子的優先級大于左孩子
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            //優先級最高的換為右孩子
            c = queue[child = right];
        // 如果傳進來的元素(末尾元素),優先級高于子節點,直接退出循環
        if (key.compareTo((E) c) <= 0)
            break;
        // 父節點和優先級最高的孩子節點交換
        // 将變為父節點的節點的位置記錄下來,用于下次尋找子節點。
        queue[k] = c;
        k = child;
    }
    // 如此往複,當退出節點時
    // K記錄的就是節點的位置
    queue[k] = key;
}
           

核心方法擴容 boolean offer(E e)

在插入元素時size等于數組容量時觸發擴容

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    //在插入元素時size等于數組容量時觸發擴容
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);
    return true;
}
           
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    //如果目前容量小于64 擴容為2倍+2
    //大于64 擴容為1.5倍
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                     (oldCapacity + 2) :
                                     (oldCapacity >> 1));
    //當擴容後的長度超過最大的處理
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    //數組元素遷移到新的數組擴容後的數組中
    queue = Arrays.copyOf(queue, newCapacity);
}
           
private static int hugeCapacity(int minCapacity) {
        if (minCapacity < 0) // overflow
            throw new OutOfMemoryError();
        //如果原size+1 大于最大的可配置設定的長度 新長度為2^31 -1
        //否則為Integer.MAX_VALUE - 8(原文的解釋:一些虛拟機在數組中保留一些标題字。嘗試配置設定更大的陣列可能會導緻 OutOfMemoryError)
        return (minCapacity > MAX_ARRAY_SIZE) ?
            Integer.MAX_VALUE :
            MAX_ARRAY_SIZE;
    }
           

常見使用場景

  • 找到第K大的元素

    Leetcode215題:在未排序的數組中找到第 k個最大的元素。poll幾次之後堆頂就是第K大的

  • 優先任務處理

    一批任務中優先處理VIP等級高的優先處理等

PriorityBlockingQueue 阻塞的優先隊列

繼承結構

PriorityQueue PriorityBlockingQueue 優先級隊列與優先級阻塞隊列 源碼解析

相比于PriorityQueue,優先級阻塞隊列實作了BlockingQueue。需要實作put,take方法。

核心屬性

  • private transient Object[] queue;
  • private transient int size;
  • private transient Comparator<? super E> comparator;

    以上都是和非阻塞的隊列一樣,主要是多了以下三個屬性

  • private final ReentrantLock lock;

    可重入鎖

  • private final Condition notEmpty;

    提供了類似Object的螢幕方法,與Lock配合可以實作等待/通知模式。

  • private transient volatile int allocationSpinLock;

    是否正在擴容的一個标志,注意這個變量是volatile的

核心方法

阻塞隊列一般在put()和take()時,都需要根據滿或空來進行阻塞。但是無界隊列在put時不需要阻塞。雖然在構造參數時可以指容量,但是那隻是初始化容量,當容量不夠用了會自動進行擴容。

public void put(E e) {
    offer(e);
}
           

初略來看相比非阻塞,在操作前加了一把沖入鎖。

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    //加鎖
    lock.lock();
    int n, cap;
    Object[] array;
    // 嘗試擴容,為什麼是嘗試呢
    // 嘗試擴容的過程有可能有其他線程在擴容導緻失敗,讓出CPU的使用
    // 被系統排程後 會再判斷是否需要擴容,直到擴容被某一個線程完成
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            //預設自然排序比較器(與PriorityQueue一緻)
            siftUpComparable(n, e, array);
        else
            //自定義比較器
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        //put進元素了,喚醒消費線程進行消費
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
           

嘗試擴容,一進入方法,發現他把鎖釋放了(并且注釋:必須釋放然後重新擷取主鎖),至于為什麼接着往下看。判斷allocationSpinLock等于0,如果等于0就CAS為1.看到這裡就很熟悉了allocationSpinLock屬性被volatile。allocationSpinLock為1,或者CAS失敗代表已經有線程在擴容了, 直接調用Thread.yield()讓出CPU是線程從運作狀态轉為就緒狀态(跟ConcurrentHashMap的初始化數組過程相似)。最後又競争到鎖,将數組遷移到新數組中擴容完成。

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    //allocationSpinLock為1說明有線程正在擴容,為0戴白目前沒有線程在擴容
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
        try {
            //擴容操作與非阻塞的一緻不再贅述
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            //擴容結束,allocationSpinLock 置為0
            allocationSpinLock = 0;
        }
    }
    // allocationSpinLock 為1 或者CAS失敗 已經有線程在擴容了,讓出CPU使用權
    if (newArray == null)
        Thread.yield();
    // 重新競争鎖
    // 如果擴容是目前線程擴容的,需要做指派操作
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        //内容拷貝
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
           

核心方法E take()

可以看到take與put方法使用的同一把鎖。在擴容時put釋放了鎖,使用CAS控制volatile變量來同步線程之間的擴容(保證隻有一個線程在做擴容操作)。這樣在擴容期間還未擴容結束時take還是可以工作(數組重新指派擴容後的數組需要重新競争鎖),提高了吞吐量。等擴容完又競争鎖完成put操作,保證線程安全。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //加鎖
    lock.lockInterruptibly();
    E result;
    try {
        //沒擷取到元素就阻塞,線程進入等待狀态,等待線程被喚醒
        //出隊方法與非阻塞的一緻,不再贅述
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
           

總結

  • PriorityQueue優先級隊列使用完全二叉樹(小頂堆)的資料結構實作,其存儲結構是數組。由于是完全二叉樹又有那麼第N個元素的左孩子queue [2 * n +1]右孩子就是queue [2 *(n + 1)]。
  • 每次在放入元素時先将元素放置在數組末尾,再進行向上調整,優先級大于父節點則與父節點交換,直至根節點。
  • 擷取優先級最高的元素,就是擷取數組中下标為0的元素,如果要移除堆頂元素,會比較兩孩子的優先級,優先級高的調整為根節點,直至葉子節點,最後将末尾元素移動到最後一次升為中間節點的位置上。
  • 優先級阻塞隊列,其阻塞主要展現在take方法上,因為優先隊列是無界隊列(Integer.MAX_VALUE -8),當數組容量不夠用時會進行擴容操作。使用了一把可重入鎖來實作阻塞,當take元素為空是會調用Condition.await()方法讓線程進入等待狀态并釋放鎖,等待put線程将元素插入隊列并喚醒take線程。
  • 因為擴容相對耗時,如果在put時如果需要擴容,會釋放鎖使用CAS控制allocationSpinLock變量來保證線程安全,擴容的同時是又不影響take線程擷取元素(擴容的線程在做資料指派需要重新獲得鎖),提高并發量。

繼續閱讀