天天看點

.NET 并行程式設計——任務并行 下載下傳 Demo下載下傳 Samples for Parallel Programming with .net framework

本文内容

  • 并行程式設計
  • 任務并行
    • 隐式建立和運作任務
    • 顯式建立和運作任務
    • 任務 ID
    • 任務建立選項
    • 建立任務延續
    • 建立分離的子任務
    • 建立子任務
    • 等待任務完成
    • 組合任務
    • 任務中的異常處理
    • 取消任務
    • TaskFactory 類
    • 無委托的任務
    • 相關資料結構
  • 參考資料

下載下傳 Demo

下載下傳 Samples for Parallel Programming with .net framework

多核 CPU 已經相當普遍,使得多個線程能夠同時執行。将代碼并行化,工作也就分攤到多個 CPU 上。

過去,并行化需要線程和鎖的低級操作。而 Visual Studio 2010 和 .NET Framework 4 開始提供了新的運作時、新的類庫類型以及新的診斷工具,進而增強了對并行程式設計的支援。這些功能簡化了并行開發,通過固有方法編寫高效、細化且可伸縮的并行代碼,而不必直接處理線程或線程池。

下圖從較高層面上概述了 .NET Framework 4 中的并行程式設計體系結構。

.NET 并行程式設計——任務并行 下載下傳 Demo下載下傳 Samples for Parallel Programming with .net framework

任務并行庫(The Task Parallel Library,TPL)是 System.Threading 和 System.Threading.Tasks 空間中的一組公共類型和 API。TPL 的目的是通過簡化将并行和并發添加到應用程式的過程來提高開發人員的工作效率。TPL 能動态地最有效地使用所有可用的處理器。此外,TPL 還處理工作分區、ThreadPool 上的線程排程、取消支援、狀态管理以及其他低級别的細節操作。通過使用 TPL,你可以将精力集中于程式要完成的工作,同時最大程度地提高代碼的性能。

從 .NET Framework 4 開始,TPL 是編寫多線程代碼和并行代碼的首選方法。但并不是所有代碼都适合并行化,例如,如果某個循環在每次疊代時隻執行少量工作,或它在很多次疊代時都不運作,那麼并行化的開銷可能導緻代碼運作更慢。 此外,像任何多線程代碼一樣,并行化會增加程式執行的複雜性。 盡管 TPL 簡化了多線程方案,但建議對線程處理概念(例如,鎖、死鎖和争用條件)進行基本了解,以便能夠有效地使用 TPL。

“任務并行”是指一個或多個獨立的任務同時運作。好處當然是系統資源的使用效率更高,可伸縮性更好;對于線程或工作項,可以使用更多的程式設計控件。是以,在 .NET Framework 中,TPL 是用于編寫多線程、異步和并行代碼的首選 API。

Parallel.Invoke 方法提供了一種簡便方式,可同時運作任意數量的任意語句。隻需為每個工作項傳入 Action 委托即可。

下面的示例示範 Invoke 調用,建立并啟動同時運作三個任務。将對共享資料達爾文的《物種起源》執行三項操作:最長的詞、最頻繁出現的詞和字數。這些操作都不修改源,是以它們可以直接并行執行。

