天天看點

HashedWheelTimer 使用及源碼分析

本文介紹的 HashedWheelTimer 是來自于 Netty 的工具類,在 netty-common 包中。它用于實作延時任務。另外,下面介紹的内容和 Netty 無關。

如果你看過 Dubbo 的源碼,一定會在很多地方看到它。在需要失敗重試的場景中,它是一個非常友善好用的工具。

本文将會介紹 HashedWheelTimer 的使用,以及在後半部分分析它的源碼實作。

接口概覽

在介紹它的使用前,先了解一下它的接口定義,以及和它相關的類。

HashedWheelTimer

是接口

io.netty.util.Timer

的實作,從面向接口程式設計的角度,我們其實不需要關心 HashedWheelTimer,隻需要關心接口類 Timer 就可以了。這個 Timer 接口隻有兩個方法:

public interface Timer {

    // 建立一個定時任務
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    // 停止所有的還沒有被執行的定時任務
    Set<Timeout> stop();
}
           

Timer 是我們要使用的任務排程器,我們可以從方法上看出,它送出一個任務 TimerTask,傳回的是一個 Timeout 執行個體。是以這三個類之間的關系大概是下面這樣的:

HashedWheelTimer 使用及源碼分析

TimerTask 非常簡單,就一個

run()

方法:

public interface TimerTask {
    void run(Timeout timeout) throws Exception;
}
           

當然這裡有點意思的是,它把 Timeout 的執行個體也傳進來了,我們平時的代碼習慣,都是單向依賴。

這樣做也有好處,那就是在任務執行過程中,可以通過 timeout 執行個體來做點其他的事情。

Timeout 也是一個接口類:

public interface Timeout {
    Timer timer();
    TimerTask task();
    boolean isExpired();
    boolean isCancelled();
    boolean cancel();
}
           

它持有上層的 Timer 執行個體,和下層的 TimerTask 執行個體,然後取消任務的操作也在這裡面。

HashedWheelTimer 使用

有了第一節介紹的接口資訊,其實我們很容易就可以使用它了。我們先來随意寫幾行:

// 構造一個 Timer 執行個體
Timer timer = new HashedWheelTimer();

// 送出一個任務,讓它在 5s 後執行
Timeout timeout1 = timer.newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) {
        System.out.println("5s 後執行該任務");
    }
}, 5, TimeUnit.SECONDS);

// 再送出一個任務,讓它在 10s 後執行
Timeout timeout2 = timer.newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) {
        System.out.println("10s 後執行該任務");
    }
}, 10, TimeUnit.SECONDS);

// 取消掉那個 5s 後執行的任務
if (!timeout1.isExpired()) {
    timeout1.cancel();
}

// 原來那個 5s 後執行的任務,已經取消了。這裡我們反悔了,我們要讓這個任務在 3s 後執行
// 我們說過 timeout 持有上、下層的執行個體,是以下面的 timer 也可以寫成 timeout1.timer()
timer.newTimeout(timeout1.task(), 3, TimeUnit.SECONDS);
           

通過這幾行代碼,大家就可以非常熟悉這幾個類的使用了,因為它們真的很簡單。

我們來看一下 Dubbo 中的一個例子。

下面這個代碼修改自 Dubbo 的叢集調用政策

FailbackClusterInvoker

中:

它在調用 provider 失敗以後,傳回空結果給消費端,然後由背景線程執行定時任務重試,多用于消息通知這種場景。
public class Application {

    public static void main(String[] args) {
        Application app = new Application();
        app.invoke();
    }

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    private volatile Timer failTimer = null;

    public void invoke() {
        try {
            doInvoke();
        } catch (Throwable e) {
            log.error("調用 doInvoke 方法失敗,5s 後将進入背景的自動重試,異常資訊: ", e);
            addFailed(() -> doInvoke());
        }
    }

