天天看點

多核時代 .NET Framework 4 中的并行程式設計6---并行LINQ

1. 并行LINQ(PLINQ)

并行 LINQ (PLINQ) 是 LINQ 模式的并行實作。 PLINQ 查詢在許多方面類似于非并行 LINQ to Objects 查詢。 PLINQ 嘗試充分利用系統中的所有處理器, 它利用所有處理器的方法是,将資料源分成片段,然後在多個處理器上對單獨工作線程上的每個片段并行執行查詢。 在許多情況下,并行執行意味着查詢運作速度顯著提高。

PLINQ有兩個重要的類:ParallelEnumerable和ParallelQuery.其中, ParallelEnumerable包含許多在ParallelQuery類型上進行操作的擴充方法. ParallelEnumerable同樣實作許多在上Enumerable的方法,因為 ParallelQuery 實作IEnumerable接口,使用我們可以很友善的建立ParallelQuery類的一個執行個體并運用在ParallelEnumerable上實作的并行方法.接下來,通過代碼來學習這些重要的類.

 2. ParallelEnumerable 運算符

ParallelEnumerable包含很多運算符,他們對下封裝很多複雜的實作過程, 以便對上我們可以友善的使用并行linq查詢.具體如下:

(1) AsParallel

 PLINQ 的入口點。指定如果可能,應并行化查詢的其餘部分。PLINQ 通常隻需向資料源添加 AsParallel 查詢操作,即可在某些查詢類型的舊版代碼上獲得顯著的性能改進。代碼如下:

int[] source = new int[1000];

            for (int i = 0; i < source.Length; i++)

            {

                source[i] = i;

            }

 var  presult = source.AsParallel().Select(c => Math.Pow(c, 3));

            foreach (var d in presult)

                Console.WriteLine("AsParallel Result  is:{0}", d);

通過上面的代碼,我們可以通過使用AsParallel()擴充方法,即可開啟并行查詢.是以,一般的linq查詢代碼我們隻需要使用AsParallel()擴充的方法,即可啟用并行查詢.

(2) AsSequential<TSource>

 指定查詢按順序運作,也就是将并行的查詢轉換成一般的順序查詢.代碼如下:

            int[] source = new int[1000];

                source[i] = i;

            var result = source.AsParallel().AsSequential().Where(c => c > 0);

            foreach (var d in result)

                Console.WriteLine("AsSequential Result:{0}", d);

(3) AsOrdered

 指定 PLINQ 應保留查詢的其餘部分的源序列排序,直到例如通過使用 orderby子句更改排序為止。但是,因為并行查詢的情況,預設未排序的,除非順序特别重要,否則從性能角度的考慮建議不要排序.

            int[] sourceData = new int[5];

            for (int i = 0; i < sourceData.Length; i++)

                sourceData[i] = i;

            var results1 = sourceData.AsParallel().AsOrdered().Select(c => Math.Pow(c, 3));

            foreach (double d in results1)

                Console.WriteLine("AsOrdered result {0} ", d);

(4) AsUnordered<TSource>

 指定查詢的其餘部分的 PLINQ 不需要保留源序列的排序。

(5)  WithCancellation<TSource>

 指定 PLINQ 應定期監視請求取消時提供的取消标記和取消執行的狀态,這樣我們就可以在需要時候取消查詢。代碼如下:

            CancellationTokenSource token = newCancellationTokenSource();

             int[] sourceData = new int[1000000];

            var results1 = sourceData.AsParallel().WithCancellation(token.Token).WithMergeOptions(ParallelMergeOptions.NotBuffered).Where(c => c > 1);

            Task.Factory.StartNew(() =>

                Thread.Sleep(5000);

                token.Cancel();

                Console.WriteLine("Token source cancelled");

            });

            try

                foreach (double d in results1)

                {

                    Console.WriteLine("CancellationTokenSource result {0},Thread Id:{1} ", d, Thread.CurrentThread.ManagedThreadId);

                }

            catch (OperationCanceledException)

                Console.WriteLine("Caught cancellation exception");

            Console.WriteLine("Press enter to finish");

            Console.ReadLine();