using System;      
using System.Linq;      
using System.Text;      
using System.Threading.Tasks;      
namespace ParallelInvokeDemo      
{      
class Program      
{      
static void Main(string[] args)      
{      
// Retrieve Darwin's "Origin of the Species" from Gutenberg.org.                
string[] words = CreateWordArray();      
// Perform three tasks in parallel on the source array      
Parallel.Invoke(() =>      
{      
Console.WriteLine("Begin first task...");      
GetLongestWord(words);      
},  // close first Action      
() =>      
{      
Console.WriteLine("Begin second task...");      
GetMostCommonWords(words);      
}, //close second Action      
() =>      
{      
Console.WriteLine("Begin third task...");      
GetCountForWord(words, "species");      
}  //close third Action      
); //close parallel.invoke      
Console.WriteLine("Returned from Parallel.Invoke");      
Console.WriteLine("Press any key to exit.");      
Console.ReadKey();      
}      
private static string GetLongestWord(string[] words)      
{      
var longestWord = (from w in words      
orderby w.Length descending      
select w).First();      
Console.WriteLine("Task 1 -- The longest word is {0}", longestWord);      
return longestWord;      
}      
private static void GetMostCommonWords(string[] words)      
{      
var frequencyOrder = from word in words      
where word.Length > 6      
group word by word into g      
orderby g.Count() descending      
select g.Key;      
var commonWords = frequencyOrder.Take(10);      
StringBuilder sb = new StringBuilder();      
sb.AppendLine("Task 2 -- The most common words are:");      
foreach (var v in commonWords)      
{      
sb.AppendLine("  " + v);      
}      
Console.WriteLine(sb.ToString());      
}      
private static void GetCountForWord(string[] words, string term)      
{      
var findWord = from word in words      
where word.ToUpper().Contains(term.ToUpper())      
select word;      
Console.WriteLine(@"Task 3 -- The word ""{0}"" occurs {1} times.", term, findWord.Count());      
}      
static string[] CreateWordArray()      
{      
string s = System.IO.File.ReadAllText("Origin of the Species.txt");      
// Separate string into an array of words, removing some common punctuation.      
return s.Split(      
new char[] { ' ', '\u000A', ',', '.', ';', ':', '-', '_', '/' },      
StringSplitOptions.RemoveEmptyEntries);      
}      
}      
}      
//RESULT:      
//Begin second task...      
//Begin first task...      
//Begin third task...      
//Task 2 -- The most common words are:      
//  species      
//  selection      
//  varieties      
//  natural      
//  animals      
//  between      
//  different      
//  distinct      
//  several      
//  conditions      
//Task 1 -- The longest word is characteristically      
//Task 3 -- The word "species" occurs 1927 times.      
//Returned from Parallel.Invoke      
//Press any key to exit.      
為了更好地控制任務執行或從任務傳回值,必須更加顯式地使用 Task 對象。

不傳回值的任務由 System.Threading.Tasks.Task 類表示。傳回值的任務由 System.Threading.Tasks.Task<TResult> 類表示,該類從 Task 繼承。

任務對象處理基礎結構詳細資訊,并提供可在任務的整個生存期内從調用線程通路的方法和屬性。 例如,可以随時通路任務的 Status 屬性,以确定它是已開始運作、已完成運作、已取消還是引發了異常。 狀态由 TaskStatus 枚舉表示。

在建立任務時,你賦予它一個使用者委托,該委托封裝該任務将執行的代碼。 該委托可以表示為命名的委托、匿名方法或 lambda 表達式。 lambda 表達式可以包含對命名方法的調用,如下面的示例所示。

using System;      
using System.Threading;      
using System.Threading.Tasks;      
namespace ExplicitTaskDemo      
{      
class Program      
{      
static void Main(string[] args)      
{      
Thread.CurrentThread.Name = "Main";      
// Create a task and supply a user delegate by using a lambda expression.       
Task taskA = new Task(() => Console.WriteLine("Hello from taskA."));      
// Start the task.      
taskA.Start();      
// Output a message from the calling thread.      
Console.WriteLine("Hello from thread '{0}'.", Thread.CurrentThread.Name);      
taskA.Wait();      
//Thread.CurrentThread.Name = "Main";      
//// Define and run the task.      
//Task taskA = Task.Run(() => Console.WriteLine("Hello from taskA."));      
//// Output a message from the calling thread.      
//Console.WriteLine("Hello from thread '{0}'.", Thread.CurrentThread.Name);      
//taskA.Wait();      
//Thread.CurrentThread.Name = "Main";      
//// Better: Create and start the task in one operation.       
//Task taskA = Task.Factory.StartNew(() => Console.WriteLine("Hello from taskA."));      
//// Output a message from the calling thread.      
//Console.WriteLine("Hello from thread '{0}'.", Thread.CurrentThread.Name);      
//taskA.Wait();      
Console.WriteLine("Press any Key to Exit.");      
Console.ReadKey();      
}      
}      
}      
//TaskParallel1 RESULT:      
//Hello from thread 'Main'.      
//Hello from taskA.      
//Press any Key to Exit.      
你可以用 new Task(),也可以用 Task.Run(),還可以用 Task.Factory.StartNew(),建立并啟動一個任務。其中,Task.Run() 是首選方法;不必将建立和計劃分開并且你需要其他任務建立選項或使用特定計劃程式時,或者需要通過 AsyncState 屬性将其他狀态傳遞到任務時,使用 Task.Factory.StartNew()。

建立任務的大多數 API 提供接受 TaskCreationOptions 枚舉參數。 通過指定下列選項之一,可訓示任務計劃程式如何線上程池中安排任務計劃。

