在Kafka中應用了大量的延遲操作但在Kafka中 并沒用使用JDK自帶的Timer或是DelayQueue用于延遲操作,而是使用自己開發的DelayedOperationPurgatory元件用于管理延遲操作,Kafka這類分布式架構有大量延遲操作并且對性能要求及其高,而java.util.Timer與java.util.concurrent.DelayQueue的插入和删除時間複雜度都為對數階O(log n)并不能滿足Kafka性能要求,是以Kafka實作了基于時間輪的定時任務元件,該時間輪定時任務實作的插入與删除(開始定時器與暫停定時器)的時間複雜度都為常數階O(1);
時間輪的應用并不少見,在Netty、akka、Quartz、Zookeeper等高性能元件中都存在時間輪定時器的蹤影;
時間輪資料結構

時間輪名詞解釋:
時間格:環形結構中用于存放延遲任務的區塊;
指針(CurrentTime):指向目前操作的時間格,代表目前時間
格數(ticksPerWheel):為時間輪中時間格的個數
間隔(tickDuration):每個時間格之間的間隔
總間隔(interval):目前時間輪總間隔,也就是等于ticksPerWheel*tickDuration
TimingWheel并非簡單的環形時間輪,而是多層級時間輪,每個時間輪由多個時間格組成,每個時間格為一個時間間隔,底層的時間格跨度較小,然後随着延遲任務延遲時間的長短逐層變大;如上圖,底下的時間輪每個時間格為1ms,整個時間輪為10ms,而上面一層的時間輪中時間格為10ms,整個時間輪為100ms;
時間輪添加上級時間輪的規則為:目前currentTime為上級時間輪的startMs,目前interval為上級時間輪的tickDuration,每層ticksPerWheel相同;簡單點說就是上層時間輪跨度為目前的M倍,時間格為目前的N倍;
Kafka中時間輪的實作
Kafka中時間輪時間類為TimingWheel,該類結構為存儲定時任務的環形隊列,内部使用數組實作,數組是用于存放TimerTaskList對象,TimerTaskList環形雙向連結清單,連結清單項TimerTaskEntry封裝了定時任務TimerTask,TimerTaskList與TimerTaskEntry中均有逾時時間字段,TimerTask中delayMs字段用于記錄任務延遲時間;該三個類為Kafka時間輪實作的核心;
TimingWheel:表示一個時間輪,通常會有多層時間輪也就存在多個TimingWheel對象;
TimerTaskList:為數組對象用于存放延遲任務,一個TimerTaskList就代表一個時間格,一個時間格中能儲存的任務到期時間隻可在[t~t+10ms]區間(t為時間格到期時間,10ms時間格間格),每個時間格有個過期時間,時間格過期後時間格中的任務将向前移動存入前面時間格中;
TimerTask:表示延遲任務;
SystemTimer:kafka實作的定時器,内部封裝了TimningWheel用于執行、管理定時任務;
下面通過一個示例來介紹kafka時間輪的工作過程:
時間輪初始化:初始時間輪中的格數、間隔、指針的初始化時間,建立時間格所對應的buckets數組,計算總間隔interval;
添加延遲任務:判斷該任務是否已被取消、是否已經過期如已過期則把任務放入線程池中執行、根據時間輪總間隔與目前時間判斷任務是否可存入目前層級時間輪否則添加上層時間輪并再次嘗試往時間輪中添加該任務;
時間輪降級:有一個定時任務再300ms後将執行,現層級時間輪每層有10個時間格,頂層時間輪的時間格間隔為1ms,整個時間輪為10ms,無法存下該任務。這時建立第二層時間輪,時間格間隔為10ms,整個時間輪為100ms,還是無法存該任務。接着建立第三層時間輪,時間格間隔為100ms,整個時間輪為1000ms,此時任務存入第三層時間輪的第三個時間格中;過了段時間,TimerTaskList到期(時間格)可該任務還有90ms,還無法執行。此時将再次把定時任務添加到時間輪中,頂層時間輪還是無法滿足存入條件,往第二層時間輪添加,這時定時任務存入第二層時間輪第九個時間格當中;任務在時間輪中如此反複,直到任務過期時将放入線程池中執行;
關鍵實作方法
public boolean add(TaskEntry e) {
synchronized (this) {
long expiration = e.getExpirationMs();
if(expiration<(currentTime+tickDuration)){
//目前任務過期時間
LOGGER.info("目前任務已過期");
return false;
}else if(expiration<(currentTime+interval)) {
//查找時間格的位置,過期時間/時間格%時間輪大小
long virtualId = expiration / tickDuration;
TaskEntryList taskEntryList = buckets.get((int) (virtualId % ticksPerWheel));
taskEntryList.add(e);
//設定EntryList過期時間
if(taskEntryList.setTime(virtualId * tickDuration)) {
listDelayQueue.offer(taskEntryList);
}
return true;
}else{
if(overflowWheel==null){
// 添加上級timingWheel
addOverflowWheel();
}
return overflowWheel.add(e);
}
}
}
/**
*時間表針移動
* @param timeMS
*/
public void advanceClock(long timeMS){
if(timeMS>=(currentTime+tickDuration)){
currentTime=timeMS-(timeMS%tickDuration);
}
if (overflowWheel != null) overflowWheel.advanceClock(currentTime);
}
/**
* 添加定時任務
* @param taskEntry
*/
public void add(TaskEntry taskEntry) {
if (!timingWheel.add(taskEntry)) {
System.out.println(String.format("任務已過期,開始執行 %s",taskEntry.getTimerTask()));
taskExecutor.execute(taskEntry.getTimerTask());
}
}
文章首發位址:Solinx
http://www.solinx.co/archives/989