本文介紹的 HashedWheelTimer 是來自于 Netty 的工具類,在 netty-common 包中。它用于實作延時任務。另外,下面介紹的内容和 Netty 無關。
如果你看過 Dubbo 的源碼,一定會在很多地方看到它。在需要失敗重試的場景中,它是一個非常友善好用的工具。
本文将會介紹 HashedWheelTimer 的使用,以及在後半部分分析它的源碼實作。
接口概覽
在介紹它的使用前,先了解一下它的接口定義,以及和它相關的類。
HashedWheelTimer
是接口
io.netty.util.Timer
的實作,從面向接口程式設計的角度,我們其實不需要關心 HashedWheelTimer,隻需要關心接口類 Timer 就可以了。這個 Timer 接口隻有兩個方法:
public interface Timer {
// 建立一個定時任務
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
// 停止所有的還沒有被執行的定時任務
Set<Timeout> stop();
}
Timer 是我們要使用的任務排程器,我們可以從方法上看出,它送出一個任務 TimerTask,傳回的是一個 Timeout 執行個體。是以這三個類之間的關系大概是下面這樣的:
TimerTask 非常簡單,就一個
run()
方法:
public interface TimerTask {
void run(Timeout timeout) throws Exception;
}
當然這裡有點意思的是,它把 Timeout 的執行個體也傳進來了,我們平時的代碼習慣,都是單向依賴。
這樣做也有好處,那就是在任務執行過程中,可以通過 timeout 執行個體來做點其他的事情。
Timeout 也是一個接口類:
public interface Timeout {
Timer timer();
TimerTask task();
boolean isExpired();
boolean isCancelled();
boolean cancel();
}
它持有上層的 Timer 執行個體,和下層的 TimerTask 執行個體,然後取消任務的操作也在這裡面。
HashedWheelTimer 使用
有了第一節介紹的接口資訊,其實我們很容易就可以使用它了。我們先來随意寫幾行:
// 構造一個 Timer 執行個體
Timer timer = new HashedWheelTimer();
// 送出一個任務,讓它在 5s 後執行
Timeout timeout1 = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("5s 後執行該任務");
}
}, 5, TimeUnit.SECONDS);
// 再送出一個任務,讓它在 10s 後執行
Timeout timeout2 = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("10s 後執行該任務");
}
}, 10, TimeUnit.SECONDS);
// 取消掉那個 5s 後執行的任務
if (!timeout1.isExpired()) {
timeout1.cancel();
}
// 原來那個 5s 後執行的任務,已經取消了。這裡我們反悔了,我們要讓這個任務在 3s 後執行
// 我們說過 timeout 持有上、下層的執行個體,是以下面的 timer 也可以寫成 timeout1.timer()
timer.newTimeout(timeout1.task(), 3, TimeUnit.SECONDS);
通過這幾行代碼,大家就可以非常熟悉這幾個類的使用了,因為它們真的很簡單。
我們來看一下 Dubbo 中的一個例子。
下面這個代碼修改自 Dubbo 的叢集調用政策
FailbackClusterInvoker
中:
它在調用 provider 失敗以後,傳回空結果給消費端,然後由背景線程執行定時任務重試,多用于消息通知這種場景。
public class Application {
public static void main(String[] args) {
Application app = new Application();
app.invoke();
}
private static final Logger log = LoggerFactory.getLogger(Application.class);
private volatile Timer failTimer = null;
public void invoke() {
try {
doInvoke();
} catch (Throwable e) {
log.error("調用 doInvoke 方法失敗,5s 後将進入背景的自動重試,異常資訊: ", e);
addFailed(() -> doInvoke());
}
}
// 實際的業務實作
private void doInvoke() {
// 這裡讓這個方法故意失敗
throw new RuntimeException("故意抛出異常");
}
private void addFailed(Runnable task) {
// 延遲初始化
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
failTimer = new HashedWheelTimer();
}
}
}
RetryTimerTask retryTimerTask = new RetryTimerTask(task, 3, 5);
try {
// 5s 後執行第一次重試
failTimer.newTimeout(retryTimerTask, 5, TimeUnit.SECONDS);
} catch (Throwable e) {
log.error("送出定時任務失敗,exception: ", e);
}
}
}
下面是裡面使用到的 RetryTimerTask 類,當然,你也可以選擇寫成内部類:
public class RetryTimerTask implements TimerTask {
private static final Logger log = LoggerFactory.getLogger(RetryTimerTask.class);
// 每隔幾秒執行一次
private final long tick;
// 最大重試次數
private final int retries;
private int retryTimes = 0;
private Runnable task;
public RetryTimerTask(Runnable task, long tick, int retries) {
this.tick = tick;
this.retries = retries;
this.task = task;
}
@Override
public void run(Timeout timeout) {
try {
task.run();
} catch (Throwable e) {
if ((++retryTimes) >= retries) {
// 重試次數超過了設定的值
log.error("失敗重試次數超過門檻值: {},不再重試", retries);
} else {
log.error("重試失敗,繼續重試");
rePut(timeout);
}
}
}
// 通過 timeout 拿到 timer 執行個體,重新送出一個定時任務
private void rePut(Timeout timeout) {
if (timeout == null) {
return;
}
Timer timer = timeout.timer();
if (timeout.isCancelled()) {
return;
}
timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
}
}
上面的代碼也非常簡單,在調用
doInvoke()
方法失敗以後,送出一個定時任務在 5s 後執行重試,如果還是失敗,之後每 3s 重試一次,最多重試 5 次,如果重試 5 次都失敗,記錄錯誤日志,不再重試。
列印的日志如下:
15:47:36.232 [main] ERROR c.j.n.timer.Application - 調用 doInvoke 方法失敗,5s 後将進入背景的自動重試,異常資訊:
java.lang.RuntimeException: 故意抛出異常
at com.javadoop.nettylearning.timer.Application.doInvoke(Application.java:36)
at com.javadoop.nettylearning.timer.Application.invoke(Application.java:28)
at com.javadoop.nettylearning.timer.Application.main(Application.java:19)
15:47:41.793 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重試失敗,繼續重試
15:47:44.887 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重試失敗,繼續重試
15:47:47.986 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重試失敗,繼續重試
15:47:51.084 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重試失敗,繼續重試
15:47:54.186 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 失敗重試次數超過門檻值: 5,不再重試
HashedWheelTimer 的使用确實非常簡單,如果你是來學習怎麼使用它的,那麼看到這裡就可以了。
HashedWheelTimer 源碼分析
大家肯定都知道或聽說過,它用的是一個叫做時間輪(下載下傳算法介紹PPT)的算法,看下面我畫的圖:
我這裡先說說大緻的執行流程,之後再進行細緻的源碼分析。
預設地,時鐘每 100ms 滴答一下(tick),往前走一格,共 512 格,走完一圈以後繼續下一圈。把它想象成生活中的鐘表就可以了。
内部使用一個長度為 512 的數組存儲,數組元素(bucket)的資料結構是連結清單,連結清單每個元素代表一個任務,也就是我們前面介紹的 Timeout 的執行個體。
送出任務的線程,隻要把任務往虛線上面的任務隊列中存放即可傳回。工作線程是單線程,一旦開啟,不停地在時鐘上繞圈圈。
仔細看下面的介紹:
- 工作線程到達每個時間整點的時候,開始工作。在 HashedWheelTimer 中,時間都是相對時間,工作線程的啟動時間,定義為時間的 0 值。因為一次 tick 是 100ms(預設值),是以 100ms、200ms、300ms… 就是我說的這些整點。
- 如上圖,當時間到 200ms 的時候,發現任務隊列有任務,取出所有的任務。
- 按照任務指定的執行時間,将其配置設定到相應的 bucket 中。如上圖中,任務2 和任務6指定的時間為 100ms~200ms 這個區間,就被配置設定到第二個 bucket 中,形成連結清單,其他任務同理。
這裡還有輪次的概念,不過不用着急,比如任務 6 指定的時間可能是 150ms + (512*100ms),它也會落在這個 bucket 中,但是它是下一個輪次才能被執行的。
- 任務配置設定到 bucket 完成後,執行該次 tick 的真正的任務,也就是落在第二個 bucket 中的任務 2 和任務 6。
- 假設執行這兩個任務共消耗了 50ms,到達 250ms 的時間點,那麼工作線程會休眠 50ms,等待進入到 300ms 這個整點。
如果這兩個任務執行的時間超過 100ms 怎麼辦?這個問題就要看源碼來解答了。
開始源碼分析。我們從它的預設構造器開始,一步步到達最後一個最複雜的構造器:
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
......
}
簡單說一下各個參數:
- threadFactory:定時任務都是背景任務,需要開啟線程,我們通常會通過自定義 threadFactory 來命名線程,嫌麻煩就使用
。Executors.defaultThreadFactory()
- tickDuration 和 timeUnit 定義了一格的時間長度,預設的就是 100ms。
- ticksPerWheel 定義了一圈有多少格,預設的就是 512;
- leakDetection:用于追蹤記憶體洩漏,本文不會介紹它,感興趣的讀者請自行去了解它。
- maxPendingTimeouts:最大允許等待的 Timeout 執行個體數,也就是我們可以設定不允許太多的任務等待。如果未執行任務數達到門檻值,那麼再次送出任務會抛出 RejectedExecutionException 異常。預設不限制。
初始化 HashedWheelTimer
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
// ...... 參數檢查
// 初始化時間輪,這裡做了向上"取整",保持數組長度為 2 的 n 次方
wheel = createWheel(ticksPerWheel);
// 掩碼,用來做取模
mask = wheel.length - 1;
// 100ms 轉換為納秒 100*10^6
this.tickDuration = unit.toNanos(tickDuration);
// 防止溢出
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
// 建立工作線程,這裡沒有啟動線程。後面會看到,在第一次送出任務的時候會啟動線程
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
// 指派最大允許等待任務數
this.maxPendingTimeouts = maxPendingTimeouts;
// 如果超過 64 個 HashedWheelTimer 執行個體,它會列印錯誤日志提醒你
// Netty 是真的到位,就怕你會用錯這個工具,到處執行個體化它。而且它隻會報錯一次。
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
上面,HashedWheelTimer 完成了初始化,初始化了時間輪數組
HashedWheelBucket[]
,稍微看一下内部類
HashedWheelBucket
,可以看到它是一個連結清單的結構。這個很好了解,因為每一格可能有多個任務。
送出第一個任務
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
// 校驗等待任務數是否達到門檻值 maxPendingTimeouts
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 如果工作線程沒有啟動,這裡負責啟動
start();
/** 下面的代碼,建構 Timeout 執行個體,将其放到任務隊列中。 **/
// deadline 是一個相對時間,相對于 HashedWheelTimer 的啟動時間
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// timeout 執行個體,一個上層依賴 timer,一個下層依賴 task,另一個是任務到期時間
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// 放到 timeouts 隊列中
timeouts.add(timeout);
return timeout;
}
送出任務的操作非常簡單,執行個體化 Timeout,然後放到任務隊列中。
我們可以看到,這裡使用的優先級隊列是一個 MPSC(Multiple Producer Single Consumer)的隊列,剛好适用于這裡的多生産線程,單消費線程的場景。而在 Dubbo 中,使用的隊列是 LinkedBlockingQueue,它是一個以連結清單方式組織的線程安全的隊列。
另外就是注意這裡調用的
start()
方法,如果該任務是第一個送出的任務,它會負責工作線程的啟動。
工作線程開始工作
其實隻要看懂下面的幾行代碼,HashedWheelTimer 的源碼就非常簡單了。
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
// tick 過的次數,前面說過,時針每 100ms tick 一次
private long tick;
@Override
public void run() {
// 在 HashedWheelTimer 中,用的都是相對時間,是以需要啟動時間作為基準,并且要用 volatile 修飾
startTime = System.nanoTime();
if (startTime == 0) {
// 這裡不是很看得懂...請知道的讀者不吝賜教
startTime = 1;
}
// 第一個送出任務的線程正 await 呢,喚醒它
startTimeInitialized.countDown();
// 接下來這個 do-while 是真正執行任務的地方,非常重要
do {
// 往下滑,就在目前的代碼塊裡面,倒數第二個方法
// 比如之前介紹的圖,那傳回值 deadline 就是 200ms
final long deadline = waitForNextTick();
if (deadline > 0) {
// 該次 tick,bucket 數組對應的 index
int idx = (int) (tick & mask);
// 處理一下已經取消的任務,可忽略。
processCancelledTasks();
// bucket
HashedWheelBucket bucket = wheel[idx];
// 将隊列中所有的任務轉移到相應的 buckets 中。細節往下看,這個代碼塊的下一個方法。
transferTimeoutsToBuckets();
// 執行進入到這個 bucket 中的任務
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
/* 到這裡,說明這個 timer 要關閉了,做一些清理工作 */
// 将所有 bucket 中沒有執行的任務,添加到 unprocessedTimeouts 這個 HashSet 中,
// 主要目的是用于 stop() 方法傳回
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 将任務隊列中的任務也添加到 unprocessedTimeouts 中
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
private void transferTimeoutsToBuckets() {
// 這裡一個 for 循環,還特地限制了 10 萬次,就怕你寫錯代碼,一直往裡面扔任務,可能實際也沒什麼用吧
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// 沒有任務了
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// 該任務剛剛被取消了(在transfer之前其實已經做過一次了)
continue;
}
// 下面就是将任務放到相應的 bucket 中,這裡還計算了每個任務的 remainingRounds
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
// 單個 bucket,是由 HashedWheelTimeout 執行個體組成的一個連結清單,
// 大家可以看一下這裡面的代碼,由于是單線程操作,不存在并發,是以代碼很簡單
bucket.addTimeout(timeout);
}
}
private void processCancelledTasks() {
//... 太簡單,忽略
}
/**
* 下面這個方法大家多看幾遍,注意它的傳回值
* 前面說過,我們用的都是相對時間,是以:
* 第一次進來的時候,工作線程會在 100ms 的時候傳回,傳回值是 100*10^6
* 第二次進來的時候,工作線程會在 200ms 的時候傳回,依次類推
* 另外就是注意極端情況,比如第二次進來的時候,由于被前面的任務阻塞,導緻進來的時候就已經是 250ms,
* 那麼,一進入這個方法就要立即傳回,傳回值是 250ms,而不是 200ms
* 剩下的自己看一下代碼
*/
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
// 嵌套在一個死循環裡面
for (;;) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
// 什麼情況會進到這個分支?我也不清楚
return -Long.MAX_VALUE;
} else {
// 這裡是出口,是以傳回值是目前時間(相對時間)
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
// 如果 timer 已經 shutdown,那麼傳回 Long.MIN_VALUE
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
public Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
接下來應該看看怎麼執行 bucket 中的任務:
/**
* 這裡會執行這個 bucket 中,輪次為 0 的任務,也就是到期的任務。
* 這個方法的入參 deadline 其實沒什麼用,因為輪次為 0 的都是應該被執行的。
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// 處理連結清單上的所有 timeout 執行個體
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
// 這行代碼負責執行具體的任務
timeout.expire();
} else {
// 這裡的代碼注釋也說,不可能進入到這個分支
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
// 輪次減 1
timeout.remainingRounds --;
}
timeout = next;
}
}
源碼分析部分已經說完了,有一些簡單的分支,這裡就不花篇幅介紹了。
Worker 線程是一個 bucket 一個 bucket 順次處理的,是以,即使有些任務執行時間超過了 100ms,“霸占”了之後好幾個 bucket 的處理時間,也沒關系,這些任務并不會被漏掉。但是有可能被延遲執行,畢竟工作線程是單線程。
說個有意思的點
在送出任務的
newTimeout
方法中,調用了啟動線程的
start()
方法,它會保證線程真的啟動以後并且指派完了
startTime
以後,
start()
方法再傳回。因為在 newTimeout 方法的後半段中需要一個正确的 startTime。
看下面兩個代碼片段。送出任務的線程:
public void start() {
......
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
工作線程
Worker
:
public void run() {
// 初始化 startTime
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// 這裡用了一個 CountDownLatch 執行個體來通信
startTimeInitialized.countDown();
......
}
這裡 startTime 是 volatile 修飾的屬性,為了保證它的可見性。
可是大家有沒有發現,其實有 startTimeInitialized 這個 CountDownLatch 執行個體,就能保證這裡的并發先後問題。
這裡簡單讨論兩個點:
1、第一個代碼片段中的 while 換成 if 行不行?
不行。
Object 中的 wait/notify 存在作業系統的假喚醒,是以一般都在 while 循環裡,但是這裡的 CountDownLatch 不會,是以看上去這裡的 while 是沒必要的。但是 CountDownLatch 沒有提供
方法,是以這裡其實在處理線程中斷的情況。
awaitUninterruptibly()
2、這裡 startTime 屬性不用 volatile 修飾行不行?
個人認為是可以的。
因為 CountDownLatch 提供了語義:在
之前的操作 happens-before
countDown()
await()
後的操作。
如果是我了解錯誤,歡迎大家指正。
小結
HashedWheelTimer 的源碼相對簡單,它的算法設計比較有意思。
我再把這個圖放到這裡,大家可以仔細回顧一下它的工作流程是怎樣的。
當然,本文的重點是分析主幹部分,而對于一些分支并沒有進行分析,因為它們相對比較簡單。
轉載:https://javadoop.com/post/HashedWheelTimer