using System;      
using System.Threading.Tasks;      
namespace TaskCreationOptionsDemo      
{      
class Program      
{      
static void Main(string[] args)      
{      
var task3 = new Task(() => MyLongRunningMethod(),      
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness);      
task3.Start();      
task3.Wait();      
Console.WriteLine("Press any key to Exit.");      
Console.ReadKey();      
}      
static void MyLongRunningMethod()      
{      
Console.WriteLine("A long, long time ago......");      
}      
}      
}      
TaskCreationOptions 參數值 說明
None 未指定任何選項時的預設值。 計劃程式将使用其預設試探法來計劃任務。
PreferFairness 指定應當計劃任務,以使越早建立的任務将更可能越早執行,而越晚建立的任務将更可能越晚執行。
LongRunning 指定該任務表示長時間運作的運算。
AttachedToParent 指定應将任務建立為目前任務(如果存在)的附加子級。 有關更多資訊,請參見已附加和已分離的子任務。
DenyChildAttach 指定如果内部任務指定 AttachedToParent 選項,則該任務不會成為附加的子任務。
HideScheduler 指定通過調用特定任務内部的 TaskFactory.StartNew 或 Task<TResult>.ContinueWith 等方法建立的任務的任務計劃程式是預設計劃程式,而不是正在運作此任務的計劃程式。

使用 Task.ContinueWith 和 Task<TResult>.ContinueWith 方法可以指定在先行任務完成時要啟動的任務。 延續任務的委托已傳遞了對先行任務的引用,是以它可以檢查先行任務的狀态,并通過檢索 Task<TResult>.Result 屬性的值将先行任務的輸出用作延續任務的輸入。

var getData = Task.Factory.StartNew(() =>      
{      
Random rnd = new Random();      
int[] values = new int[100];      
for (int ctr = 0; ctr <= values.GetUpperBound(0); ctr++)      
values[ctr] = rnd.Next();      
return values;      
});      
var processData = getData.ContinueWith((x) =>      
{      
int n = x.Result.Length;      
long sum = 0;      
double mean;      
for (int ctr = 0; ctr <= x.Result.GetUpperBound(0); ctr++)      
sum += x.Result[ctr];      
mean = sum / (double)n;      
return Tuple.Create(n, sum, mean);      
});      
var displayData = processData.ContinueWith((x) =>      
{      
return String.Format("N={0:N0}, Total = {1:N0}, Mean = {2:N2}",      
x.Result.Item1, x.Result.Item2, x.Result.Item3);      
});      
Console.WriteLine(displayData.Result);      
//N=100, Total = 108,192,345,466, Mean = 1,081,923,454.66      
//Press any key to Exit.      

也可以用鍊式寫法:

var displayData = Task.Factory.StartNew(() =>      
{      
Random rnd = new Random();      
int[] values = new int[100];      
for (int ctr = 0; ctr <= values.GetUpperBound(0); ctr++)      
values[ctr] = rnd.Next();      
return values;      
}).      
ContinueWith((x) =>      
{      
int n = x.Result.Length;      
long sum = 0;      
double mean;      
for (int ctr = 0; ctr <= x.Result.GetUpperBound(0); ctr++)      
sum += x.Result[ctr];      
mean = sum / (double)n;      
return Tuple.Create(n, sum, mean);      
}).      
ContinueWith((x) =>      
{      
return String.Format("N={0:N0}, Total = {1:N0}, Mean = {2:N2}",      
x.Result.Item1, x.Result.Item2,      
x.Result.Item3);      
});      
Console.WriteLine(displayData.Result);      

如果在任務中運作的使用者代碼建立一個新任務,且未指定 AttachedToParent 選項,則該新任務不采用任何特殊方式與父任務同步。 這種不同步的任務類型稱為“分離的嵌套任務”或“分離的子任務”。

var outer = Task.Factory.StartNew(() =>      
{      
Console.WriteLine("Outer task beginning.");      
var child = Task.Factory.StartNew(() =>      
{      
Thread.SpinWait(5000000);      
Console.WriteLine("Detached task completed.");      
});      
});      
outer.Wait();      
Console.WriteLine("Outer task completed.");      
//The example displays the following output:      
//Outer task beginning.      
//Outer task completed.      
//Detached task completed.      
從輸出資訊看,outer 沒有等 child。

