天天看點

C# Task 循環任務_C#并行程式設計(4):基于任務的并行原文位址:https://www.cnblogs.com/chenbaoshun/p/10621819.html

C#中的任務

Task

在C#程式設計中,實作并行可以直接使用線程,但使用起來很繁瑣;也可以使用線程池,線程池很大程度上簡化了線程的使用,但是也有着一些局限,比如我們不知道作業什麼時候完成,也取不到作業的傳回值;解決線程池局限性的方案是使用

任務

。本文将總結C#中

Task

的使用。

類似于線程池工作項對異步操作的封裝,任務是對異步操作的另一種形式的封裝,這種封裝抽象層次更高,讓我們能夠對異步操作進行更多的控制。

任務啟動後,通過任務排程器

TaskScheduler

來排程。.NET中提供兩種任務排程器,一種是線程池任務排程器,也是預設排程器,它會将任務派發給線程池工作者線程;另一種是上下文同步任務排程器,它會将任務派發給目前上下文線程,例如GUI線程。此外,我們也能自定義任務排程器,例如可以将異步IO任務派發給線程池IO線程。

Task的使用方法

隐式使用

Parallel

靜态類除了提供并行循環的各種重載,還提供了一個方法

Parallel.Invoke

。這個方法可以建立并執行一個或多個異步任務,使用方法如下:

private static void DoWork(int workId = 0){
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started work[{workId}].");
    Thread.Sleep(3000);
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] done work[{workId}].");
}




public static void ImplicitUsingOfTask(){
    Parallel.Invoke(()=>DoWork(1),()=>DoWork(2),() => DoWork(3));
}
           

上例的運作結果如下:

2019/3/27 20:40:18=> Thread[9] started work[1].

2019/3/27 20:40:18=> Thread[12] started work[3].

2019/3/27 20:40:18=> Thread[10] started work[2].

2019/3/27 20:40:21=> Thread[9] done work[1].

2019/3/27 20:40:21=> Thread[12] done work[3].

2019/3/27 20:40:21=> Thread[10] done work[2].

對于簡單的多任務并行,使用上述的方式很友善,但是這種方式與線程池一樣,我們不能控制任務的執行或者擷取任務傳回值。

顯式使用

相對于使用

Parallel.Invoke

執行并行操作,更常用的是使用

Task

Task

提供的方法進行異步和并行處理。下面是任務最基本的使用:

Task.Run(() =>
{

});
Task.Factory.StartNew(() =>
{

});
           

任務的常用操作

擷取任務的傳回值

具有傳回值的任務使用

Task

,

T

可根據我們的需求指定,下面是擷取任務傳回值的方法。

int> task = Task<
           

需要說明的是,擷取任務的結果會阻塞目前線程。

等待任務完成

有時候,我們需要等待一些任務全部完成後才能執行後續操作,有時候隻要多個任務中的一個完成了,就可以執行後續操作。

Task

提供了

Wait

WaitAll

WaitAny

等方法滿足我們的需求。下面的例子展示了各種等待方法的使用。

public static void TaskWait(){
    Stopwatch watch = new Stopwatch();

#region 場景1:等待一個任務完成
    Task task = Task.Run(() => DoWorkOfTask(1000));
    Console.WriteLine("start wait. work duration: 1000");
    watch.Start();
    task.Wait();
    watch.Stop();
    Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
#endregion

#region 場景2:等待多個任務完成
    Task[] tasks = new Task[3]
    {
        Task.Run(() => DoWorkOfTask(1000)),
        Task.Run(() => DoWorkOfTask(2000)),
        Task.Run(() => DoWorkOfTask(3000)),
    };

    Console.WriteLine("start wait all. work duration: min 1000 max 3000.");
    watch.Restart();
    Task.WaitAll(tasks);
    watch.Stop();
    Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
#endregion

#region 場景3:等待某個任務完成
    tasks = new Task[3]
    {
        Task.Run(() => DoWorkOfTask(1000)),
        Task.Run(() => DoWorkOfTask(2000)),
        Task.Run(() => DoWorkOfTask(3000)),
    };
    Console.WriteLine("start wait any. work duration: min 1000 max 3000.");
    watch.Restart();
    Task.WaitAny(tasks);
    watch.Stop();
    Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
#endregion
}





