天天看點

走進Task(1):什麼是Task

目錄

  • 前言
  • 從表象講起
    • Task 從何而來
    • Task 常見用法
    • Task 的分類
      • 按是否包含 Result 分,也就是是否是泛型 Task
      • 按得到 Task 的方式,可以分為
    • 對 Task 進行分解
  • Task 在哪執行?
    • 線程池
    • 一個獨立的背景線程中
    • 自定義的TaskScheduler裡
    • Task 可以封裝任何類型的别的任務
  • 小結

本系列會拆分為以下幾篇分次進行叙述:

  1. 什麼是 Task(本文)
  2. Task 的回調執行與 await(TODO)
  3. async 到底幹了什麼(TODO)
  4. 總結與常見誤區(TODO)

在 2 中,會和大家分享死鎖相關的問題。2 和 3 中會穿插自定義 Awaitable 的話題。

本系列會直接引用前一篇部落格概述 .NET 6 ThreadPool 實作 裡的結論,是以請沒看過的同學先麻煩看下。

文中所有例子均出于解釋目的,并非具有實際意義的代碼。有傳回值的 Task 和無傳回值的 Task 實際差別不是很大,下文大多數舉例不做特别區分。不糾結 api 的使用細節,隻講 Task 的整體設計思路。

代碼運作截圖是在 .NET 6 中的,其他版本的設計沒有大的改動,不影響學習。

筆者解讀并非權威解讀,隻是希望能給大家一個了解 Task 的方法。

以下僅做典型舉例,并非全部

  • new Task
new Task(_ =>
{
    Console.WriteLine("Hello World!");
}, null).Start();
           
  • TaskFactory.StartNew
new TaskFactory().StartNew(() =>
{
    Console.WriteLine("Hello World!");
});
           
  • Task.Run
Task.Run(() =>
{
    Console.WriteLine("Hello World!");
});
           
  • Task.FromResult 等直接建立一個已完成的 Task
Task.FromResult("Hello World!");
var task = Task.CompletedTask;
           
  • 某個不知道其内部實作的 async 方法
async Task<Bar> FooAsync();
           

  • 注冊一個回調,等待 Task 執行完成時擷取結果并執行回調
var task = Task.Run<string>(() => "Hello World!");
task.ContinueWith(t => Console.WriteLine(t.Result));
           
  • await 一個 Task 并得到結果
var task = Task.Run<string>(() => "Hello World!");
var result = await task;
Console.WriteLine(result);
           
  • 直接 GetResult
var task = Task.Run<string>(() => "Hello World!");
// 等效于 task.Result
var result = task.GetAwaiter().GetResult();
Console.WriteLine(result);
           

  • Task

  • Task<T>

  • 我知道這個 Task 是怎麼來的,這種情況下,我們自己參與了 Task 的建立過程,知道這個 Task 是在幹啥。比如:
Task task = Task.Run<int>(() => 1 + 2);
           

計算 1 + 2,并将結果作為 Task 的結果。

  • 不知道這個 Task 是怎麼來的。比如:
Task task = new HttpClient().GetStringAsync("http://localhost:5000/api/values");
           

而這兩種擷取方式的不同對應的是兩種完全不同的側重點:

  1. Task 是一個白盒,關注 Task 裡幹了什麼,在哪執行裡面這些代碼。
  2. Task 是一個黑盒,關注 Task 能給到我什麼,Task 完成執行之後,我該幹什麼。

按功能點可以将 Task 分為三個部分

  • 任務執行:通過 Task.Run 等方式執行一段我們自定義的邏輯。
  • 回調通知及回調執行:注冊一個回調,等待 Task 完成時執行。
  • await 文法支援:脫離了 await,task 的上述兩個功能依舊可以完整執行。但卻會喪失代碼的簡潔性。

Task 可以作為 ThreadPool 隊列系統的基本單元被 ThreadPool 排程執行。

下面這些常見的建立 Task 的方式,預設情況都是在 ThreadPool 中被排程執行的,這幾個本質上是一樣的,隻是使用方式上和可支援傳入的自定義選項上的差別。