如果在一個任務中運作的使用者代碼建立任務時指定了 AttachedToParent 選項,則該新任務稱為父任務的“附加子任務”。 因為父任務隐式地等待所有附加子任務完成,是以你可以使用 AttachedToParent 選項表示結構化的任務并行。

var parent = Task.Factory.StartNew(() =>      
{      
Console.WriteLine("Parent task beginning.");      
for (int ctr = 0; ctr < 10; ctr++)      
{      
int taskNo = ctr;      
Task.Factory.StartNew((x) =>      
{      
Thread.SpinWait(5000000);      
Console.WriteLine("Attached child #{0} completed.",      
x);      
},      
taskNo, TaskCreationOptions.AttachedToParent);      
}      
});      
parent.Wait();      
Console.WriteLine("Parent task completed.");      
//The example displays the following output:      
//Parent task beginning.      
//Attached child #9 completed.      
//Attached child #0 completed.      
//Attached child #8 completed.      
//Attached child #1 completed.      
//Attached child #3 completed.      
//Attached child #7 completed.      
//Attached child #4 completed.      
//Attached child #5 completed.      
//Attached child #2 completed.      
//Attached child #6 completed.      
//Parent task completed.      

System.Threading.Tasks.Task 類型和 System.Threading.Tasks.Task<TResult> 類型提供了 Task.Wait 和 Task<TResult>.Wait 方法的若幹重載,使你能夠等待任務完成。

此外,使用靜态 Task.WaitAll 和 Task.WaitAny 方法的重載可以等待一批任務中的任一任務或所有任務完成。

通常,會出于以下某個原因等待任務:

  • 主線程依賴于任務計算的最終結果。
  • 你必須處理可能從任務引發的異常。
  • 應用程式可以在所有任務執行完畢之前終止。 例如,執行 Main(應用程式入口點)中的所有同步代碼後,控制台應用程式将立即終止。

Task 類和 Task<TResult> 類提供多種方法,這些方法能夠幫助你組合多個任務以實作常見模式,并更好地使用由 C#、Visual Basic 和 F# 提供的異步語言功能。 本節介紹了 WhenAll、WhenAny、Delay 和 FromResult<TResult> 方法。

Task.WhenAll 和 Task.WhenAny

using System;      
using System.Collections.Generic;      
using System.Linq;      
using System.Text;      
using System.Threading;      
using System.Threading.Tasks;      
namespace TaskWaitAllWaitAnyDemo      
{      
class Program      
{      
static Random rand = new Random();      
static void Main(string[] args)      
{      
// Wait on a single task with no timeout specified.      
Task taskA = Task.Factory.StartNew(() => DoSomeWork(10000000));      
taskA.Wait();      
Console.WriteLine("taskA has completed.");      
// Wait on a single task with a timeout specified.      
Task taskB = Task.Factory.StartNew(() => DoSomeWork(10000000));      
taskB.Wait(100); //Wait for 100 ms.      
if (taskB.IsCompleted)      
Console.WriteLine("taskB has completed.");      
else      
Console.WriteLine("Timed out before taskB completed.");      
// Wait for all tasks to complete.      
Task[] tasks = new Task[10];      
for (int i = 0; i < 10; i++)      
{      
tasks[i] = Task.Factory.StartNew(() => DoSomeWork(10000000));      
}      
Task.WaitAll(tasks);      
// Wait for first task to complete.      
Task<double>[] tasks2 = new Task<double>[3];      
// Try three different approaches to the problem. Take the first one.      
tasks2[0] = Task<double>.Factory.StartNew(() => TrySolution1());      
tasks2[1] = Task<double>.Factory.StartNew(() => TrySolution2());      
tasks2[2] = Task<double>.Factory.StartNew(() => TrySolution3());      
int index = Task.WaitAny(tasks2);      
double d = tasks2[index].Result;      
Console.WriteLine("task[{0}] completed first with result of {1}.", index, d);      
Console.ReadKey();      
}      
static void DoSomeWork(int val)      
{      
// Pretend to do something.      
Thread.SpinWait(val);      
}      
static double TrySolution1()      
{      
int i = rand.Next(1000000);      
// Simulate work by spinning      
Thread.SpinWait(i);      
return DateTime.Now.Millisecond;      
}      
static double TrySolution2()      
{      
int i = rand.Next(1000000);      
// Simulate work by spinning      
Thread.SpinWait(i);      
return DateTime.Now.Millisecond;      
}      
static double TrySolution3()      
{      
int i = rand.Next(1000000);      
// Simulate work by spinning      
Thread.SpinWait(i);      
Thread.SpinWait(1000000);      
return DateTime.Now.Millisecond;      
}      
}      
}      
//The example displays the following output:      
//taskA has completed.      
//taskB has completed.      
//task[0] completed first with result of 353.      
最後一個輸出結果是不确定的,因為 int index = Task.WaitAny(tasks2); 語句,task2 任務數組中有一個完成,就可以執行該語句下面的語句。

