天天看點

Java DelayQueue使用執行個體

DelayQueue是一個支援延時擷取元素的無界阻塞隊列。隊列使用PriorityQueue來實作。隊列中的元素必須實作Delayed接口,在建立元素時可以指定多久才能從隊列中擷取目前元素。隻有在延遲期滿時才能從隊列中提取元素。我們可以将DelayQueue運用在以下應用場景:

  • 緩存系統的設計:可以用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中擷取元素時,表示緩存有效期到了。
  • 定時任務排程。使用DelayQueue儲存當天将會執行的任務和執行時間,一旦從DelayQueue中擷取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實作的。

具有過期時間的緩存

package test;

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class SessionCache<K, V> {

    public ConcurrentHashMap<K, V> map = new ConcurrentHashMap<K, V>();
    public DelayQueue<DelayedItem<K>> queue = new DelayQueue<DelayedItem<K>>();
    
    
    static class SingletonHolder {    
        static SessionCache instance = new SessionCache();    
    }    
       
    public static SessionCache getInstance() {    
        return SingletonHolder.instance;    
    }  
    

    public void put(K k, V v, long liveTime) {
        V v2 = map.put(k, v);
        DelayedItem<K> tmpItem = new DelayedItem<K>(k, liveTime);
        if (v2 != null) {
            queue.remove(tmpItem);
        }
        queue.put(tmpItem);
    }

    public SessionCache() {
        Thread t = new Thread() {
            @Override
            public void run() {
                dameonCheckOverdueKey();
            }
        };
        t.setDaemon(true);
        t.start();
    }

    public void dameonCheckOverdueKey() {
        while (true) {
            DelayedItem<K> delayedItem = queue.poll();
            if (delayedItem != null) {
                map.remove(delayedItem.getT());
                System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache");
            }
            try {
                Thread.sleep(300);
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        int cacheNumber = 10;
        Long liveTime = 200000000L;
        
        SessionCache cache = SessionCache.getInstance();
        for (int i = 0; i < cacheNumber; i++) {
            liveTime = liveTime + random.nextInt(10) * 10000;
            System.out.println(i + "  " + liveTime);
            cache.put(i + "", i + "", liveTime);
            liveTime = 200000000L;

        }

        Thread.sleep(100000);
        System.out.println();
    }

}

class DelayedItem<T> implements Delayed {

    private T t;
    private long liveTime;
    private long removeTime;

    public DelayedItem(T t, long liveTime) {
        this.setT(t);
        this.liveTime = liveTime;
        this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime();
    }

    @Override
    public int compareTo(Delayed o) {
        if (o == null)
            return 1;
        if (o == this)
            return 0;
        if (o instanceof DelayedItem) {
            DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o;
            if (liveTime > tmpDelayedItem.liveTime) {
                return 1;
            } else if (liveTime == tmpDelayedItem.liveTime) {
                return 0;
            } else {
                return -1;
            }
        }
        long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        return diff > 0 ? 1 : diff == 0 ? 0 : -1;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(removeTime - System.nanoTime(), unit);
    }

    public T getT() {
        return t;
    }

    public void setT(T t) {
        this.t = t;
    }

    @Override
    public int hashCode() {
        return t.hashCode();
    }

    @Override
    public boolean equals(Object object) {
        if (object instanceof DelayedItem) {
            return object.hashCode() == hashCode() ? true : false;
        }
        return false;
    }
    
    
}
           

 參考:http://ifeve.com/java-blocking-queue/