    // 實際的業務實作
    private void doInvoke() {
        // 這裡讓這個方法故意失敗
        throw new RuntimeException("故意抛出異常");
    }

    private void addFailed(Runnable task) {
        // 延遲初始化
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    failTimer = new HashedWheelTimer();
                }
            }
        }
        RetryTimerTask retryTimerTask = new RetryTimerTask(task, 3, 5);
        try {
            // 5s 後執行第一次重試
            failTimer.newTimeout(retryTimerTask, 5, TimeUnit.SECONDS);
        } catch (Throwable e) {
            log.error("送出定時任務失敗,exception: ", e);
        }
    }
}
           

下面是裡面使用到的 RetryTimerTask 類,當然,你也可以選擇寫成内部類:

public class RetryTimerTask implements TimerTask {

    private static final Logger log = LoggerFactory.getLogger(RetryTimerTask.class);

    // 每隔幾秒執行一次
    private final long tick;

    // 最大重試次數
    private final int retries;

    private int retryTimes = 0;

    private Runnable task;

    public RetryTimerTask(Runnable task, long tick, int retries) {
        this.tick = tick;
        this.retries = retries;
        this.task = task;
    }

    @Override
    public void run(Timeout timeout) {
        try {
            task.run();
        } catch (Throwable e) {
            if ((++retryTimes) >= retries) {
                // 重試次數超過了設定的值
                log.error("失敗重試次數超過門檻值: {},不再重試", retries);
            } else {
                log.error("重試失敗,繼續重試");
                rePut(timeout);
            }
        }
    }

    // 通過 timeout 拿到 timer 執行個體,重新送出一個定時任務
    private void rePut(Timeout timeout) {
        if (timeout == null) {
            return;
        }
        Timer timer = timeout.timer();
        if (timeout.isCancelled()) {
            return;
        }
        timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
    }
}
           

上面的代碼也非常簡單,在調用

doInvoke()

方法失敗以後,送出一個定時任務在 5s 後執行重試,如果還是失敗,之後每 3s 重試一次,最多重試 5 次,如果重試 5 次都失敗,記錄錯誤日志,不再重試。

列印的日志如下:

15:47:36.232 [main] ERROR c.j.n.timer.Application - 調用 doInvoke 方法失敗,5s 後将進入背景的自動重試,異常資訊: 
java.lang.RuntimeException: 故意抛出異常
    at com.javadoop.nettylearning.timer.Application.doInvoke(Application.java:36)
    at com.javadoop.nettylearning.timer.Application.invoke(Application.java:28)
    at com.javadoop.nettylearning.timer.Application.main(Application.java:19)
15:47:41.793 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重試失敗,繼續重試
15:47:44.887 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重試失敗,繼續重試
15:47:47.986 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重試失敗,繼續重試
15:47:51.084 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重試失敗,繼續重試
15:47:54.186 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 失敗重試次數超過門檻值: 5,不再重試
           

HashedWheelTimer 的使用确實非常簡單,如果你是來學習怎麼使用它的,那麼看到這裡就可以了。

HashedWheelTimer 源碼分析

大家肯定都知道或聽說過,它用的是一個叫做時間輪(下載下傳算法介紹PPT)的算法,看下面我畫的圖:

HashedWheelTimer 使用及源碼分析

我這裡先說說大緻的執行流程,之後再進行細緻的源碼分析。

預設地,時鐘每 100ms 滴答一下(tick),往前走一格,共 512 格,走完一圈以後繼續下一圈。把它想象成生活中的鐘表就可以了。

内部使用一個長度為 512 的數組存儲,數組元素(bucket)的資料結構是連結清單,連結清單每個元素代表一個任務,也就是我們前面介紹的 Timeout 的執行個體。

送出任務的線程,隻要把任務往虛線上面的任務隊列中存放即可傳回。工作線程是單線程,一旦開啟,不停地在時鐘上繞圈圈。

