天天看点

PriorityQueue PriorityBlockingQueue 优先级队列与优先级阻塞队列 源码解析

文章目录

    • 先看结构
    • 继承关系
    • 基本使用
    • 核心属性
    • 构造方法
    • 核心方法 boolean offer(E e) 插入元素
    • 核心方法 siftUp(int k, E x)
    • 核心方法 E poll()
    • 核心方法 oid siftDown(int k, E x)
    • 核心方法扩容 boolean offer(E e)
    • 常见使用场景
    • PriorityBlockingQueue 阻塞的优先队列
    • 继承结构
    • 核心属性
    • 核心方法
    • 核心方法E take()
    • 总结

试想一下,如果要在一堆数据中找出最大的或者最小的是不要进行一次遍历,时间复杂度为O(N)

还能不能优化,比如O(1)的时间复杂度就能获取到。PriorityQueue优先级队列实现了这个功能

先看结构

完全二叉树:一棵有n个结点的二叉树,对树中的结点按从上至下、从左到右的顺序进行编号,节点得插入永远在下一个位置。

下面得紫色的那排小字就是节点的序号。那么N号元素的左孩子queue [2 * n + 1]右孩子就是queue [2 *(n + 1)](右孩子比左孩子多1 )。完全二叉树可以使用数组来进行元素存储。这个结构也叫小顶堆,最小的元素在堆顶。

PriorityQueue PriorityBlockingQueue 优先级队列与优先级阻塞队列 源码解析

继承关系

PriorityQueue PriorityBlockingQueue 优先级队列与优先级阻塞队列 源码解析

很明显是个队列那么必然有add,remove,offer,poll,peek等方法。

基本使用

public static void main(String[] args) {
    Queue<Integer> pq = new PriorityQueue<>();
    for (int i = 0; i < 20; i++) {
        pq.offer(i);
    }
    System.out.println(pq.peek());
    System.out.println(pq.poll());
    System.out.println(pq.peek());
}
           

控制台输出,peek查询堆顶上的元素,poll获取堆顶上的元素,并且移除这个元素。可以看到堆顶的元素是0。

1

核心属性

  • transient Object[] queue;

    表示为平衡二进制堆的优先级队列:queue [n]的两个子级是queue [2 * n + 1]和queue [2 *(n + 1)]。

    如果比较器为null,则优先级队列元素的自然排序来排序,不为Null按比较器的结果存放

    队列非空,则优先级最高的在queue [0]中

  • private int size = 0;

    优先级队列中得元素个数

  • final Comparator<? super E> comparator;

    比较器,基于比较器去比较元素之间得优先级

  • transient int modCount = 0;

    用于快速失败得一个操作计数器,说明该集合不使用于多线程情景

构造方法

构造方法挺多的,一个个看

