Timer和ScheduledExecutorService是JDK内置的定時任務方案,而業内還有一個經典的定時任務的設計叫時間輪(Timing Wheel), Netty内部基于時間輪實作了一個HashedWheelTimer來優化百萬量級I/O逾時的檢測,它是一個高性能,低消耗的資料結構,它适合用非準實時,延遲的短平快任務,例如心跳檢測。本文主要介紹時間輪(Timing Wheel)及其使用。@pdai
知識準備
需要對時間輪(Timing Wheel),以及Netty的HashedWheelTimer要解決什麼問題有初步的認識。
什麼是時間輪(Timing Wheel)
時間輪(Timing Wheel)是George Varghese和Tony Lauck在1996年的論文'Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility'實作的,它在Linux核心中使用廣泛,是Linux核心定時器的實作方法和基礎之一。
時間輪(Timing Wheel)是一種環形的資料結構,就像一個時鐘可以分成很多格子(Tick),每個格子代表時間的間隔,它指向存儲的具體任務(timerTask)的一個連結清單。

以上述在論文中的圖檔例子,這裡一個輪子包含8個格子(Tick), 每個tick是一秒鐘;
任務的添加:如果一個任務要在17秒後執行,那麼它需要轉2輪,最終加到Tick=1位置的連結清單中。
任務的執行:在時鐘轉2Round到Tick=1的位置,開始執行這個位置指向的連結清單中的這個任務。(# 這裡表示剩餘需要轉幾輪再執行這個任務)
Netty的HashedWheelTimer要解決什麼問題
HashedWheelTimer是Netty根據時間輪(Timing Wheel)開發的工具類,它要解決什麼問題呢?這裡面有兩個要點:延遲任務 + 低時效性。@pdai
在Netty中的一個典型應用場景是判斷某個連接配接是否idle,如果idle(如用戶端由于網絡原因導緻到伺服器的心跳無法送達),則伺服器會主動斷開連接配接,釋放資源。判斷連接配接是否idle是通過定時任務完成的,但是Netty可能維持數百萬級别的長連接配接,對每個連接配接去定義一個定時任務是不可行的,是以如何提升I/O逾時排程的效率呢?
Netty根據時間輪(Timing Wheel)開發了HashedWheelTimer工具類,用來優化I/O逾時排程(本質上是延遲任務);之是以采用時間輪(Timing Wheel)的結構還有一個很重要的原因是I/O逾時這種類型的任務對時效性不需要非常精準。
HashedWheelTimer的使用方式
在了解時間輪(Timing Wheel)和Netty的HashedWheelTimer要解決的問題後,我們看下HashedWheelTimer的使用方式
通過構造函數看主要參數
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor){
}
具體參數說明如下:
-
:線程工廠,用于建立工作線程, 預設是Executors.defaultThreadFactory()threadFactory
-
:tick的周期,即多久tick一次tickDuration
-
: tick周期的機關unit
-
:時間輪的長度,一圈下來有多少格ticksPerWheel
-
:是否開啟記憶體洩漏檢測,預設是trueleakDetection
-
:最多執行的任務數,預設是-1,即不限制。在高并發量情況下才會設定這個參數。maxPendingTimeouts
實作案例
這裡展示下HashedWheelTimer的基本使用案例。@pdai
Pom依賴
引入pom的依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
</dependency>
2個簡單例子
例子1:5秒後執行TimerTask
@SneakyThrows
public static void simpleHashedWheelTimer(){
log.info("init task 1...");
HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8);
// add a new timeout
timer.newTimeout(timeout -> {
log.info("running task 1...");
}, 5, TimeUnit.SECONDS);
}
執行結果如下:
23:32:21.364 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 1...
...
23:32:27.454 [pool-1-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 1...
例子2:任務失效後cancel并讓它重新在3秒後執行。
@SneakyThrows
public static void reScheduleHashedWheelTimer(){
log.info("init task 2...");
HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8);
Thread.sleep(5000);
// add a new timeout
Timeout tm = timer.newTimeout(timeout -> {
log.info("running task 2...");
}, 5, TimeUnit.SECONDS);
// cancel
if (!tm.isExpired()) {
log.info("cancel task 2...");
tm.cancel();
}
// reschedule
timer.newTimeout(tm.task(), 3, TimeUnit.SECONDS);
}
23:28:36.408 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 2...
23:28:41.412 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - cancel task 2...
23:28:45.414 [pool-2-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 2...
進一步了解
我們通過如下問題進一步了解HashedWheelTimer。@pdai
HashedWheelTimer是如何實作的?
簡單看下HashedWheelTimer是如何實作的
-
:worker工作線程主要負責任務排程觸發,單線程運作。Worker
-
: 時間輪上面的格子,内部持有HashedWheelTimeout組成的連結清單結構的頭尾節點,多個格子組成的時間輪形成一圈又一圈的任務環HashedWheelBucket
-
: 往時間輪裡面送出的任務會被封裝成HashedWheelTimeoutHashedWheelTimeout
構造函數
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor){
checkNotNull(threadFactory, "threadFactory");
checkNotNull(unit, "unit");
checkPositive(tickDuration, "tickDuration");
checkPositive(ticksPerWheel, "ticksPerWheel");
this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
// Prevent overflow.
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
建立wheel
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
//ticksPerWheel may not be greater than 2^30
checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel){
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}
任務的添加
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit){
checkNotNull(task, "task");
checkNotNull(unit, "unit");
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
執行方法
/**
* Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
public void start(){
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
停止方法
@Override
public Set<Timeout> stop(){
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
return Collections.emptySet();
}
try {
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
return worker.unprocessedTimeouts();
}
什麼是多級Timing Wheel?
多級的時間輪是比較好了解的,時鐘是有小時,分鐘,秒的,秒轉一圈(Round)分鐘就轉一個格(Tick), 分鐘轉一圈(Round)小時就轉一格(Tick)。