DelayQueue
DelayQueue是一個支援延時擷取元素的無界阻塞隊列
java實作原理
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
}
可以看到其内部維護了一個
PriorityQueue
對象,根據後續的方法源碼解讀中可以發現,此隊列是
DelayQueue
最終儲存元素的隊列。因為
PriorityQueue
是一個無界的隊列,是以DelayQueue自然也實作了無界隊列。
基礎操作
新增資料
類似其他的隊列實作,其提供了add、put和offer三個方法進行元素的添加。
add和put
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
但是實際上根據源碼可以發現,add和put方式内部調用的是offer方法。
offer
此方法是DelayQueue最終添加元素的最終邏輯。其内部主要使用的是
PriorityQueue
的方法
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 當插入的元素為隊首,證明之前沒有資料
// 此時可能存在阻塞的讀取線程
if (q.peek() == e) {
// 當讀取線程為空
leader = null;
// 喚醒相關線程
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
可以看到其入隊的方法主要依托于
PriorityQueue
并沒有其他特殊的内容。
取出/删除資料
上面介紹過DelayQueue是一個支援延遲擷取元素的隊列,而其提供的三個查詢元素的方法中。
peek
隻是實作了簡單的資料查詢,其延遲擷取的方法主要在
poll
和
take
中
peek
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
poll
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
// first.getDelay(NANOSECONDS) 擷取隊首元素的逾時時間
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 使用lockInterruptibly方式擷取鎖
lock.lockInterruptibly();
try {
// 進入循環
for (;;) {
E first = q.peek();
// 如果為空則等待
if (first == null)
available.await();
else {
// first.getDelay(NANOSECONDS)
long delay = first.getDelay(NANOSECONDS);
// 逾時則直接出隊
if (delay <= 0)
return q.poll();
// 釋放引用
first = null; // don't retain ref while waiting
// 此時當leader為空的時候會被目前線程指派,
// 是以當leader不為空的時候,則代表有其他線程在操作内容
// 進行線程阻塞,這個時候因為leader隻能有一個值,是以保證了隻有一個線程線程去等待到時喚醒,避免大量喚醒操
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
// 指派為目前線程
leader = thisThread;
try {
// 進行逾時的阻塞
available.awaitNanos(delay);
} finally {
// 等待結束後釋放leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader為null并且隊列不為空,說明沒有其他線程在處理,且存在值
// 則喚醒available的鎖
if (leader == null && q.peek() != null)
available.signal();
// 釋放全局鎖
lock.unlock();
}
}
其出隊流程可以描述為:
- 在循環中當不存在元素則使用條件Condition進行阻塞,讓出資源。
- 存在資料的情況下,優先擷取元素逾時時間,假如已經逾時則直接調用poll出隊
- 不存在逾時的情況下,首先判斷,leader是否有值,有值則代表有其他線程正在操作,進行阻塞。
- leader無值得時候,表示沒有線程正在等待元素,是以将leader進行指派為本線程,然後根據之前計算的逾時時間,進行指定逾時時間的阻塞。當再次被喚醒後釋放leader資源,然後擷取元素。
- 當leader已經被釋放,且隊列不為空的時候,證明隊列存在值且沒有其他線程在處理,然後喚醒其他正在條件阻塞的線程。
這裡需要關注的幾個地方
- 當線程1到達的時候,因為隊首元素沒有逾時,則設定leader = 線程1。同時因為
導緻線程1持有隊首元素引用。E first = q.peek();
- 當後續線程2到達的時候,因為leader = 線程1則會被阻塞,但是此時因為
代碼已經執行,導緻其被線程2持有,假如線程阻塞完畢了,擷取列首元素成功,出列。這個時候列首元素應該會被回收掉,但是問題是它還被線程2持有着,是以不會回收,就會造成記憶體洩漏。E first = q.peek();
Delayed
在出隊的方法中我們看到方法使用Delayed進行逾時判斷。此接口是延遲隊列進行延遲的主要邏輯。是以DelayQueue要求隊列中的元素必須實作Delayed接口
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
該方法傳回與此對象相關的的剩餘時間。同時可以看到此接口繼承了Comperable接口。是以實作Delayed接口的類存在一個排序邏輯,然後配合
PriorityQueue
隊列可以保證在取出的元素的延時時間是有序的。
DelayQueue的使用
因為DelayQueue是基于
Delayed
的接口進行逾時判斷,是以元素需要繼承此接口,下面一個簡單使用此隊列的例子
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author daify
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
DelayQueue delayQueue = new DelayQueue();
long l = System.currentTimeMillis();
Item item = new Item(l+200,"item");
Item item2 = new Item(l+100,"item2");
Item item3 = new Item(l+150,"item3");
Item item4 = new Item(l+300,"item4");
delayQueue.add(item);
delayQueue.add(item2);
delayQueue.add(item3);
delayQueue.add(item4);
int size = delayQueue.size();
long time = 0;
for (int i = 0; i < size; i++) {
long l1 = System.currentTimeMillis();
System.out.println(l1 - time);
time = l1;
System.out.println(((Item)delayQueue.take()).getName());
}
}
}
class Item implements Delayed {
private String name;
private Long cancelTime;
public Item(Long cancelTime,String name) {
this.cancelTime = cancelTime;
this.name = name;
}
@Override
public long getDelay(TimeUnit unit) {
long l = unit.convert(cancelTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return l;
}
@Override
public int compareTo(Delayed o) {
return this.getCancelTime().compareTo(((Item) o).getCancelTime());
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Long getCancelTime() {
return cancelTime;
}
public void setCancelTime(Long cancelTime) {
this.cancelTime = cancelTime;
}
}
輸出内容
item2
1000
item3
499
item
500
item4
此時可以看到在元素插入隊列的時候,元素就已經根據時間進行重新排序了。是以取出的順序是根據
compareTo
的結果計算的。
關于DelayQueue
延遲隊列主要應用的場景:
- 緩存系統的設計:使用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中擷取元素時,就表示有緩存到期了。
- 定時任務排程:使用DelayQueue儲存當天要執行的任務和執行時間,一旦從DelayQueue中擷取到任務就開始執行,比如Timer就是使用DelayQueue實作的。
個人水準有限,上面的内容可能存在沒有描述清楚或者錯誤的地方,假如開發同學發現了,請及時告知,我會第一時間修改相關内容。假如我的這篇内容對你有任何幫助的話,麻煩給我點一個贊。你的點贊就是我前進的動力。