private static void DoWorkOfTask(int workDuration){
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task[{Task.CurrentId}].");
    Thread.Sleep(workDuration);
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task[{Task.CurrentId}].");
}
           

使用

Wait

WaitAll

WaitAny

方法時,我們可以設定逾時時間或者傳入取消Token,以控制等待時間。但這些方法傳回布爾值,隻能表明是否等待成功;假如我們需要知道所等待的任務傳回值,則可以使用

WhenAll

WhenAny

方法,這兩個方法不能控制等待時間,但會傳回一個完成的任務。如下例:

int>[] tasks = 
           

Task.WhenAll

 和

Task.WhenAny

在等待結束時,都會建立一個完成狀态的任務,

WhenAll

将等待的所有已完成任務的結果放入建立任務的結果中,

WhenAny

則将等待的已完成任務放到建立任務的結果中。

任務延續

有時候,我們需要在一個任務完成時開始另一個任務。對于這種需求,我們可以使用

Task

ContinueWith

等方法來處理。

3000));
           

運作結果:

2019/3/27 21:25:09=> Thread[10] started task[1].

2019/3/27 21:25:12=> Thread[10] completed task[1].

2019/3/27 21:25:12=> Thread[11] started task[2].

2019/3/27 21:25:13=> Thread[11] completed task[2].

我們還可以通過

TaskContinuationOptions

指定延續任務的執行條件,如任務取消時或者任務出現異常時才執行,等。

子任務的使用

有時候,我們要在一個任務裡面建立一些其他任務,并且還要在任務裡面等待建立的任務完成,此時我們可以使用子任務。

Task parent = Task.Factory.StartNew(() =>
{
    Console.WriteLine($"parent task #{Task.CurrentId} run.");
for (int i = 0; i < 10; i++)
    {
        Task.Factory.StartNew(() =>
        {
            Console.WriteLine($"child task #{Task.CurrentId} run.");
            Thread.Sleep(1000);
            Console.WriteLine($"child task #{Task.CurrentId} done.");
        }, TaskCreationOptions.AttachedToParent);
    }
});
parent.Wait();
Console.WriteLine($"parent task #{parent.Id} done.");
           

在一個任務中建立的新任務,預設情況下與父級任務是分離的,各自的運作不受影響,除非在建立任務時顯式附加到父級任務中。例如,上例中如果不指定

TaskCreationOptions.AttachedToParent

,

parent.Wait()

就不會持續到所有子任務都執行完成。

任務的取消

我們在啟動任務時,傳入取消令牌

CancellationToken

,當收到取消請求時,抛出取消異常并在等待任務完成時捕獲異常

TaskCanceledException

。我們通過這種方式控制任務的取消。

public static void TaskCancle(){
    Console.WriteLine("Press any key to begin. Press 'c' to cancel. ");
    Console.ReadKey(true);
    Console.WriteLine();

    CancellationTokenSource tokenSource = new CancellationTokenSource();
    ConcurrentBag tasks = new ConcurrentBag();
    Task task1 = Task.Factory.StartNew(() => DoWorkOfTask(5000, tokenSource.Token), tokenSource.Token);
    tasks.Add(task1);
    Task task2 = Task.Factory.StartNew(() =>
    {for (int i = 0; i < 10; i++)
        {int duration = 1000 * i;
            tasks.Add(Task.Factory.StartNew(()=>DoWorkOfTask(duration, tokenSource.Token), tokenSource.Token));
        }
        DoWorkOfTask(5000,tokenSource.Token);
    }, tokenSource.Token);
    tasks.Add(task2);char ch = Console.ReadKey().KeyChar;if (ch == 'c' || ch == 'C')
    {
        tokenSource.Cancel();
        Console.WriteLine($"{DateTime.Now}=> Task cancellation requested.");
    }try
    {
        Task.WaitAll(tasks.ToArray());
    }catch (AggregateException ae)
    {foreach (Exception ex in ae.InnerExceptions)
        {
            TaskCanceledException tce = ex as TaskCanceledException;string cancelledTask = tce == null ? string.Empty : $"Task #{tce.Task.Id}";
            Console.WriteLine($"Exception: {ex.GetType().Name}. {cancelledTask}");
        }
    }finally
    {
        tokenSource.Dispose();
    }
    Console.WriteLine();foreach (Task task in tasks)
    {
        Console.WriteLine($"Task: #{task.Id} now is {task.Status}");
    }
}private static void DoWorkOfTask(int workDuration, CancellationToken cancleToken){if (cancleToken.IsCancellationRequested)
    {
        Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled before it got started.");
        cancleToken.ThrowIfCancellationRequested();
    }
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task #{Task.CurrentId}.");
    Thread.Sleep(workDuration);if (cancleToken.IsCancellationRequested)
    {
        Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled.");
        cancleToken.ThrowIfCancellationRequested();
    }
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task #{Task.CurrentId}.");
}
           