仔細看下面的介紹:

  • 工作線程到達每個時間整點的時候,開始工作。在 HashedWheelTimer 中,時間都是相對時間,工作線程的啟動時間,定義為時間的 0 值。因為一次 tick 是 100ms(預設值),是以 100ms、200ms、300ms… 就是我說的這些整點。
  • 如上圖,當時間到 200ms 的時候,發現任務隊列有任務,取出所有的任務。
  • 按照任務指定的執行時間,将其配置設定到相應的 bucket 中。如上圖中,任務2 和任務6指定的時間為 100ms~200ms 這個區間,就被配置設定到第二個 bucket 中,形成連結清單,其他任務同理。
    這裡還有輪次的概念,不過不用着急,比如任務 6 指定的時間可能是 150ms + (512*100ms),它也會落在這個 bucket 中,但是它是下一個輪次才能被執行的。
  • 任務配置設定到 bucket 完成後,執行該次 tick 的真正的任務,也就是落在第二個 bucket 中的任務 2 和任務 6。
  • 假設執行這兩個任務共消耗了 50ms,到達 250ms 的時間點,那麼工作線程會休眠 50ms,等待進入到 300ms 這個整點。
    如果這兩個任務執行的時間超過 100ms 怎麼辦?這個問題就要看源碼來解答了。

開始源碼分析。我們從它的預設構造器開始,一步步到達最後一個最複雜的構造器:

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) {
  ......
}
           

簡單說一下各個參數:

  • threadFactory:定時任務都是背景任務,需要開啟線程,我們通常會通過自定義 threadFactory 來命名線程,嫌麻煩就使用

    Executors.defaultThreadFactory()

  • tickDuration 和 timeUnit 定義了一格的時間長度,預設的就是 100ms。
  • ticksPerWheel 定義了一圈有多少格,預設的就是 512;
  • leakDetection:用于追蹤記憶體洩漏,本文不會介紹它,感興趣的讀者請自行去了解它。
  • maxPendingTimeouts:最大允許等待的 Timeout 執行個體數,也就是我們可以設定不允許太多的任務等待。如果未執行任務數達到門檻值,那麼再次送出任務會抛出 RejectedExecutionException 異常。預設不限制。

初始化 HashedWheelTimer

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) {

    // ...... 參數檢查

    // 初始化時間輪,這裡做了向上"取整",保持數組長度為 2 的 n 次方
    wheel = createWheel(ticksPerWheel);
    // 掩碼,用來做取模
    mask = wheel.length - 1;

    // 100ms 轉換為納秒 100*10^6
    this.tickDuration = unit.toNanos(tickDuration);

    // 防止溢出
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }
    // 建立工作線程,這裡沒有啟動線程。後面會看到,在第一次送出任務的時候會啟動線程
    workerThread = threadFactory.newThread(worker);

    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

    // 指派最大允許等待任務數
    this.maxPendingTimeouts = maxPendingTimeouts;

    // 如果超過 64 個 HashedWheelTimer 執行個體,它會列印錯誤日志提醒你
    // Netty 是真的到位,就怕你會用錯這個工具,到處執行個體化它。而且它隻會報錯一次。
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
           

上面,HashedWheelTimer 完成了初始化,初始化了時間輪數組

HashedWheelBucket[]

,稍微看一下内部類

HashedWheelBucket

,可以看到它是一個連結清單的結構。這個很好了解,因為每一格可能有多個任務。

送出第一個任務

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    // 校驗等待任務數是否達到門檻值 maxPendingTimeouts
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }

    // 如果工作線程沒有啟動,這裡負責啟動
    start();

    /** 下面的代碼,建構 Timeout 執行個體,将其放到任務隊列中。 **/

    // deadline 是一個相對時間,相對于 HashedWheelTimer 的啟動時間
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // timeout 執行個體,一個上層依賴 timer,一個下層依賴 task,另一個是任務到期時間
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // 放到 timeouts 隊列中
    timeouts.add(timeout);
    return timeout;
}
           

