天天看点

java常用类--DelayQueue

DelayQueue,顾名思义,延迟队列,可以设置延迟多久执行,从类注释上可以得到以下有用信息:

  • 队列元素将在过期时被执行,越靠近对头,越早过期
  • 未过期的元素不能被take
  • 不允许空元素

一、类图说明

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
           

 可以看出,DelayQueue的元素必须是Delayed的子类,Delayed是表示延迟能力的关键接口,继承了Comparable接口,并定义了getDelay方法,具体源代码如下所示:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
           

即DelayQueue队列元素必须是实现Delayed和Comparable接口,并重写了getDelay和compareTo方法才可以;

另外,从源码中可以看出,DelayQueue中大量使用了PriorityQueue的功能

private final PriorityQueue<E> q = new PriorityQueue<E>();
           
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
           
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
           

PriorityQueue优先级队列,此处的作用是可以根据过期时间进行优先级排序,让先过期的可以先执行;

此处的复用思想还是很重要的,在源码中可以经常看大这种思想,譬如LinkedHashMap复用HashMap的能力,Set复用Map的能力,DelayQueue复用PriorityQueue的能力;

二、放数据

以put方法为例,put调用的是offer的方法,具体源代码如下:

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    // 上锁
    lock.lock();
    try {
        // 使用 PriorityQueue 的扩容,排序等能力
        q.offer(e);
        // 如果恰好刚放进去的元素正好在队列头
        // 立马唤醒 take 的阻塞线程,执行 take 操作
        // 如果元素需要延迟执行的话,可以使其更快的沉睡计时
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}
           

 PriorityQueue的offer方法源码如下所示:

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }
           
private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x);
    else
        siftUpComparable(k, x);
 }
           
// 按照从小到大的顺序排列
private void siftUpComparable(int k, E x) {
   Comparable<? super E> key = (Comparable<? super E>) x;
   // k 是当前队列实际大小的位置
   while (k > 0) {
       // 对 k 进行减倍
       int parent = (k - 1) >>> 1;
       Object e = queue[parent];
       // 如果 x 比 e 大,退出,把 x 放在 k 位置上
       if (key.compareTo((E) e) >= 0)
           break;
            // x 比 e 小,继续循环,直到找到 x 比队列中元素大的位置
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }
           

从源代码中可以看到,PriorityQueue的offer方法主要做了以下事情:

  • 对新增元素进行空值判断;
  • 对队列进行扩容,扩容策略和集合的扩容策略很相近;
  • 按照优先级进行排序,即根据元素的compareTo方法进行排序,我们最终希望排序的结果是从小到大,因为想要对头的都是过期的数据,需要每个元素的过期时间进行排序:
(int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
           

三、拿数据

拿数据时,如果发现有元素的过期时间到了,则可以拿出数据,反之如果没有过期元素,则线程阻塞,以take方法为例:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 从队头中拿数据出来
                E first = q.peek();
                // 如果为空,说明队列中,没有数据,阻塞住
                if (first == null)
                    available.await();
                else {
                    // 获取队头数据的过期时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 如果过期了,直接返回队头数据
                    if (delay <= 0)
                        return q.poll();
                    // 引用置为 null ,便于 gc,这样可以让线程等待时,回收 first 变量
                    first = null; // don't retain ref while waiting
                    // leader 不为空的话,表示当前队列元素之前已经被设置过阻塞时间了
                    // 直接阻塞当前线程等待。
                    if (leader != null)
                        available.await();
                    else {
                     // 之前没有设置过阻塞时间,按照一定的时间进行阻塞
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                             // 进行阻塞
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
           

整体来看,DelayQueue延迟队列底层使用了排序和超时阻塞实现了延迟队列,队列使用的是PriorityQueue的排序能力,超时阻塞使用的是锁的等待能力,主要是为了满足延迟执行的场景,在已有api的基础上进行封装而已;