Task.FromResult<TResult>

using System;      
using System.Collections.Concurrent;      
using System.Collections.Generic;      
using System.Diagnostics;      
using System.Linq;      
using System.Net;      
using System.Text;      
using System.Threading.Tasks;      
namespace TaskFromResultDemo      
{      
/// <summary>      
/// Demonstrates how to use Task<TResult>.FromResult to create a task that holds a pre-computed result.      
/// </summary>      
class Program      
{      
// Holds the results of download operations.      
static ConcurrentDictionary<string, string> cachedDownloads = new ConcurrentDictionary<string, string>();      
// Asynchronously downloads the requested resource as a string.      
public static Task<string> DownloadStringAsync(string address)      
{      
// First try to retrieve the content from cache.      
string content;      
if (cachedDownloads.TryGetValue(address, out content))      
{      
return Task.FromResult<string>(content);      
}      
// If the result was not in the cache, download the       
// string and add it to the cache.      
return Task.Run(async () =>      
{      
content = await new WebClient().DownloadStringTaskAsync(address);      
cachedDownloads.TryAdd(address, content);      
return content;      
});      
}      
static void Main(string[] args)      
{      
// The URLs to download.      
string[] urls = new string[]      
{      
"http://msdn.microsoft.com",      
"http://www.contoso.com",      
"http://www.microsoft.com"      
};      
// Used to time download operations.      
Stopwatch stopwatch = new Stopwatch();      
// Compute the time required to download the URLs.      
stopwatch.Start();      
var downloads = from url in urls      
select DownloadStringAsync(url);      
Task.WhenAll(downloads).ContinueWith(results =>      
{      
stopwatch.Stop();      
// Print the number of characters download and the elapsed time.      
Console.WriteLine("Retrieved {0} characters. Elapsed time was {1} ms.",      
results.Result.Sum(result => result.Length),      
stopwatch.ElapsedMilliseconds);      
})      
.Wait();      
// Perform the same operation a second time. The time required      
// should be shorter because the results are held in the cache.      
stopwatch.Restart();      
downloads = from url in urls      
select DownloadStringAsync(url);      
Task.WhenAll(downloads).ContinueWith(results =>      
{      
stopwatch.Stop();      
// Print the number of characters download and the elapsed time.      
Console.WriteLine("Retrieved {0} characters. Elapsed time was {1} ms.",      
results.Result.Sum(result => result.Length),      
stopwatch.ElapsedMilliseconds);      
})      
.Wait();      
Console.WriteLine("Press any key to Exit.");      
Console.ReadKey();      
}      
}      
}      
//The example displays the following output:      
//Retrieved 45462 characters. Elapsed time was 23734 ms.      
//Retrieved 45462 characters. Elapsed time was 0 ms.      
//Press any key to Exit.      

當某個任務抛出一個或多個異常時,異常包裝在 AggregateException 異常中。 該異常傳播回與該任務聯接的線程,通常該線程正在等待該任務完成或該線程通路 Result 屬性。此行為用于強制實施 .NET Framework 政策 - 預設所有未處理的異常應終止程序。

可以通過将 Wait、WaitAll 或 WaitAny 方法,以及 Result 屬性放入 try/catch 塊中來處理異常。

捕獲 System.AggregateException,然後檢查其 InnerExceptions 以檢視是否存在任何可由程式代碼處理的異常。如下代碼所示:

static void HandleExceptions()      
{      
// Assume this is a user-entered string.      
string path = @"C:\";      
// Use this line to throw UnauthorizedAccessException, which we handle.      
Task<string[]> task1 = Task<string[]>.Factory.StartNew(() => GetAllFiles(path));      
// Use this line to throw an exception that is not handled.      
//  Task task1 = Task.Factory.StartNew(() => { throw new IndexOutOfRangeException(); } );      
try      
{      
task1.Wait();      
}      
catch (AggregateException ae)      
{      
ae.Handle((x) =>      
{      
if (x is UnauthorizedAccessException) // This we know how to handle.      
{      
Console.WriteLine("You do not have permission to access all folders in this path.");      
Console.WriteLine("See your network administrator or try another path.");      
return true;      
}      
return false; // Let anything else stop the application.      
});      
}      
Console.WriteLine("task1 has completed.");      
}      
static string[] GetAllFiles(string str)      
{      
// Should throw an AccessDenied exception on Vista.      
return System.IO.Directory.GetFiles(str, "*.txt", System.IO.SearchOption.AllDirectories);      
}      

在此示例中,捕獲 System.AggregateException,但不嘗試處理任何其内部異常。 而是使用 Flatten 方法從任何嵌套的 AggregateException 執行個體中提取内部異常,并再次引發直接包含所有内部未處理異常的單個 AggregateException。 展平異常将使其更便于供用戶端代碼處理。