送出任務的操作非常簡單,執行個體化 Timeout,然後放到任務隊列中。

我們可以看到,這裡使用的優先級隊列是一個 MPSC(Multiple Producer Single Consumer)的隊列,剛好适用于這裡的多生産線程,單消費線程的場景。而在 Dubbo 中,使用的隊列是 LinkedBlockingQueue,它是一個以連結清單方式組織的線程安全的隊列。

另外就是注意這裡調用的

start()

方法,如果該任務是第一個送出的任務,它會負責工作線程的啟動。

工作線程開始工作

其實隻要看懂下面的幾行代碼,HashedWheelTimer 的源碼就非常簡單了。

private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    // tick 過的次數,前面說過,時針每 100ms tick 一次
    private long tick;

    @Override
    public void run() {
        // 在 HashedWheelTimer 中,用的都是相對時間,是以需要啟動時間作為基準,并且要用 volatile 修飾
        startTime = System.nanoTime();
        if (startTime == 0) {
            // 這裡不是很看得懂...請知道的讀者不吝賜教
            startTime = 1;
        }

        // 第一個送出任務的線程正 await 呢,喚醒它
        startTimeInitialized.countDown();

        // 接下來這個 do-while 是真正執行任務的地方,非常重要
        do {
            // 往下滑,就在目前的代碼塊裡面,倒數第二個方法
            // 比如之前介紹的圖,那傳回值 deadline 就是 200ms
            final long deadline = waitForNextTick();

            if (deadline > 0) {
                // 該次 tick,bucket 數組對應的 index
                int idx = (int) (tick & mask);

                // 處理一下已經取消的任務,可忽略。
                processCancelledTasks();

                // bucket
                HashedWheelBucket bucket = wheel[idx];

                // 将隊列中所有的任務轉移到相應的 buckets 中。細節往下看,這個代碼塊的下一個方法。
                transferTimeoutsToBuckets();

                // 執行進入到這個 bucket 中的任務
                bucket.expireTimeouts(deadline);

                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        /* 到這裡,說明這個 timer 要關閉了,做一些清理工作 */

        // 将所有 bucket 中沒有執行的任務,添加到 unprocessedTimeouts 這個 HashSet 中,
        // 主要目的是用于 stop() 方法傳回
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        // 将任務隊列中的任務也添加到 unprocessedTimeouts 中
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }

    private void transferTimeoutsToBuckets() {
        // 這裡一個 for 循環,還特地限制了 10 萬次,就怕你寫錯代碼,一直往裡面扔任務,可能實際也沒什麼用吧
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // 沒有任務了
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                // 該任務剛剛被取消了(在transfer之前其實已經做過一次了)
                continue;
            }

            // 下面就是将任務放到相應的 bucket 中,這裡還計算了每個任務的 remainingRounds

            long calculated = timeout.deadline / tickDuration;
            timeout.remainingRounds = (calculated - tick) / wheel.length;

            final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
            int stopIndex = (int) (ticks & mask);

            HashedWheelBucket bucket = wheel[stopIndex];
            // 單個 bucket,是由 HashedWheelTimeout 執行個體組成的一個連結清單,
            // 大家可以看一下這裡面的代碼,由于是單線程操作,不存在并發,是以代碼很簡單
            bucket.addTimeout(timeout);
        }
    }

    private void processCancelledTasks() {
        //... 太簡單,忽略
    }

    /**
     * 下面這個方法大家多看幾遍,注意它的傳回值
     * 前面說過,我們用的都是相對時間,是以:
     *   第一次進來的時候,工作線程會在 100ms 的時候傳回,傳回值是 100*10^6
     *   第二次進來的時候,工作線程會在 200ms 的時候傳回,依次類推
     * 另外就是注意極端情況,比如第二次進來的時候,由于被前面的任務阻塞,導緻進來的時候就已經是 250ms,
     *   那麼,一進入這個方法就要立即傳回,傳回值是 250ms,而不是 200ms
     * 剩下的自己看一下代碼
     */
    private long waitForNextTick() {
        long deadline = tickDuration * (tick + 1);

        // 嵌套在一個死循環裡面
        for (;;) {
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    // 什麼情況會進到這個分支?我也不清楚
                    return -Long.MAX_VALUE;
                } else {
                    // 這裡是出口,是以傳回值是目前時間(相對時間)
                    return currentTime;
                }
            }

            // Check if we run on windows, as if thats the case we will need
            // to round the sleepTime as workaround for a bug that only affect
            // the JVM if it runs on windows.
            //
            // See https://github.com/netty/netty/issues/356
            if (PlatformDependent.isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException ignored) {
                // 如果 timer 已經 shutdown,那麼傳回 Long.MIN_VALUE
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

    public Set<Timeout> unprocessedTimeouts() {
        return Collections.unmodifiableSet(unprocessedTimeouts);
    }
}
           

