天天看點

JAVA隊列介紹(Queue)——DelayQueue(java延遲隊列)DelayQueue

DelayQueue

DelayQueue是一個支援延時擷取元素的無界阻塞隊列

java實作原理

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    private Thread leader = null;

    private final Condition available = lock.newCondition();
}
           

可以看到其内部維護了一個

PriorityQueue

對象,根據後續的方法源碼解讀中可以發現,此隊列是

DelayQueue

最終儲存元素的隊列。因為

PriorityQueue

是一個無界的隊列,是以DelayQueue自然也實作了無界隊列。

基礎操作

新增資料

類似其他的隊列實作,其提供了add、put和offer三個方法進行元素的添加。

add和put

public boolean add(E e) {
        return offer(e);
    }
           
public void put(E e) {
        offer(e);
    }
           

但是實際上根據源碼可以發現,add和put方式内部調用的是offer方法。

offer

此方法是DelayQueue最終添加元素的最終邏輯。其内部主要使用的是

PriorityQueue

的方法

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // 當插入的元素為隊首,證明之前沒有資料
            // 此時可能存在阻塞的讀取線程
            if (q.peek() == e) {
                // 當讀取線程為空
                leader = null;
                // 喚醒相關線程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
           

可以看到其入隊的方法主要依托于

PriorityQueue

并沒有其他特殊的内容。

取出/删除資料

上面介紹過DelayQueue是一個支援延遲擷取元素的隊列,而其提供的三個查詢元素的方法中。

peek

隻是實作了簡單的資料查詢,其延遲擷取的方法主要在

poll

take

peek

public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

           

poll

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            // first.getDelay(NANOSECONDS) 擷取隊首元素的逾時時間
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 使用lockInterruptibly方式擷取鎖
        lock.lockInterruptibly();
        try {
        // 進入循環
            for (;;) {
                E first = q.peek();
                // 如果為空則等待
                if (first == null)
                    available.await();
                else {
                    // first.getDelay(NANOSECONDS)
                    long delay = first.getDelay(NANOSECONDS);
                    // 逾時則直接出隊
                    if (delay <= 0)
                        return q.poll();
                        // 釋放引用
                    first = null; // don't retain ref while waiting
                    // 此時當leader為空的時候會被目前線程指派,
                    // 是以當leader不為空的時候,則代表有其他線程在操作内容
                    // 進行線程阻塞,這個時候因為leader隻能有一個值,是以保證了隻有一個線程線程去等待到時喚醒,避免大量喚醒操
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        // 指派為目前線程
                        leader = thisThread;
                        try {
                            // 進行逾時的阻塞
                            available.awaitNanos(delay);
                        } finally {
                            // 等待結束後釋放leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader為null并且隊列不為空,說明沒有其他線程在處理,且存在值
            // 則喚醒available的鎖
            if (leader == null && q.peek() != null)
                available.signal();
            // 釋放全局鎖
            lock.unlock();
        }
    }
           

其出隊流程可以描述為:

  1. 在循環中當不存在元素則使用條件Condition進行阻塞,讓出資源。
  2. 存在資料的情況下,優先擷取元素逾時時間,假如已經逾時則直接調用poll出隊
  3. 不存在逾時的情況下,首先判斷,leader是否有值,有值則代表有其他線程正在操作,進行阻塞。
  4. leader無值得時候,表示沒有線程正在等待元素,是以将leader進行指派為本線程,然後根據之前計算的逾時時間,進行指定逾時時間的阻塞。當再次被喚醒後釋放leader資源,然後擷取元素。
  5. 當leader已經被釋放,且隊列不為空的時候,證明隊列存在值且沒有其他線程在處理,然後喚醒其他正在條件阻塞的線程。

這裡需要關注的幾個地方

  1. 當線程1到達的時候,因為隊首元素沒有逾時,則設定leader = 線程1。同時因為

    E first = q.peek();

    導緻線程1持有隊首元素引用。
  2. 當後續線程2到達的時候,因為leader = 線程1則會被阻塞,但是此時因為

    E first = q.peek();

    代碼已經執行,導緻其被線程2持有,假如線程阻塞完畢了,擷取列首元素成功,出列。這個時候列首元素應該會被回收掉,但是問題是它還被線程2持有着,是以不會回收,就會造成記憶體洩漏。

Delayed

在出隊的方法中我們看到方法使用Delayed進行逾時判斷。此接口是延遲隊列進行延遲的主要邏輯。是以DelayQueue要求隊列中的元素必須實作Delayed接口

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
           

該方法傳回與此對象相關的的剩餘時間。同時可以看到此接口繼承了Comperable接口。是以實作Delayed接口的類存在一個排序邏輯,然後配合

PriorityQueue

隊列可以保證在取出的元素的延時時間是有序的。

DelayQueue的使用

因為DelayQueue是基于

Delayed

的接口進行逾時判斷,是以元素需要繼承此接口,下面一個簡單使用此隊列的例子

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author daify
 */
public class Test {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue delayQueue = new DelayQueue();
        long l = System.currentTimeMillis();
        Item item = new Item(l+200,"item");
        Item item2 = new Item(l+100,"item2");
        Item item3 = new Item(l+150,"item3");
        Item item4 = new Item(l+300,"item4");
        delayQueue.add(item);
        delayQueue.add(item2);
        delayQueue.add(item3);
        delayQueue.add(item4);
        int size = delayQueue.size();

        long time = 0;

        for (int i = 0; i < size; i++) {
            long l1 = System.currentTimeMillis();
            System.out.println(l1 - time);
            time = l1;
            System.out.println(((Item)delayQueue.take()).getName());
        }

    }

}

class Item implements Delayed {

    private String name;

    private Long cancelTime;

    public Item(Long cancelTime,String name) {
        this.cancelTime = cancelTime;
        this.name = name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long l = unit.convert(cancelTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        return l;
    }

    @Override
    public int compareTo(Delayed o) {
        return this.getCancelTime().compareTo(((Item) o).getCancelTime());
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getCancelTime() {
        return cancelTime;
    }

    public void setCancelTime(Long cancelTime) {
        this.cancelTime = cancelTime;
    }
}

           

輸出内容

item2
1000
item3
499
item
500
item4
           

此時可以看到在元素插入隊列的時候,元素就已經根據時間進行重新排序了。是以取出的順序是根據

compareTo

的結果計算的。

關于DelayQueue

延遲隊列主要應用的場景:

  1. 緩存系統的設計:使用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中擷取元素時,就表示有緩存到期了。
  2. 定時任務排程:使用DelayQueue儲存當天要執行的任務和執行時間,一旦從DelayQueue中擷取到任務就開始執行,比如Timer就是使用DelayQueue實作的。
個人水準有限,上面的内容可能存在沒有描述清楚或者錯誤的地方,假如開發同學發現了,請及時告知,我會第一時間修改相關内容。假如我的這篇内容對你有任何幫助的話,麻煩給我點一個贊。你的點贊就是我前進的動力。