天天看點

并行Linq

并行LINQ

并行查詢

.NET4在System.Linq名稱空間中包含一個新類ParalleIEnumerable ,可以分解查詢的工作使其分布在多個線程上。盡管Enmerable類給IEnunerable<T>接口定義了擴充方法,但

ParalleIEnumerable 類的大多數擴充方法是ParallelQuery<TSource>類的擴充。一個重要的例外是AsParallel()方法,它擴充了IEnumerable<TSource>接口,傳回ParallelQuery<TSource>類,是以正常的集合類可以以平行方式查詢。

例:

   const int arraySize = 100000000;

           var data = new int[arraySize];

           var r = new Random();

           for (int i = 0; i < arraySize; i++)

           {

               data[i] = r.Next(40);

           }

   現在可以使用LINQ查詢篩選資料,擷取篩選資料的總和。該查詢用where子句定義了一個篩選器,僅會中對應值小于20的項,接着調用聚合函數Sum()方法 。與前面的LINQ查詢的唯一差別是,這次調用了AsParallel()方法。

   var sum = (from x in data.AsParallel()

                                  where x < 20

                                  select x).Sum();

與前面的LINQ查詢一樣,編譯器會修改文法,以調用AsParallel()、Where()、Select()和Sum()方法。AsParallel()方法用ParallelEnumerable類定義,以擴充IEnumerable<T>接口,是以對簡單的資料調用它。AsParallel()方法傳回ParallelQuery<TSource>。因為傳回的類型,是以編譯器選擇的Where()方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。在下面的代碼中Select()和Sum()方法也來自ParallelEnumerable類。與Enumerable類的實作代碼相反,對于ParallelEnumerable類,查詢是分區的,以便多個線程可以同時處理該查詢。數組可以分為多個部分,其中每個部分由不同的線程處理,以篩選其餘項。完成分區的工作後,就需要合并,獲得所有部分的總和。

       var sum=data.AsParallel().Where(x=>x<20).Select(x=>x).Sum();

   運作這行代碼就會啟動任務管理器,這樣就可以看出系統的所有CPU都在忙碌。如果删除AsParallel()方法,就不可能使用多個CPU。當然,如果系統上沒有多個CPU,就不會看到并行版本帶來改進。

分區器

AsParallel()方法不僅擴充了IEnumerable<T>接口,還擴充了Partition類。通過它,可以影響要建立的分區。

   Partitioner類用System.Collection.Concurrent命名空間定義,并且有不同變體。Create方法接受實作了IList<T>類的數組或對象。根據這一點,以及類型的參數loadBalance和該方法的一些重載版本,會傳回一個不同的Partitioner類型。對于數組,.Net4包含派生自抽象基類OrderablePartitioner<TSource>的DynamicPartitionerForArray<TSource>類和StaticPartitionerForArray<TSource>類。

var q1 = (from x in Partitioner.Create(data).AsParallel()

                     where x < 20

                    select x).Sum();

也可以調用WithExecutionMode()和WithDegreeOfParallelism()方法可以傳遞ParallelExecutionMode的一個Default值或者ForceParallelism值。預設情況下,并行LINQ避免使用系統開銷很高的并行機制。對于WithDegreeOfParallelism()方法,可以傳遞一個整數值,以指定并行運作的最大任務數。

 const int arraySize = 100000000;

           Stopwatch watch = new Stopwatch();

           watch.Start();

           //一種寫法,沒有添加動态負載均衡,執行完所需要的時間1300毫秒

           var q1 = (from x in Partitioner.Create(data).AsParallel()

                     where x < 80

       //第二種寫法,添加了動态負載均衡,執行完所需要的時間為660毫秒。

var q1 = (from x in Partitioner.Create(data,true).AsParallel()

           watch.Stop();

           Console.WriteLine(watch.ElapsedMilliseconds.ToString());

取消

.Net提供了一種标準方式,來取消長時間運作的任務,這也适用于并行LINQ。要取消長時間的查詢,可以給查詢添加WithCancellation()方法,并傳遞一個CancellationToken令牌作為參數。CancellationToken令牌從CancellationTokenSource類中建立。該查詢在單獨的線程中運作,在該線程中,捕獲一個OperationCancelException類型的異常。如果取消了查詢,就觸發這個異常。在主線程中,調用CancellationTokenSource類的Cancel()方法可以取消任務。

           var cts = new CancellationTokenSource();

           new Thread(() =>

               {

                   try

                   {

                       var sum = (from x in data.AsParallel().WithCancellation(cts.Token)

                                  where x < 80

                       Console.WriteLine("query finished, sum: {0}", sum);

                   }

                   catch (OperationCanceledException ex)

                       Console.WriteLine(ex.Message);

               }).Start();

           Console.WriteLine("query started");

           Console.Write("cancel? ");

           int input = Console.Read();

           if (input == 'Y' || input == 'y')

               // cancel!

               cts.Cancel();

              }

繼續閱讀