接下來應該看看怎麼執行 bucket 中的任務:

/**
 * 這裡會執行這個 bucket 中,輪次為 0 的任務,也就是到期的任務。
 * 這個方法的入參 deadline 其實沒什麼用,因為輪次為 0 的都是應該被執行的。
 */
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // 處理連結清單上的所有 timeout 執行個體
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                // 這行代碼負責執行具體的任務
                timeout.expire();
            } else {
                // 這裡的代碼注釋也說,不可能進入到這個分支
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            // 輪次減 1
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}
           

源碼分析部分已經說完了,有一些簡單的分支,這裡就不花篇幅介紹了。

Worker 線程是一個 bucket 一個 bucket 順次處理的,是以,即使有些任務執行時間超過了 100ms,“霸占”了之後好幾個 bucket 的處理時間,也沒關系,這些任務并不會被漏掉。但是有可能被延遲執行,畢竟工作線程是單線程。

說個有意思的點

在送出任務的

newTimeout

方法中,調用了啟動線程的

start()

方法,它會保證線程真的啟動以後并且指派完了

startTime

以後,

start()

方法再傳回。因為在 newTimeout 方法的後半段中需要一個正确的 startTime。

看下面兩個代碼片段。送出任務的線程:

public void start() {
    ......

    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
           

工作線程

Worker

public void run() {
    // 初始化 startTime
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    // 這裡用了一個 CountDownLatch 執行個體來通信
    startTimeInitialized.countDown();
    ......
}
           

這裡 startTime 是 volatile 修飾的屬性,為了保證它的可見性。

可是大家有沒有發現,其實有 startTimeInitialized 這個 CountDownLatch 執行個體,就能保證這裡的并發先後問題。

這裡簡單讨論兩個點:

1、第一個代碼片段中的 while 換成 if 行不行?

不行。

Object 中的 wait/notify 存在作業系統的假喚醒,是以一般都在 while 循環裡,但是這裡的 CountDownLatch 不會,是以看上去這裡的 while 是沒必要的。但是 CountDownLatch 沒有提供

awaitUninterruptibly()

方法,是以這裡其實在處理線程中斷的情況。

2、這裡 startTime 屬性不用 volatile 修飾行不行?

個人認為是可以的。

因為 CountDownLatch 提供了語義:在

countDown()

之前的操作 happens-before

await()

後的操作。

如果是我了解錯誤,歡迎大家指正。

小結

HashedWheelTimer 的源碼相對簡單,它的算法設計比較有意思。

我再把這個圖放到這裡,大家可以仔細回顧一下它的工作流程是怎樣的。

HashedWheelTimer 使用及源碼分析

當然,本文的重點是分析主幹部分,而對于一些分支并沒有進行分析,因為它們相對比較簡單。

轉載:https://javadoop.com/post/HashedWheelTimer

繼續閱讀