并行LINQ
并行查詢
.NET4在System.Linq名稱空間中包含一個新類ParalleIEnumerable ,可以分解查詢的工作使其分布在多個線程上。盡管Enmerable類給IEnunerable<T>接口定義了擴充方法,但
ParalleIEnumerable 類的大多數擴充方法是ParallelQuery<TSource>類的擴充。一個重要的例外是AsParallel()方法,它擴充了IEnumerable<TSource>接口,傳回ParallelQuery<TSource>類,是以正常的集合類可以以平行方式查詢。
例:
const int arraySize = 100000000;
var data = new int[arraySize];
var r = new Random();
for (int i = 0; i < arraySize; i++)
{
data[i] = r.Next(40);
}
現在可以使用LINQ查詢篩選資料,擷取篩選資料的總和。該查詢用where子句定義了一個篩選器,僅會中對應值小于20的項,接着調用聚合函數Sum()方法 。與前面的LINQ查詢的唯一差別是,這次調用了AsParallel()方法。
var sum = (from x in data.AsParallel()
where x < 20
select x).Sum();
與前面的LINQ查詢一樣,編譯器會修改文法,以調用AsParallel()、Where()、Select()和Sum()方法。AsParallel()方法用ParallelEnumerable類定義,以擴充IEnumerable<T>接口,是以對簡單的資料調用它。AsParallel()方法傳回ParallelQuery<TSource>。因為傳回的類型,是以編譯器選擇的Where()方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。在下面的代碼中Select()和Sum()方法也來自ParallelEnumerable類。與Enumerable類的實作代碼相反,對于ParallelEnumerable類,查詢是分區的,以便多個線程可以同時處理該查詢。數組可以分為多個部分,其中每個部分由不同的線程處理,以篩選其餘項。完成分區的工作後,就需要合并,獲得所有部分的總和。
var sum=data.AsParallel().Where(x=>x<20).Select(x=>x).Sum();
運作這行代碼就會啟動任務管理器,這樣就可以看出系統的所有CPU都在忙碌。如果删除AsParallel()方法,就不可能使用多個CPU。當然,如果系統上沒有多個CPU,就不會看到并行版本帶來改進。
分區器
AsParallel()方法不僅擴充了IEnumerable<T>接口,還擴充了Partition類。通過它,可以影響要建立的分區。
Partitioner類用System.Collection.Concurrent命名空間定義,并且有不同變體。Create方法接受實作了IList<T>類的數組或對象。根據這一點,以及類型的參數loadBalance和該方法的一些重載版本,會傳回一個不同的Partitioner類型。對于數組,.Net4包含派生自抽象基類OrderablePartitioner<TSource>的DynamicPartitionerForArray<TSource>類和StaticPartitionerForArray<TSource>類。
var q1 = (from x in Partitioner.Create(data).AsParallel()
where x < 20
select x).Sum();
也可以調用WithExecutionMode()和WithDegreeOfParallelism()方法可以傳遞ParallelExecutionMode的一個Default值或者ForceParallelism值。預設情況下,并行LINQ避免使用系統開銷很高的并行機制。對于WithDegreeOfParallelism()方法,可以傳遞一個整數值,以指定并行運作的最大任務數。
const int arraySize = 100000000;
Stopwatch watch = new Stopwatch();
watch.Start();
//一種寫法,沒有添加動态負載均衡,執行完所需要的時間1300毫秒
var q1 = (from x in Partitioner.Create(data).AsParallel()
where x < 80
//第二種寫法,添加了動态負載均衡,執行完所需要的時間為660毫秒。
var q1 = (from x in Partitioner.Create(data,true).AsParallel()
watch.Stop();
Console.WriteLine(watch.ElapsedMilliseconds.ToString());
取消
.Net提供了一種标準方式,來取消長時間運作的任務,這也适用于并行LINQ。要取消長時間的查詢,可以給查詢添加WithCancellation()方法,并傳遞一個CancellationToken令牌作為參數。CancellationToken令牌從CancellationTokenSource類中建立。該查詢在單獨的線程中運作,在該線程中,捕獲一個OperationCancelException類型的異常。如果取消了查詢,就觸發這個異常。在主線程中,調用CancellationTokenSource類的Cancel()方法可以取消任務。
var cts = new CancellationTokenSource();
new Thread(() =>
{
try
{
var sum = (from x in data.AsParallel().WithCancellation(cts.Token)
where x < 80
Console.WriteLine("query finished, sum: {0}", sum);
}
catch (OperationCanceledException ex)
Console.WriteLine(ex.Message);
}).Start();
Console.WriteLine("query started");
Console.Write("cancel? ");
int input = Console.Read();
if (input == 'Y' || input == 'y')
// cancel!
cts.Cancel();
}