new Task(_ =>
{
    Console.WriteLine("Hello World!");
}, null).Start();
           
new TaskFactory().StartNew(() =>
{
    Console.WriteLine("Hello World!");
});
           
// 可以看做簡化版的 TaskFactory.StartNew
Task.Run(() =>
{
    Console.WriteLine("Hello World!");
});
           

Task.Run

為例來看下裡面到底做了些什麼。

PortableThreadPool.TryCreateWorkerThread

和實際要要執行的 lambda 表達式中打上斷點,我們便可以清晰的看到整個執行過程。

走進Task(1):什麼是Task
走進Task(1):什麼是Task

整理一下的話,主要就是這個樣子,為簡化了解,ThreadPool 中的調用細節已省略。

走進Task(1):什麼是Task

Task 關鍵代碼摘錄:

class Task
{
    // 任務的主體,我們要執行的實際邏輯
    // 可能有傳回值,可能沒有
    internal Delegate m_action;

    // 任務的狀态
    internal volatile int m_stateFlags;

    // ThreadPool 調用入口,由于 JIT 的内聯優化,調用棧裡隻能看到 ExecuteEntryUnsafe,看不到這個方法
    internal virtual void ExecuteFromThreadPool(Thread threadPoolThread) => ExecuteEntryUnsafe(threadPoolThread);

    internal void ExecuteEntryUnsafe(Thread? threadPoolThread)
    {
        // 設定 Task 狀态為已經執行
        m_stateFlags |= (int)TaskStateFlags.DelegateInvoked;

        if (!IsCancellationRequested & !IsCanceled)
        {
            ExecuteWithThreadLocal(ref t_currentTask, threadPoolThread);
        }
        else
        {
            ExecuteEntryCancellationRequestedOrCanceled();
        }
    }

    // 建立 Task 的時候可傳入的資料,用于執行時使用
    // new Task(state => Console.WriteLine(state), "Hello World").Start();
    internal object? m_stateObject;

    private void ExecuteWithThreadLocal(ref Task currentTaskSlot, Thread threadPoolThread = null)
    {
        // 執行上下文維護着代碼執行邏輯上下文的一些資料,如 AsyncLocal
        // 具體請看我的 AsyncLocal 部落格 https://www.cnblogs.com/eventhorizon/p/12240767.html
        ExecutionContext? ec = CapturedContext;
        if (ec == null)
        {
            // 沒有執行上下文,直接執行
            InnerInvoke();
        }
        else
        {
            // 是否是在 ThreadPool 線程上執行
            if (threadPoolThread is null)
            {
                ExecutionContext.RunInternal(ec, s_ecCallback, this);
            }
            else
            {
                ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, ec, s_ecCallback, this);
            }
        }
    }

    // 不管 ExecuteWithThreadLocal 分支如何,最後會走到 InnerInvoke
    internal virtual void InnerInvoke()
    {
        if (m_action is Action action)
        {
            action();
            return;
        }

        if (m_action is Action<object?> actionWithState)
        {
            actionWithState(m_stateObject);
        }
    }
}
           

可以看到 Task 以 ThreadPoolTaskScheduler 為媒介,進入了 ThreadPool。ThreadPool 調用 Task.ExecuteFromThreadPool 方法最終觸發 Task 所封裝的 action 的執行。

與 ThreadPool 中另一種基本單元 IThreadPoolWorkItem 一樣,Task 在進入 ThreadPoolWorkQueue 時會有兩種可能,進入全局隊列或者本地隊列。

了解這個問題,我們需要看一下 ThreadPoolTaskScheduler.QueueTask 裡做了些什麼。

