天天看點

如何讓Task在非線程池線程中執行?

作者:opendotnet

Task承載的操作需要被排程才能被執行,由于.NET預設采用基于線程池的排程器,是以Task預設線上程池線程中執行。但是有的操作并不适合使用線程池,比如我們在一個ASP.NET Core應用中承載了一些需要長時間執行的背景操作,由于線程池被用來處理HTTP請求,如果這些背景操作也使用線程池來排程,就會造成互相影響。在這種情況下,使用獨立的一個或者多個線程來執行這些背景操作可能是一個更好的選擇。

一、基于線程池的排程

二、TaskCreationOptions.LongRunning

三、換成異步操作呢?

四、換種寫法呢?

五、調用Wait方法

六、自定義TaskScheduler

七、獨立線程池

一、基于線程池的排程

我們通過如下這個簡單的程式來驗證預設基于線程池的Task排程。我們調用Task類型的靜态屬性Factory傳回一個TaskFactory對象,并調用其StartNew方法啟動一個Task對象,這個Task指向的Run方法會在一個循環中調用Do方法。Do方法使用自旋等待的方式模拟一段耗時2秒的操作,并在控制台輸出目前線程的IsThreadPoolThread屬性确定是否是線程池線程。

Task.Factory.StartNew(Run);
Console.Read();

void Run()
{
while (true)
 {
 Do();
 }
}

