文章目录
-
- 先看结构
- 继承关系
- 基本使用
- 核心属性
- 构造方法
- 核心方法 boolean offer(E e) 插入元素
- 核心方法 siftUp(int k, E x)
- 核心方法 E poll()
- 核心方法 oid siftDown(int k, E x)
- 核心方法扩容 boolean offer(E e)
- 常见使用场景
- PriorityBlockingQueue 阻塞的优先队列
- 继承结构
- 核心属性
- 核心方法
- 核心方法E take()
- 总结
试想一下,如果要在一堆数据中找出最大的或者最小的是不要进行一次遍历,时间复杂度为O(N)
还能不能优化,比如O(1)的时间复杂度就能获取到。PriorityQueue优先级队列实现了这个功能
先看结构
完全二叉树:一棵有n个结点的二叉树,对树中的结点按从上至下、从左到右的顺序进行编号,节点得插入永远在下一个位置。
下面得紫色的那排小字就是节点的序号。那么N号元素的左孩子queue [2 * n + 1]右孩子就是queue [2 *(n + 1)](右孩子比左孩子多1 )。完全二叉树可以使用数组来进行元素存储。这个结构也叫小顶堆,最小的元素在堆顶。
继承关系
很明显是个队列那么必然有add,remove,offer,poll,peek等方法。
基本使用
public static void main(String[] args) {
Queue<Integer> pq = new PriorityQueue<>();
for (int i = 0; i < 20; i++) {
pq.offer(i);
}
System.out.println(pq.peek());
System.out.println(pq.poll());
System.out.println(pq.peek());
}
控制台输出,peek查询堆顶上的元素,poll获取堆顶上的元素,并且移除这个元素。可以看到堆顶的元素是0。
1
核心属性
-
transient Object[] queue;
表示为平衡二进制堆的优先级队列:queue [n]的两个子级是queue [2 * n + 1]和queue [2 *(n + 1)]。
如果比较器为null,则优先级队列元素的自然排序来排序,不为Null按比较器的结果存放
队列非空,则优先级最高的在queue [0]中
-
private int size = 0;
优先级队列中得元素个数
-
final Comparator<? super E> comparator;
比较器,基于比较器去比较元素之间得优先级
-
transient int modCount = 0;
用于快速失败得一个操作计数器,说明该集合不使用于多线程情景
构造方法
构造方法挺多的,一个个看
//默认构造方法,数组的容量默认为11,比较器为空。使用自然排序
public PriorityQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
//指定容量的的构造方法,当可能存在较多元素时,避免多次扩容可以指定相近的元素
public PriorityQueue(int initialCapacity) {
this(initialCapacity, null);
}
//默认容量,指定比较器
public PriorityQueue(Comparator<? super E> comparator) {
this(DEFAULT_INITIAL_CAPACITY, comparator);
}
//指定比较器,又指定容量。
public PriorityQueue(int initialCapacity, Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
//创建存放元素的数组
this.queue = new Object[initialCapacity];
this.comparator = comparator;
}
/**
* 包含给定元素的优先级队列。如果指定的集合原先是存在排序关系的(SortedSet,PriorityQueue)则根据原顺序进行排序。
* 否则根据元素的自然排序对该优先级队列进行排序。
* 如果指定集合的元素不能相互比较,则抛出ClassCastException
* 如果指定集合根据优先级不能相互比较队列的顺序,则抛出NullPointerException或
*/
```java
public PriorityQueue(Collection<? extends E> c) {
//SortedSet PriorityQueue 这两个类型的处理
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
initElementsFromCollection(ss);
} else if (c instanceof PriorityQueue<?>) {
PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
initFromPriorityQueue(pq);
} else {
//按自然排序处理
this.comparator = null;
initFromCollection(c);
}
}
SortedSet的情况:先赋值比较器,SortedSet内部也维护了一个比较器
做些常规校验,并且列表中不能有空的元素,直接赋值数组以及比较器。
private void initElementsFromCollection(Collection<? extends E> c) {
Object[] a = c.toArray();
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, a.length, Object[].class);
int len = a.length;
if (len == 1 || this.comparator != null)
for (int i = 0; i < len; i++)
if (a[i] == null)
throw new NullPointerException();
this.queue = a;
this.size = a.length;
}
优先级队列的情况,也是赋值数组,比较器
private void initFromPriorityQueue(PriorityQueue<? extends E> c) {
if (c.getClass() == PriorityQueue.class) {
this.queue = c.toArray();
this.size = c.size();
} else {
initFromCollection(c);
}
}
按自然顺序排序处理,调用了initElementsFromCollection(上文的方法),因为是Collection列表不一定有序。所以需要heapify调整一下元素位置。siftDown后续单独看。
private void initFromCollection(Collection<? extends E> c) {
initElementsFromCollection(c);
heapify();
}
private void heapify() {
for (int i = (size >>> 1) - 1; i >= 0; i--)
siftDown(i, (E) queue[i]);
}
核心方法 boolean offer(E e) 插入元素
优先队列在插入元素时就会将各元素与父节点进行比较,使得父节点的优先级是永远大于子节点的。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
//大于存放元素得数组时,进行扩容
if (i >= queue.length)
grow(i + 1);
//已存元素个数加1
size = i + 1;
//如果此时堆中不存在元素,将元素放第一个位置
if (i == 0)
queue[0] = e;
else
//往数组插入元素
siftUp(i, e);
return true;
}
核心方法 siftUp(int k, E x)
在位置k插入项x,通过将树x提升到大于或等于其父节点或成为根,从而保持堆不变。
说的通俗一点就是你在某个位置要插入元素了,那么我会把这个元素跟他的父节点去比较,如果大于父节点就和父节点交换,以此类推,直到不大于父节点。
//K是存放在数组中的位置,e是存放的元素
private void siftUp(int k, E x) {
if (comparator != null)
//有自定义得比较器
siftUpUsingComparator(k, x);
else
//默认比较器
siftUpComparable(k, x);
}
默认比较器
先计算出父节点的位置,
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
//一直比较直到,找到优先级比他大的父节点
//或者一直到根节点
while (k > 0) {
//定位父节点的位置:queue [n]的两个子级是queue [2 * n + 1]和queue [2 *(n + 1)]
//定位到父节点的
int parent = (k - 1) >>> 1;
//获取父节点
Object e = queue[parent];
//如果父节点大于等于该节点退出循环
if (key.compareTo((E) e) >= 0)
break;
//如果父节点小于该节点则交换
queue[k] = e;
//新插入的节点的下标的索引变为父节点的
k = parent;
}
//根据记录的下标节点存放元素
queue[k] = key;
}
自定义比较器
private void siftUpUsingComparator(int k, E x) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
//与上述代码唯一不同点,使用了自定义的比较器
if (comparator.compare(x, (E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}
核心方法 E poll()
获取优先级最大的元素,并且移除该元素。这个元素就是堆顶元素,但是移除完元素后还要进一步调整树的结构
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
//获取堆顶元素
E result = (E) queue[0];
//获取数组最后一个元素,用于从上至下调整结构
E x = (E) queue[s];
//最后一个元素的位置置为Null
queue[s] = null;
if (s != 0)
//开始调整树结构,将最末尾元素,插入到堆顶,然后开始从上至下调整
siftDown(0, x);
return result;
}
核心方法 oid siftDown(int k, E x)
在位置k插入项x,通过反复将x降级到树上小于或等于的节点的子节点,从而保持堆不变。
将节点和它的子节点进行比较调整,保证它比它所有的子节点都要小。这个调整的顺序是从当前节点向下,一直调整到叶节点。
private void siftDown(int k, E x) {
//两者的区别知识比较器的使用不同。
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
// 容量的一半
int half = size >>> 1;
// 叶子节点不用比较,因为优先级必定小于父节点
// 每次比较完K都等于优先级最高的孩子的下标
while (k < half) {
//左孩子的下标
int child = (k << 1) + 1; // assume left child is least
//假设左孩子优先级最高
Object c = queue[child];
//右孩子的下标
int right = child + 1;
// 如果右孩子的优先级大于左孩子
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
//优先级最高的换为右孩子
c = queue[child = right];
// 如果传进来的元素(末尾元素),优先级高于子节点,直接退出循环
if (key.compareTo((E) c) <= 0)
break;
// 父节点和优先级最高的孩子节点交换
// 将变为父节点的节点的位置记录下来,用于下次寻找子节点。
queue[k] = c;
k = child;
}
// 如此往复,当退出节点时
// K记录的就是节点的位置
queue[k] = key;
}
核心方法扩容 boolean offer(E e)
在插入元素时size等于数组容量时触发扩容
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
//在插入元素时size等于数组容量时触发扩容
if (i >= queue.length)
grow(i + 1);
size = i + 1;
if (i == 0)
queue[0] = e;
else
siftUp(i, e);
return true;
}
private void grow(int minCapacity) {
int oldCapacity = queue.length;
//如果当前容量小于64 扩容为2倍+2
//大于64 扩容为1.5倍
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
//当扩容后的长度超过最大的处理
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
//数组元素迁移到新的数组扩容后的数组中
queue = Arrays.copyOf(queue, newCapacity);
}
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
//如果原size+1 大于最大的可分配的长度 新长度为2^31 -1
//否则为Integer.MAX_VALUE - 8(原文的解释:一些虚拟机在数组中保留一些标题字。尝试分配更大的阵列可能会导致 OutOfMemoryError)
return (minCapacity > MAX_ARRAY_SIZE) ?
Integer.MAX_VALUE :
MAX_ARRAY_SIZE;
}
常见使用场景
-
找到第K大的元素
Leetcode215题:在未排序的数组中找到第 k个最大的元素。poll几次之后堆顶就是第K大的
-
优先任务处理
一批任务中优先处理VIP等级高的优先处理等
PriorityBlockingQueue 阻塞的优先队列
继承结构
相比于PriorityQueue,优先级阻塞队列实现了BlockingQueue。需要实现put,take方法。
核心属性
- private transient Object[] queue;
- private transient int size;
-
private transient Comparator<? super E> comparator;
以上都是和非阻塞的队列一样,主要是多了以下三个属性
-
private final ReentrantLock lock;
可重入锁
-
private final Condition notEmpty;
提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。
-
private transient volatile int allocationSpinLock;
是否正在扩容的一个标志,注意这个变量是volatile的
核心方法
阻塞队列一般在put()和take()时,都需要根据满或空来进行阻塞。但是无界队列在put时不需要阻塞。虽然在构造参数时可以指容量,但是那只是初始化容量,当容量不够用了会自动进行扩容。
public void put(E e) {
offer(e);
}
初略来看相比非阻塞,在操作前加了一把冲入锁。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
int n, cap;
Object[] array;
// 尝试扩容,为什么是尝试呢
// 尝试扩容的过程有可能有其他线程在扩容导致失败,让出CPU的使用
// 被系统调度后 会再判断是否需要扩容,直到扩容被某一个线程完成
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
//默认自然排序比较器(与PriorityQueue一致)
siftUpComparable(n, e, array);
else
//自定义比较器
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
//put进元素了,唤醒消费线程进行消费
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
尝试扩容,一进入方法,发现他把锁释放了(并且注释:必须释放然后重新获取主锁),至于为什么接着往下看。判断allocationSpinLock等于0,如果等于0就CAS为1.看到这里就很熟悉了allocationSpinLock属性被volatile。allocationSpinLock为1,或者CAS失败代表已经有线程在扩容了, 直接调用Thread.yield()让出CPU是线程从运行状态转为就绪状态(跟ConcurrentHashMap的初始化数组过程相似)。最后又竞争到锁,将数组迁移到新数组中扩容完成。
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
//allocationSpinLock为1说明有线程正在扩容,为0戴白目前没有线程在扩容
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
try {
//扩容操作与非阻塞的一致不再赘述
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
//扩容结束,allocationSpinLock 置为0
allocationSpinLock = 0;
}
}
// allocationSpinLock 为1 或者CAS失败 已经有线程在扩容了,让出CPU使用权
if (newArray == null)
Thread.yield();
// 重新竞争锁
// 如果扩容是当前线程扩容的,需要做赋值操作
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
//内容拷贝
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
核心方法E take()
可以看到take与put方法使用的同一把锁。在扩容时put释放了锁,使用CAS控制volatile变量来同步线程之间的扩容(保证只有一个线程在做扩容操作)。这样在扩容期间还未扩容结束时take还是可以工作(数组重新赋值扩容后的数组需要重新竞争锁),提高了吞吐量。等扩容完又竞争锁完成put操作,保证线程安全。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
E result;
try {
//没获取到元素就阻塞,线程进入等待状态,等待线程被唤醒
//出队方法与非阻塞的一致,不再赘述
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
总结
- PriorityQueue优先级队列使用完全二叉树(小顶堆)的数据结构实现,其存储结构是数组。由于是完全二叉树又有那么第N个元素的左孩子queue [2 * n +1]右孩子就是queue [2 *(n + 1)]。
- 每次在放入元素时先将元素放置在数组末尾,再进行向上调整,优先级大于父节点则与父节点交换,直至根节点。
- 获取优先级最高的元素,就是获取数组中下标为0的元素,如果要移除堆顶元素,会比较两孩子的优先级,优先级高的调整为根节点,直至叶子节点,最后将末尾元素移动到最后一次升为中间节点的位置上。
- 优先级阻塞队列,其阻塞主要体现在take方法上,因为优先队列是无界队列(Integer.MAX_VALUE -8),当数组容量不够用时会进行扩容操作。使用了一把可重入锁来实现阻塞,当take元素为空是会调用Condition.await()方法让线程进入等待状态并释放锁,等待put线程将元素插入队列并唤醒take线程。
- 因为扩容相对耗时,如果在put时如果需要扩容,会释放锁使用CAS控制allocationSpinLock变量来保证线程安全,扩容的同时是又不影响take线程获取元素(扩容的线程在做数据赋值需要重新获得锁),提高并发量。