internal sealed class ThreadPoolTaskScheduler : TaskScheduler
{
    protected internal override void QueueTask(Task task)
    {
        TaskCreationOptions options = task.Options;
        if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
        {
            // 建立獨立線程,和線程池無關
            new Thread(s_longRunningThreadWork)
            {
                IsBackground = true,
                Name = ".NET Long Running Task"
            }.UnsafeStart(task);
        }
        else
        {
            // 第二個參數是 preferLocal
            // options & TaskCreationOptions.PreferFairness 這個位标志的枚舉用法可檢視官方資料
            // https://docs.microsoft.com/zh-cn/dotnet/csharp/language-reference/builtin-types/enum#enumeration-types-as-bit-flags
            ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
        }
    }
}
           

上面代碼裡的 TaskCreationOptions 是我們在建立 Task 的時候可以指定的一個選項,預設是 None。

Task.Run 不支援傳入該選項,可使用

TaskFactory.StartNew

的重載進行指定:

new TaskFactory().StartNew(() =>
{
    Console.WriteLine("Hello World!");
}, TaskCreationOptions.PreferFairness);
           

根據 TaskCreationOptions 的不同,出現了三個分支

  • LongRunning:獨立線程,和線程池無關
  • 包含 PreferFairness時:preferLocal=false,進入全局隊列
  • 不包含 PreferFairness時:preferLocal=ture,進入本地隊列

進入全局隊列的任務能夠公平地被各個線程池中的線程領取執行,也是就是

prefer fairness

這個詞組的字面意思了。

下圖中 Task666 先進入全局隊列,随後被 Thread1 領走。Thread3 通過 WorkStealing 機制竊取了 Thread2 中的 Task2。

走進Task(1):什麼是Task

也就是上文提到的建立 Task 時使用

TaskCreationOptions.LongRunning

,如果你需要一個執行一個長時間的任務,比如一段耗時很久的同步代碼,就可以使用這個。執行異步代碼(指 await xxx)時不推薦使用,後面會講原因。

new TaskFactory().StartNew(() =>
{
    // 耗時較長的同步代碼
}, TaskCreationOptions.LongRunning);
           

ThreadPool 管理的線程是出于可複用的目的設計的,不停地從隊列系統中領取任務執行。如果一個 WorkThread 阻塞在一個耗時較長的任務上,它就沒辦法處理其他任務,ThreadPool 的吞吐率會受影響。

當然并不意味着 ThreadPool 不能處理這樣的任務。舉個極端的例子,如果線程池目前的 WorkThread 全在處理 LongRunning Task。在 Starvation Avoidance 機制(每隔500ms)建立新的 WorkThread 之前,ThreadPool 沒法執行新的任務。

LongRunning 的 Task 生命周期與 ThreadPool 設計目的不符合,是以需獨立開來。

除了

ThreadPoolTaskScheduler

外,我們還可以定義自己的

TaskScheduler

首先需要繼承

TaskScheduler

這個抽象類,有三個抽象方法需要我們實作。

public abstract class TaskScheduler
{
    // 入口,待排程執行的 Task 會通過該方法傳入
    protected internal abstract void QueueTask(Task task);

    // 這個是在執行 Task 回調的時候才會被執行到的方法,放到後面再講
    protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);

    // 擷取所有排程到該 TaskScheduler 的 Task
    protected abstract IEnumerable<Task>? GetScheduledTasks();
}
           

在我們自定義的 TaskScheduler 裡,在 QueueTask 被執行時會拿到 Task,但是 Task 要怎麼去觸發裡面的 action 呢。

Task 針對 ThreadPool 的調用場景暴露了一個 ExecuteFromThreadPool 的 internal 方法,同時也提供了一個 ExecuteEntry 方法供其他場景調用,但是這個方法也是 internal 的。隻能通過 TaskScheduler 的 protect 方法進行間接調用。

public abstract class TaskScheduler
{
    protected bool TryExecuteTask(Task task)
    {
        if (task.ExecutingTaskScheduler != this)
        {
            throw new InvalidOperationException(SR.TaskScheduler_ExecuteTask_WrongTaskScheduler);
        }

        return task.ExecuteEntry();
    }
}