任務的異常處理

上面提到通過取消令牌抛出

TaskCanceledException

的方式控制任務的取消,實際上,Task會把自身執行過程中的所有異常都包裝到一個

AggregateException

中,并傳回調用線程。我們在主線程中通過捕獲

AggregateException

來進行異常處理。

簡單的處理方式

我們可以在任務的調用線程捕獲并周遊

AggregateException

的内部異常,或者使用

AggregateException

提供的Handle方法進行處理,如下:

Task task = Task.Run(() =>
{
throw new Exception($"Task #{Task.CurrentId} thrown an exception");
});
try
{
    task.Wait();
}
catch (AggregateException ae)
{

foreach (Exception ex in ae.InnerExceptions)
    {
        Console.WriteLine($"foreach: {ex.Message}");
    }


    ae.Handle(ex=>
    {
        Console.WriteLine($"handle: {ex.Message}");
return true ;
    });
}
           

使用延續任務處理任務的異常

有時候,我們可以給任務附加一個任務異常時才會執行的延續任務,并在延續任務中進行異常處理。

throw 
           

嵌套任務的異常處理

下面是一個3層嵌套的任務。

Task parent = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i++)
    {
        Task.Factory.StartNew(() =>
        {
for (int j = 0; j < 10; j++)
            {
                Task.Factory.StartNew(() =>
                {
throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
                });
            }

throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
        });
    }

throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
try
{
    parent.Wait();
}
catch (AggregateException ae)
{
    ae.Flatten().Handle(ex =>
    {
        Console.WriteLine(ex.Message);
return true;
    });
}
           

運作上面的代碼隻會得到一行輸出:

Task #1 thrown an exception.

看起來有點奇怪,為什麼隻捕獲到一個異常呢?其實也是在情理之中的:任務預設隻會把自身異常傳遞到它自己的調用線程,子任務是在父任務中調用的,其異常隻會傳遞到父任務的執行線程,是以我們在父任務的調用線程,也就是我們的主線程中是捕獲不到子任務的異常的。

取消上面代碼的兩處

,就會捕獲到所有異常。

任務排程器

.NET提供的任務排程器

任務是由

TaskScheduler

排程的,啟動任務時,預設使用線程池任務排程器,任務将會被派發到線程池工作線程。線程池的排程前面已經總結過,這裡不再展開。.NET提供的另一種任務排程器是同步上下文排程器,用

TaskScheduler.FromCurrentSynchronizationContext()

擷取,這個排程器會把任務派發給目前的上下文線程,常用在GUI應用程式中。

例如,我們在一個窗體中建立一個ListBox,建立幾個任務向其中添加項,代碼如下:

this.lbxMsg.Items.Add(
           

運作上面的代碼可以發現建立的任務都是由界面線程執行的。這裡如果使用預設的任務排程器将産生"線程間操作無效"的異常。

實際使用時,可以給一個異步任務添加延續任務,來處理異步任務的結果或者異常等。如下:

Task.Run(() =>
{
    Thread.Sleep(3000);
return 1000;
}).ContinueWith(t =>
{
this.lbxMsg.Items.Add(t.Result);
}, TaskScheduler.FromCurrentSynchronizationContext());
           

自定義任務排程器

除了使用.NET提供的排程器外,我們能夠繼承類

TaskScheduler

來實作自己的任務排程器。這裡不再展開,需要了解的可以參考Samples for Parallel Programming with the .NET Framework。

原文位址:https://www.cnblogs.com/chenbaoshun/p/10621819.html

.NET社群新聞,深度好文,歡迎通路公衆号文章彙總 http://www.csharpkit.com 

C# Task 循環任務_C#并行程式設計(4):基于任務的并行原文位址:https://www.cnblogs.com/chenbaoshun/p/10621819.html