void Do()
{
var end = DateTime.UtcNow.AddSeconds(2);
 SpinWait.SpinUntil(
 () => DateTimeOffset.UtcNow > end);
var isThreadPoolThread = Thread.CurrentThread
 .IsThreadPoolThread;
 Console.WriteLine(
$"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}
           

通過如下所示的輸出結果,我們得到了答案:利用TaskFactory建立的Task在預設情況下确實是通過線程池的形式被排程的。

如何讓Task在非線程池線程中執行?

二、TaskCreationOptions.LongRunning

很明顯,上述Run方法是一個需要永久執行的LongRunning操作,并不适合使用線程池來執行,實際上TaskFactory在設計的時候就考慮到了這一點,我們利用它建立一個Task的時候可以指定對應的TaskCreationOptions選項,其中一個選項就是LongRuning。我們通過如下的方式修改了上面這段程式,在調用StartNew方法時指定了這個選項。

Task.Factory.StartNew(Run, 
TaskCreationOptions.LongRunning);
Console.Read();

void Run()
{
while (true)
 {
 Do();
 }
}

void Do()
{
var end = DateTime.UtcNow.AddSeconds(2);
 SpinWait.SpinUntil(
 () => DateTimeOffset.UtcNow > end);
var isThreadPoolThread = Thread.CurrentThread
 .IsThreadPoolThread;
 Console.WriteLine(
$"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}
           

再次執行我們的程式,就會通過如下的輸出結果看到Do方法将不會線上程池線程中執行了。

如何讓Task在非線程池線程中執行?

三、換成異步操作呢?

由于LongRunning操作經常會涉及IO操作,是以我們執行方法經常會寫成異步的形式。如下所示的代碼中,我們将Do方法替換成DoAsync,将2秒的自旋等待替換成Task.Delay。由于DoAsync寫成了異步的形式,Run也換成對應的RunAsync。

Task.Factory.StartNew(RunAsync, 
 TaskCreationOptions.LongRunning);
Console.Read();

async Task RunAsync()
{
while (true)
 {
await DoAsync();
 }
}

async Task DoAsync()
{
await Task.Delay(2000);
var isThreadPoolThread = Thread
 .CurrentThread.IsThreadPoolThread;
 Console.WriteLine(
$"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}
           

再次啟動程式後,我們發現又切換成了線程池排程了。為什麼會這樣呢?其實很好了解,由于原來傳回void的Run方法被替換成了傳回Task的RunAsync,傳入StartNew方法表示執行操作的委托類型從Action切換成了Func<Task>,雖然我們指定了LongRunning選項,但是StartNew方法隻是采用這種模式執行Func<Task>這個委托對象而已,而這個委托在遇到await的時候就傳回了。至于傳回的Task對象,還是按照預設的方式進行排程執行。

如何讓Task在非線程池線程中執行?

四、換種寫法呢?

有人說,上面我們使用的是一個方法來表示作為參數的委托對象,如果我們按照如下的方式使用基于async/await的Lambda表達式呢?實際上這樣的Lambda表達式就是Func<Task>的另一種程式設計方式而已。

Task.Factory.StartNew(
async () => { while (true) await DoAsync();}, 
 TaskCreationOptions.LongRunning);
Console.Read();


async Task DoAsync()
{
await Task.Delay(2000);
var isThreadPoolThread = Thread.CurrentThread
 .IsThreadPoolThread;
 Console.WriteLine(
$"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}
           

五、調用Wait方法

其實這個問題很好解決,按照如下的方式将DoAsync方法換成同步形式的Do,将基于await的等待替換成針對Wait方法的調用就可以了。我想當你接觸Task的時候,就有很多人不斷提醒你,謹慎使用Wait方法,因為它會阻塞目前線程。實際上對于我們的目前的應用場景,調用Wait方法才是正确的選擇,因為我們的初衷就是使用一個獨立的線程以獨占的方式來執行背景操作。

Task.Factory.StartNew(
 () => { while (true) Do(); }, 
 TaskCreationOptions.LongRunning);
Console.Read();

void Do()
{
 Task.Delay(2000).Wait();
var isThreadPoolThread = Thread.CurrentThread
 .IsThreadPoolThread;
 Console.WriteLine(
$"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}
           

六、自定義TaskScheduler

既然針對線程池的使用是“Task排程”導緻的,我們自然可以通過重寫TaskScheduler的方式來解決這個問題。如下這個自定義的DedicatedThreadTaskScheduler 會采用獨立的線程來執行被排程的Task,線程的數量可以參數來指定。

internal sealed class DedicatedThreadTaskScheduler 
 : TaskScheduler
{
private readonly BlockingCollection<Task> _tasks = new();
private readonly Thread[] _threads;
protected override IEnumerable<Task>? GetScheduledTasks() 
 => _tasks;
protected override void QueueTask(Task task) 
=> _tasks.Add(task);
protected override bool TryExecuteTaskInline(
 Task task, 
bool taskWasPreviouslyQueued) 
=> false;
public DedicatedThreadTaskScheduler(int threadCount)
{
 _threads = new Thread[threadCount];
for (int index = 0; index < threadCount; index++)
 {
 _threads[index] = new Thread(_ =>
 {
while (true)
 {
 TryExecuteTask(_tasks.Take());
 }
 });
 }
 Array.ForEach(_threads, it => it.Start());
 }
}
           

我們示範執行個體中Run/Do方法再次還原成如下所示的純異步模式的RunAsync/DoAsync,并在調用StartNew方法的時候建立一個DedicatedThreadTaskScheduler對象作為最後一個參數。

Task.Factory.StartNew(
 RunAsync, 
 CancellationToken.None, 
 TaskCreationOptions.LongRunning, 
 new DedicatedThreadTaskScheduler(1));
Console.Read();

async Task RunAsync()
{
while (true)
 {
await DoAsync();
 }
}

async Task DoAsync()
{
await Task.Delay(2000);
var isThreadPoolThread = Thread.CurrentThread
 .IsThreadPoolThread;
 Console.WriteLine(
$"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}
           

由于建立的Task将會使用指定的DedicatedThreadTaskScheduler 對象來排程,DoAsync方法自然就不會線上程池線程中執行了。

如何讓Task在非線程池線程中執行?

七、獨立線程池

.NET提供的線程池是一個全局共享的線程池,而我們定義的DedicatedThreadTaskScheduler相當于建立了一個獨立的線程池,對象池的效果可以通過如下這個簡單的程式展現出來。

Task.Factory.StartNew(
 ()=> Task.WhenAll( Enumerable.Range(1,6).Select(it=>DoAsync(it))),
 CancellationToken.None,
 TaskCreationOptions.None,
new DedicatedThreadTaskScheduler(2));

async Task DoAsync(int index)
{
await Task.Yield();
 Console.WriteLine(
$"[{DateTimeOffset.Now.ToString("hh:MM:ss")}]Task {index} 
 is executed in thread {Environment.CurrentManagedThreadId}");
var endTime = DateTime.UtcNow.AddSeconds(4);
 SpinWait.SpinUntil(() => DateTime.UtcNow > endTime);
await Task.Delay(1000);
}
Console.ReadLine();
           

如上面的代碼片段所示,異步方法DoAsync利用自旋等待模拟了一段耗時4秒的操作,通過調用Task.Delay方法模拟了一段耗時1秒的IO操作。我們在其中輸出了任務開始執行的時間和目前線程ID。在調用的StartNew方法中,我們調用這個DoAsync方法建立了6個Task,這些Task交給建立的DedicatedThreadTaskScheduler進行排程。我們為這個DedicatedThreadTaskScheduler指定的線程數量為2。從如下所示的輸出結果可以看出,6個操作确實在兩個線程中執行的。

如何讓Task在非線程池線程中執行?