下面是一個自定義的 TaskScheduler,在一個固定的線程上順序執行 Task。
```C#
class CustomTaskScheduler : TaskScheduler
{
    private readonly BlockingCollection<Task> _queue = new();

    public CustomTaskScheduler()
    {
        new Thread(() =>
        {
            while (true)
            {
                var task = _queue.Take();
                Console.WriteLine($"task {task.Id} is going to be executed");
                TryExecuteTask(task);
                Console.WriteLine($"task {task.Id} has been executed");
            }
        })
        {
            IsBackground = true
        }.Start();
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _queue.ToArray();
    }

    protected override void QueueTask(Task task)
    {
        _queue.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }
}
           

在 TaskFactory 的構造函數中可以傳入我們自定義的 TaskScheduler

var taskFactory = new TaskFactory(new CustomTaskScheduler());
taskFactory.StartNew(() =>
    Console.WriteLine($"task {Task.CurrentId}" +
                      $" threadId: {Thread.CurrentThread.ManagedThreadId}"));
taskFactory.StartNew(() =>
    Console.WriteLine($"task {Task.CurrentId}" +
                      $" threadId: {Thread.CurrentThread.ManagedThreadId}"));
Console.ReadLine();
           

輸出結果如下:

var taskFactory = new TaskFactory(new CustomTaskScheduler());
taskFactory.StartNew(() =>
    Console.WriteLine($"task {Task.CurrentId}" +
                      $" threadId: {Thread.CurrentThread.ManagedThreadId}"));
taskFactory.StartNew(() =>
    Console.WriteLine($"task {Task.CurrentId}" +
                      $" threadId: {Thread.CurrentThread.ManagedThreadId}"));
Console.ReadLine();
           
task 1 is going to be executed
task 1 threadId: 10
task 1 has been executed
task 2 is going to be executed
task 2 threadId: 10
task 2 has been executed
           

所有的 Task 都會在一個線程裡被排程執行。

上面兩種情況,Task 都存在明确的執行實體,但有時候,可能是沒有的。看下面這樣的例子。

var task = FooAsync();
var action = typeof(Task).GetField("m_action", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(task);
Console.WriteLine($"Task action is null: {action == null}");
task.ContinueWith(t => Console.WriteLine(t.Result));
// 回調可以注冊多個
task.ContinueWith(t => Console.WriteLine(t.Result));


Task<string> FooAsync()
{
    var tsc = new TaskCompletionSource<string>();
    new Thread(() =>
    {
        Thread.Sleep(1000);
        tsc.SetResult("Hello World");
    })
    {
        IsBackground = true
    }.Start();
    return tsc.Task;
}
           

輸出:

Task action is null: True
Hello World
Hello World
           

從 FooAsync 外部和内部兩個角度來看這個問題

  • FooAsync 外:拿到了一個 Task 并注冊了回調
  • FooAsync 内:相當于間接的持有了這個回調,并通過 tsc.SetResult 間接地調用了這個回調。

下面是關鍵代碼的摘錄

class Task
{
    // 儲存一個或一組回調
    private volatile object? m_continuationObject;

    internal void FinishContinuations()
    {
        // 處理回調的執行
    }
}

class Task<T> : Task
{
    internal bool TrySetResult(TResult result)
    {
        // ...
        this.m_result = result;
        // 複用父類的邏輯
        FinishContinuations();
        // ...
    }
}

public class TaskCompletionSource<TResult>
{
    public TaskCompletionSource() => _task = new Task<TResult>();

    public Task<TResult> Task => _task;

    public void SetResult(TResult result)
    {
        TrySetResult(result);
    }

    public bool TrySetResult(TResult result)
    {
        _task.TrySetResult(result);
        // ...
    }
}
           

有時候 Task.TrySetResult() 的觸發源可能是一個異步IO完成事件導緻的,也就是我們常說的異步IO,硬體有自己的處理晶片,在異步IO完成通知CPU(硬體中斷 hardware interrupt)之前,CPU并不需要參與,這也是異步IO的價值所在。

Task 是個已經完成或者将在未來某個時間點完成的任務,可以向其注冊一個回調等待任務完成時被執行。

繼續閱讀