//默认构造方法,数组的容量默认为11,比较器为空。使用自然排序
public PriorityQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
//指定容量的的构造方法,当可能存在较多元素时,避免多次扩容可以指定相近的元素
public PriorityQueue(int initialCapacity) {
    this(initialCapacity, null);
}
//默认容量,指定比较器
public PriorityQueue(Comparator<? super E> comparator) {
    this(DEFAULT_INITIAL_CAPACITY, comparator);
}
//指定比较器,又指定容量。
public PriorityQueue(int initialCapacity, Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        //创建存放元素的数组
        this.queue = new Object[initialCapacity];
        this.comparator = comparator;
}
/**
* 包含给定元素的优先级队列。如果指定的集合原先是存在排序关系的(SortedSet,PriorityQueue)则根据原顺序进行排序。 
* 否则根据元素的自然排序对该优先级队列进行排序。
* 如果指定集合的​​元素不能相互比较,则抛出ClassCastException 
* 如果指定集合根据优先级不能相互比较队列的顺序,则抛出NullPointerException或
*/
```java
public PriorityQueue(Collection<? extends E> c) {
    //SortedSet PriorityQueue 这两个类型的处理
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        initElementsFromCollection(ss);
    } else if (c instanceof PriorityQueue<?>) {
        PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        initFromPriorityQueue(pq);
    } else {
        //按自然排序处理
        this.comparator = null;
        initFromCollection(c);
    }
}
           

SortedSet的情况:先赋值比较器,SortedSet内部也维护了一个比较器

做些常规校验,并且列表中不能有空的元素,直接赋值数组以及比较器。

private void initElementsFromCollection(Collection<? extends E> c) {
    Object[] a = c.toArray();
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, a.length, Object[].class);
    int len = a.length;
    if (len == 1 || this.comparator != null)
        for (int i = 0; i < len; i++)
            if (a[i] == null)
                throw new NullPointerException();
    this.queue = a;
    this.size = a.length;
}
           

优先级队列的情况,也是赋值数组,比较器

private void initFromPriorityQueue(PriorityQueue<? extends E> c) {
    if (c.getClass() == PriorityQueue.class) {
        this.queue = c.toArray();
        this.size = c.size();
    } else {
        initFromCollection(c);
    }
}
           

按自然顺序排序处理,调用了initElementsFromCollection(上文的方法),因为是Collection列表不一定有序。所以需要heapify调整一下元素位置。siftDown后续单独看。

private void initFromCollection(Collection<? extends E> c) {
    initElementsFromCollection(c);
    heapify();
}
private void heapify() {
    for (int i = (size >>> 1) - 1; i >= 0; i--)
        siftDown(i, (E) queue[i]);
}
           

核心方法 boolean offer(E e) 插入元素

优先队列在插入元素时就会将各元素与父节点进行比较,使得父节点的优先级是永远大于子节点的。

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    //大于存放元素得数组时,进行扩容
    if (i >= queue.length)
        grow(i + 1);
    //已存元素个数加1
    size = i + 1;
    //如果此时堆中不存在元素,将元素放第一个位置
    if (i == 0)
        queue[0] = e;
    else
        //往数组插入元素
        siftUp(i, e);
    return true;
}
           

核心方法 siftUp(int k, E x)

在位置k插入项x,通过将树x提升到大于或等于其父节点或成为根,从而保持堆不变。

说的通俗一点就是你在某个位置要插入元素了,那么我会把这个元素跟他的父节点去比较,如果大于父节点就和父节点交换,以此类推,直到不大于父节点。

PriorityQueue PriorityBlockingQueue 优先级队列与优先级阻塞队列 源码解析
//K是存放在数组中的位置,e是存放的元素
private void siftUp(int k, E x) {
    if (comparator != null)
        //有自定义得比较器
        siftUpUsingComparator(k, x);
    else
        //默认比较器
        siftUpComparable(k, x);
}
           

默认比较器

先计算出父节点的位置,

private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    //一直比较直到,找到优先级比他大的父节点
    //或者一直到根节点 
    while (k > 0) {
        //定位父节点的位置:queue [n]的两个子级是queue [2 * n + 1]和queue [2 *(n + 1)]
        //定位到父节点的
        int parent = (k - 1) >>> 1;
        //获取父节点
        Object e = queue[parent];
        //如果父节点大于等于该节点退出循环
        if (key.compareTo((E) e) >= 0)
            break;
        //如果父节点小于该节点则交换
        queue[k] = e;
        //新插入的节点的下标的索引变为父节点的
        k = parent;
    }
    //根据记录的下标节点存放元素
    queue[k] = key;
}
           

自定义比较器

private void siftUpUsingComparator(int k, E x) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        //与上述代码唯一不同点,使用了自定义的比较器
        if (comparator.compare(x, (E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = x;
}
           

核心方法 E poll()

获取优先级最大的元素,并且移除该元素。这个元素就是堆顶元素,但是移除完元素后还要进一步调整树的结构

PriorityQueue PriorityBlockingQueue 优先级队列与优先级阻塞队列 源码解析
public E poll() {
    if (size == 0)
        return null;
    int s = --size;
    modCount++;
    //获取堆顶元素
    E result = (E) queue[0];
    //获取数组最后一个元素,用于从上至下调整结构
    E x = (E) queue[s];
    //最后一个元素的位置置为Null
    queue[s] = null;
    if (s != 0)
        //开始调整树结构,将最末尾元素,插入到堆顶,然后开始从上至下调整
        siftDown(0, x);
    return result;
}
           

核心方法 oid siftDown(int k, E x)

在位置k插入项x,通过反复将x降级到树上小于或等于的节点的子节点,从而保持堆不变。

将节点和它的子节点进行比较调整,保证它比它所有的子节点都要小。这个调整的顺序是从当前节点向下,一直调整到叶节点。

private void siftDown(int k, E x) {
    //两者的区别知识比较器的使用不同。
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}
           
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    // 容量的一半
    int half = size >>> 1;
    // 叶子节点不用比较,因为优先级必定小于父节点
    // 每次比较完K都等于优先级最高的孩子的下标
    while (k < half) {
        //左孩子的下标
        int child = (k << 1) + 1; // assume left child is least
        //假设左孩子优先级最高
        Object c = queue[child];
        //右孩子的下标
        int right = child + 1;
        // 如果右孩子的优先级大于左孩子
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            //优先级最高的换为右孩子
            c = queue[child = right];
        // 如果传进来的元素(末尾元素),优先级高于子节点,直接退出循环
        if (key.compareTo((E) c) <= 0)
            break;
        // 父节点和优先级最高的孩子节点交换
        // 将变为父节点的节点的位置记录下来,用于下次寻找子节点。
        queue[k] = c;
        k = child;
    }
    // 如此往复,当退出节点时
    // K记录的就是节点的位置
    queue[k] = key;
}
           

核心方法扩容 boolean offer(E e)

在插入元素时size等于数组容量时触发扩容

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    //在插入元素时size等于数组容量时触发扩容
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);
    return true;
}
           
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    //如果当前容量小于64 扩容为2倍+2
    //大于64 扩容为1.5倍
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                     (oldCapacity + 2) :
                                     (oldCapacity >> 1));
    //当扩容后的长度超过最大的处理
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    //数组元素迁移到新的数组扩容后的数组中
    queue = Arrays.copyOf(queue, newCapacity);
}
           
private static int hugeCapacity(int minCapacity) {
        if (minCapacity < 0) // overflow
            throw new OutOfMemoryError();
        //如果原size+1 大于最大的可分配的长度 新长度为2^31 -1
        //否则为Integer.MAX_VALUE - 8(原文的解释:一些虚拟机在数组中保留一些标题字。尝试分配更大的阵列可能会导致 OutOfMemoryError)
        return (minCapacity > MAX_ARRAY_SIZE) ?
            Integer.MAX_VALUE :
            MAX_ARRAY_SIZE;
    }
           

常见使用场景

  • 找到第K大的元素

    Leetcode215题:在未排序的数组中找到第 k个最大的元素。poll几次之后堆顶就是第K大的

  • 优先任务处理

    一批任务中优先处理VIP等级高的优先处理等

PriorityBlockingQueue 阻塞的优先队列

继承结构

PriorityQueue PriorityBlockingQueue 优先级队列与优先级阻塞队列 源码解析

相比于PriorityQueue,优先级阻塞队列实现了BlockingQueue。需要实现put,take方法。

核心属性

  • private transient Object[] queue;
  • private transient int size;
  • private transient Comparator<? super E> comparator;

    以上都是和非阻塞的队列一样,主要是多了以下三个属性

  • private final ReentrantLock lock;

    可重入锁

  • private final Condition notEmpty;

    提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。

  • private transient volatile int allocationSpinLock;

    是否正在扩容的一个标志,注意这个变量是volatile的

核心方法

阻塞队列一般在put()和take()时,都需要根据满或空来进行阻塞。但是无界队列在put时不需要阻塞。虽然在构造参数时可以指容量,但是那只是初始化容量,当容量不够用了会自动进行扩容。

public void put(E e) {
    offer(e);
}
           

初略来看相比非阻塞,在操作前加了一把冲入锁。

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    int n, cap;
    Object[] array;
    // 尝试扩容,为什么是尝试呢
    // 尝试扩容的过程有可能有其他线程在扩容导致失败,让出CPU的使用
    // 被系统调度后 会再判断是否需要扩容,直到扩容被某一个线程完成
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            //默认自然排序比较器(与PriorityQueue一致)
            siftUpComparable(n, e, array);
        else
            //自定义比较器
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        //put进元素了,唤醒消费线程进行消费
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
           

尝试扩容,一进入方法,发现他把锁释放了(并且注释:必须释放然后重新获取主锁),至于为什么接着往下看。判断allocationSpinLock等于0,如果等于0就CAS为1.看到这里就很熟悉了allocationSpinLock属性被volatile。allocationSpinLock为1,或者CAS失败代表已经有线程在扩容了, 直接调用Thread.yield()让出CPU是线程从运行状态转为就绪状态(跟ConcurrentHashMap的初始化数组过程相似)。最后又竞争到锁,将数组迁移到新数组中扩容完成。

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    //allocationSpinLock为1说明有线程正在扩容,为0戴白目前没有线程在扩容
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
        try {
            //扩容操作与非阻塞的一致不再赘述
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            //扩容结束,allocationSpinLock 置为0
            allocationSpinLock = 0;
        }
    }
    // allocationSpinLock 为1 或者CAS失败 已经有线程在扩容了,让出CPU使用权
    if (newArray == null)
        Thread.yield();
    // 重新竞争锁
    // 如果扩容是当前线程扩容的,需要做赋值操作
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        //内容拷贝
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
           

核心方法E take()

可以看到take与put方法使用的同一把锁。在扩容时put释放了锁,使用CAS控制volatile变量来同步线程之间的扩容(保证只有一个线程在做扩容操作)。这样在扩容期间还未扩容结束时take还是可以工作(数组重新赋值扩容后的数组需要重新竞争锁),提高了吞吐量。等扩容完又竞争锁完成put操作,保证线程安全。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lockInterruptibly();
    E result;
    try {
        //没获取到元素就阻塞,线程进入等待状态,等待线程被唤醒
        //出队方法与非阻塞的一致,不再赘述
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
           

总结

  • PriorityQueue优先级队列使用完全二叉树(小顶堆)的数据结构实现,其存储结构是数组。由于是完全二叉树又有那么第N个元素的左孩子queue [2 * n +1]右孩子就是queue [2 *(n + 1)]。
  • 每次在放入元素时先将元素放置在数组末尾,再进行向上调整,优先级大于父节点则与父节点交换,直至根节点。
  • 获取优先级最高的元素,就是获取数组中下标为0的元素,如果要移除堆顶元素,会比较两孩子的优先级,优先级高的调整为根节点,直至叶子节点,最后将末尾元素移动到最后一次升为中间节点的位置上。
  • 优先级阻塞队列,其阻塞主要体现在take方法上,因为优先队列是无界队列(Integer.MAX_VALUE -8),当数组容量不够用时会进行扩容操作。使用了一把可重入锁来实现阻塞,当take元素为空是会调用Condition.await()方法让线程进入等待状态并释放锁,等待put线程将元素插入队列并唤醒take线程。
  • 因为扩容相对耗时,如果在put时如果需要扩容,会释放锁使用CAS控制allocationSpinLock变量来保证线程安全,扩容的同时是又不影响take线程获取元素(扩容的线程在做数据赋值需要重新获得锁),提高并发量。

继续阅读