(6) WithDegreeOfParallelism<TSource>

 指定 PLINQ 應當用來并行化查詢的處理器的最大數目。并行度是将用于處理查詢的同時執行的任務的最大數目. 代碼如下:

  var results1 = sourceData.AsParallel().WithDegreeOfParallelism(3).Where(c => c > 1);

    我們不防想想,如果将WithDegreeOfParallelism設定為1,會是什麼情況呢?

(7) WithMergeOptions<TSource>

 提供有關 PLINQ 應當如何(如果可能)将并行結果合并回到使用線程上的一個序列的提示, 指定查詢對輸出進行緩沖處理的方式.

   var results1 = sourceData.AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Where(c => c > 1);

其中ParallelMergeOptions枚舉可選擇以下幾個值:

Ø Default 使用預設合并類型,即 AutoBuffered。 

Ø NotBuffered 不利用輸出緩沖區進行合并。一旦計算出結果元素,就向查詢使用者提供這些元素。 

Ø AutoBuffered 利用系統標明大小的輸出緩沖區進行合并。 在向查詢使用者提供結果之前,會先将結果累計到輸出緩沖區中。 

Ø FullyBuffered 利用整個輸出緩沖區進行合并。 在向查詢使用者提供任何結果之前,系統會先累計所有結果。 

(8) WithExecutionMode<TSource>

 指定 PLINQ 應當如何并行化查詢(即使預設行為是按順序運作查詢)。

var results1 = sourceData.AsParallel().WithExecutionMode(ParallelExecutionMode.Default).Where(c => c > 1);

 其中, ParallelExecutionMode枚舉可以選擇如下兩種值:

Ø Default 這是預設設定。 PLINQ 将檢查查詢的結構,僅在可能帶來加速時才對查詢進行并行化。 如果查詢結構訓示不可能獲得加速,則 PLINQ 會将查詢當作普通的 LINQ to Objects 查詢來執行。 

Ø ForceParallelism 并行化整個查詢,即使要使用系統開銷大的算法。 如果認為并行執行查詢将帶來加速,則使用此标志,但處于預設模式的 PLINQ 将按順序執行它。 

(9) ForAll<TSource>

 多線程枚舉方法,與循環通路查詢結果不同,它允許在不首先合并回到使用者線程的情況下并行處理結果。這是處理并行化查詢輸出的高效方法,因為它不需要在結束時執行合并步驟.代碼如下:

  int[] sourceData = new int[5];

            sourceData.AsParallel().ForAll((d) =>

                Console.WriteLine("ForAll result {0} ", d);

(10) Aggregate 重載

 對于 PLINQ 唯一的重載,它啟用對線程本地分區的中間聚合以及一個用于合并所有分區結果的最終聚合函數。

3. 延遲執行

預設情況下PLINQ和LINQ類似,它隻在需要查詢結果時才計算.為了快速得到結果,我們可以在查詢之後再調用ToArray(), ToDictionary(),ToList()方法,以便使其立刻計算.

4. 異常處理

5. Range方法

ParallelEnumerable類還提供了Rang擴充方法. 它可生成指定範圍内的整數的并行序列,也就是在指定的範圍内進行并行處理.代碼如下:

var result1 =

            from e in ParallelEnumerable.Range(0, 100)

            where e % 2 == 0

            select Math.Pow(e, 2);

6. Repeat方法

ParallelEnumerable類還提供了Repeat擴充方法. 生成包含一個重複值的并行序列,也就是并行生成重複值.代碼如下:

var result2 =

            ParallelEnumerable.Repeat(10, 1000)

            .Select(item => Math.Pow(item, 2));

此次,有關PLINQ的基本操作介紹完畢,後續還有其他内容繼續介紹.

    本文轉自風車車  部落格園部落格,原文連結:http://www.cnblogs.com/xray2005/archive/2011/09/01/2161491.html,如需轉載請自行聯系原作者

繼續閱讀