static void RethrowAllExceptions()      
{      
// Assume this is a user-entered string.      
string path = @"C:\";      
Task<string[]>[] tasks = new Task<string[]>[3];      
tasks[0] = Task<string[]>.Factory.StartNew(() => GetAllFiles(path));      
tasks[1] = Task<string[]>.Factory.StartNew(() => GetValidExtensions(path));      
tasks[2] = Task<string[]>.Factory.StartNew(() => new string[10]);      
//int index = Task.WaitAny(tasks2);      
//double d = tasks2[index].Result;      
try      
{      
Task.WaitAll(tasks);      
}      
catch (AggregateException ae)      
{      
throw ae.Flatten();      
}      
Console.WriteLine("task1 has completed.");      
}      
static string[] GetValidExtensions(string path)      
{      
if (path == @"C:\")      
throw new ArgumentException("The system root is not a valid path.");      
return new string[10];      
}      

Task 類支援協作取消,并與 .NET Framework 4 中新增的 System.Threading.CancellationTokenSource 類和 System.Threading.CancellationToken 類完全內建。

建立任務的大多數 API 提供接受 CancellationToken 參數。

var tokenSource2 = new CancellationTokenSource();      
CancellationToken ct = tokenSource2.Token;      
var task = Task.Factory.StartNew(() =>      
{      
// Were we already canceled?      
ct.ThrowIfCancellationRequested();      
bool moreToDo = true;      
while (moreToDo)      
{      
// Poll on this property if you have to do      
// other cleanup before throwing.      
if (ct.IsCancellationRequested)      
{      
// Clean up here, then...      
ct.ThrowIfCancellationRequested();      
}      
}      
}, tokenSource2.Token); // Pass same token to StartNew.      
tokenSource2.Cancel();      
// Just continue on this thread, or Wait/WaitAll with try-catch:      
try      
{      
task.Wait();      
}      
catch (AggregateException e)      
{      
foreach (var v in e.InnerExceptions)      
Console.WriteLine(e.Message + " " + v.Message);      
}      

TaskFactory 類提供靜态方法,這些方法封裝了用于建立和啟動任務和延續任務的一些常用模式。

  • 最常用模式為 StartNew,它在一個語句中建立并啟動任務。
  • 從多個先行任務建立延續任務時,請使用 ContinueWhenAll 方法或 ContinueWhenAny 方法,或者它們在 Task<TResult> 類中的等效方法。 有關更多資訊,請參見延續任務。
  • 若要在 Task 或 Task<TResult> 執行個體中封裝異步程式設計模型 BeginX 和 EndX 方法,請使用 FromAsync 方法。

預設的 TaskFactory 可作為 Task 類或 Task<TResult> 類上的靜态屬性通路。 你還可以直接執行個體化 TaskFactory 并指定各種選項,包括 CancellationToken、TaskCreationOptions 選項、TaskContinuationOptions 選項或 TaskScheduler。 建立任務工廠時所指定的任何選項将應用于它建立的所有任務,除非 Task 是通過使用 TaskCreationOptions 枚舉建立的(在這種情況下,任務的選項重寫任務工廠的選項)。

在某些情況下,可能需要使用 Task 封裝由外部元件(而不是你自己的使用者委托)執行的某個異步操作。 如果該操作基于異步程式設計模型 Begin/End 模式,你可以使用 FromAsync 方法。 如果不是這種情況,你可以使用 TaskCompletionSource<TResult> 對象将該操作包裝在任務中,并因而獲得 Task 可程式設計性的一些好處,例如對異常傳播和延續的支援。

在許多方案中,啟用一個 Task<TResult> 來表示外部異步操作非常有用。 出于此目的,提供了 TaskCompletionSource<TResult>。 它允許建立可以分發到使用者的任務,這些使用者可以同其他情況下一樣使用該任務的成員。 但是,與大多數任務不同,TaskCompletionSource 建立的任務的狀态由 TaskCompletionSource 上的方法顯式控制。 這使得要傳播到基礎任務的外部異步操作能夠完成。 分離還確定使用者沒有通路對應的 TaskCompletionSource 時不能轉換狀态。

using System;      
using System.Collections.Generic;      
using System.Diagnostics;      
using System.Linq;      
using System.Text;      
using System.Threading;      
using System.Threading.Tasks;      
namespace TaskCompletionSourceDemo      
{      
class Program      
{      
// Demonstrated features:      
//         TaskCompletionSource ctor()      
//         TaskCompletionSource.SetResult()      
//         TaskCompletionSource.SetException()      
//        Task.Result      
// Expected results:      
//         The attempt to get t1.Result blocks for ~1000ms until tcs1 gets signaled. 15 is printed out.      
//         The attempt to get t2.Result blocks for ~1000ms until tcs2 gets signaled. An exception is printed out.      
// Documentation:      
//        http://msdn.microsoft.com/en-us/library/dd449199(VS.100).aspx      
static void Main(string[] args)      
{      
TaskCompletionSource<int> tcs1 = new TaskCompletionSource<int>();      
Task<int> t1 = tcs1.Task;      
// Start a background task that will complete tcs1.Task      
Task.Factory.StartNew(() =>      
{      
Thread.Sleep(1000);      
tcs1.SetResult(15);      
});      
// The attempt to get the result of t1 blocks the current thread until the completion source gets signaled.      
// It should be a wait of ~1000 ms.      
Stopwatch sw = Stopwatch.StartNew();      
int result = t1.Result;      
sw.Stop();      
Console.WriteLine("(ElapsedTime={0}): t1.Result={1} (expected 15) ", sw.ElapsedMilliseconds, result);      
// ------------------------------------------------------------------      
// Alternatively, an exception can be manually set on a TaskCompletionSource.Task      
TaskCompletionSource<int> tcs2 = new TaskCompletionSource<int>();      
Task<int> t2 = tcs2.Task;      
// Start a background Task that will complete tcs2.Task with an exception      
Task.Factory.StartNew(() =>      
{      
Thread.Sleep(1000);      
tcs2.SetException(new InvalidOperationException("SIMULATED EXCEPTION"));      
});      
// The attempt to get the result of t2 blocks the current thread until the completion source gets signaled with either a result or an exception.      
// In either case it should be a wait of ~1000 ms.      
sw = Stopwatch.StartNew();      
try      
{      
result = t2.Result;      
Console.WriteLine("t2.Result succeeded. THIS WAS NOT EXPECTED.");      
Console.WriteLine("Press any key to Exit.");      
Console.ReadKey();      
}      
catch (AggregateException e)      
{      
Console.Write("(ElapsedTime={0}): ", sw.ElapsedMilliseconds);      
Console.WriteLine("The following exceptions have been thrown by t2.Result: (THIS WAS EXPECTED)");      
for (int j = 0; j < e.InnerExceptions.Count; j++)      
{      
Console.WriteLine("\n-------------------------------------------------\n{0}", e.InnerExceptions[j].ToString());      
}      
Console.WriteLine("Press any key to Exit.");      
Console.ReadKey();      
}      
}      
}      
}      

TPL 有幾種在并行和順序方案中都有用的新公共類型。 它們包括 System.Collections.Concurrent 命名空間中的一些線程安全的、快速且可縮放的集合類,還包括一些新的同步類型(例如 System.Threading.Semaphore 和 System.Threading.ManualResetEventSlim),對特定類型的工作負荷而言,這些新同步類型比舊的同步類型效率更高。 .NET Framework 4 中的其他新類型(例如 System.Threading.Barrier 和 System.Threading.SpinLock)提供了早期版本中未提供的功能。

  • Microsoft Developer Network 并行程式設計
  • Microsft Developer Network 任務并行 

繼續閱讀