目錄
- C#并行程式設計-相關概念
- C#并行程式設計-Parallel
- C#并行程式設計-Task
- C#并行程式設計-并發集合
- C#并行程式設計-線程同步原語
- C#并行程式設計-PLINQ:聲明式資料并行
背景
通過LINQ可以友善的查詢并處理不同的資料源,使用Parallel LINQ (PLINQ)來充分獲得并行化所帶來的優勢。
PLINQ不僅實作了完整的LINQ操作符,而且還添加了一些用于執行并行的操作符,與對應的LINQ相比,通過PLINQ可以獲得明顯的加速,但是具體的加速效果還要取決于具體的場景,不過在并行化的情況下一段會加速。
如果一個查詢涉及到大量的計算和記憶體密集型操作,而且順序并不重要,那麼加速會非常明顯,然而,如果順序很重要,那麼加速就會受到影響。
AsParallel() 啟用查詢的并行化
下面貼代碼,看下效果,詳情見注釋:

class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
/*向集合中添加多條資料 可以修改資料量檢視Linq和Plinq的性能*/
Parallel.For(0, 600000, (num) =>
{
products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
});
/*采用LINQ查詢符合條件的資料*/
Stopwatch sw = new Stopwatch();
sw.Restart();
var productListLinq = from product in products
where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
select product;
Console.WriteLine("采用Linq 查詢得出數量為:{0}", productListLinq.Count());
sw.Stop();
Console.WriteLine("采用Linq 耗時:{0}", sw.ElapsedMilliseconds);
/*采用PLINQ查詢符合條件的資料*/
sw.Restart();
var productListPLinq = from product in products.AsParallel() /*AsParallel 試圖利用運作時所有可用的邏輯核心,進而使運作的速度比串行的版本要快 但是需要注意開銷所帶來的性能損耗*/
where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
select product;
Console.WriteLine("采用PLinq 查詢得出數量為:{0}", productListPLinq.Count());
sw.Stop();
Console.WriteLine("采用PLinq 耗時:{0}", sw.ElapsedMilliseconds);
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
View Code
目前模拟的資料量比較少,資料量越多,采用并行化查詢的效果越明顯
AsOrdered()與orderby
AsOrdered:保留查詢的結果按源序列排序,在并行查詢中,多條資料會被分在多個區域中進行查詢,查詢後再将多個區的資料結果合并到一個結果集中并按源序列順序傳回。
orderby:将傳回的結果集按指定順序進行排序
下面貼代碼友善大家了解:

class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<string> products = new ConcurrentQueue<string>();
products.Enqueue("E");
products.Enqueue("F");
products.Enqueue("B");
products.Enqueue("G");
products.Enqueue("A");
products.Enqueue("C");
products.Enqueue("SS");
products.Enqueue("D");
/*不采用并行化 其資料輸出結果 不做任何處理 */
var productListLinq = from product in products
where (product.Length == 1)
select product;
string appendStr = string.Empty;
foreach (string str in productListLinq)
{
appendStr += str + " ";
}
Console.WriteLine("不采用并行化 輸出:{0}", appendStr);
/*不采用任何排序政策 其資料輸出結果 是直接将分區資料結果合并起來 不做任何處理 */
var productListPLinq = from product in products.AsParallel()
where (product.Length == 1)
select product;
appendStr = string.Empty;
foreach (string str in productListPLinq)
{
appendStr += str + " ";
}
Console.WriteLine("不采用AsOrdered 輸出:{0}", appendStr);
/*采用 AsOrdered 排序政策 其資料輸出結果 是直接将分區資料結果合并起來 并按原始資料順序排序*/
var productListPLinq1 = from product in products.AsParallel().AsOrdered()
where (product.Length == 1)
select product;
appendStr = string.Empty;
foreach (string str in productListPLinq1)
{
appendStr += str + " ";
}
Console.WriteLine("采用AsOrdered 輸出:{0}", appendStr);
/*采用 orderby 排序政策 其資料輸出結果 是直接将分區資料結果合并起來 并按orderby要求進行排序*/
var productListPLinq2 = from product in products.AsParallel()
where (product.Length == 1)
orderby product
select product;
appendStr = string.Empty;
foreach (string str in productListPLinq2)
{
appendStr += str + " ";
}
Console.WriteLine("采用orderby 輸出:{0}", appendStr);
Console.ReadLine();
}
}
在PLINQ查詢中,AsOrdered()和orderby子句都會降低運作速度,是以如果順序并不是必須的,那麼在請求特定順序的結果之前,将加速效果與串行執行的性能進行比較是非常重要的。
指定執行模式 WithExecutionMode
對串行化代碼進行并行化,會帶來一定的額外開銷,Plinq查詢執行并行化也是如此,在預設情況下,執行PLINQ查詢的時候,.NET機制會盡量避免高開銷的并行化算法,這些算法有可能會将執行的性能降低到地獄串行執行的性能。
.NET會根據查詢的形态做出決策,并不開了資料集大小和委托執行的時間,不過也可以強制并行執行,而不用考慮執行引擎分析的結果,可以調用WithExecutionMode方法來進行設定。、
下面貼代碼,友善大家了解

class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
/*向集合中添加多條資料*/
Parallel.For(0, 6000000, (num) =>
{
products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
});
/*采用并行化整個查詢 查詢符合條件的資料*/
Stopwatch sw = new Stopwatch();
sw.Restart();
var productListLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism)
where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
select product;
Console.WriteLine("采用并行化整個查詢 查詢得出數量為:{0}", productListLinq.Count());
sw.Stop();
Console.WriteLine("采用并行化整個查詢 耗時:{0}", sw.ElapsedMilliseconds);
/*采用預設設定 由.NET進行決策 查詢符合條件的資料*/
sw.Restart();
var productListPLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.Default)
where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
select product;
Console.WriteLine("采用預設設定 由.NET進行決策 查詢得出數量為:{0}", productListPLinq.Count());
sw.Stop();
Console.WriteLine("采用預設設定 由.NET進行決策 耗時:{0}", sw.ElapsedMilliseconds);
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
通過PLINQ執行歸約操作
PLINQ可以簡化對一個序列或者一個組中所有成員應用一個函數的過程,這個過程稱之為歸約操作,如在PLINQ查詢中使用類似于Average,Max,Min,Sum之類的聚合函數就可以充分利用并行所帶來好處。
并行執行的規約和串行執行的規約的執行結果可能會不同,因為在操作不能同時滿足可交換和可傳遞的情況下産生攝入,在每次執行的時候,序列或組中的元素在不同并行任務中分布可能也會有差別,因而在這種操作的情況下可能會産生不同的最終結果,是以,一定要通過對于的串行版本來興義原始的資料源,這樣才能幫助PLINQ獲得最優的執行結果。
下面貼代碼:

class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<int> products = new ConcurrentQueue<int>();
/*向集合中添加多條資料*/
Parallel.For(0, 6000000, (num) =>
{
products.Enqueue(num);
});
/*采用LINQ 傳回 IEumerable<int>*/
var productListLinq = (from product in products
select product).Average();
Console.WriteLine("采用Average計算平均值:{0}", productListLinq);
/*采用PLINQ 傳回 ParallelQuery<int>*/
var productListPLinq = (from product in products.AsParallel()
select product).Average();
Console.WriteLine("采用Average計算平均值:{0}", productListPLinq);
Console.ReadLine();
}
}
如上述代碼所示
在LINQ版本中,該方法會傳回一個 IEumerable<int>,即調用 Eumerable.Range方法生成指定範圍整數序列的結果,
在PLINQ版本中,該方法會傳回一個 ParallelQuery<int>,即調用并行版本中System.Linq.ParallelEumerable的ParallelEumerable.Range方法,通過這種方法得到的結果序列也是并行序列,可以再PLINQ中并行運作。
如果想對特定資料源進行LINQ查詢時,可以定義為 private IEquatable<int> products
如果想對特定資料源進行PLINQ查詢時,可以定義為 private ParallelQuery<int> products
并發PLINQ任務

class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
/*向集合中添加多條資料*/
Parallel.For(0, 600000, (num) =>
{
products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
});
CancellationTokenSource cts = new CancellationTokenSource();
/*建立tk1 任務 查詢 符合 條件的資料*/
Task<ParallelQuery<Product>> tk1 = new Task<ParallelQuery<Product>>((ct) =>
{
Console.WriteLine("開始執行 tk1 任務", products.Count);
Console.WriteLine("tk1 任務中 資料結果集數量為:{0}", products.Count);
var result = products.AsParallel().Where(p => p.Name.Contains("1") && p.Name.Contains("2"));
return result;
}, cts.Token);
/*建立tk2 任務,在執行tk1任務完成 基于tk1的結果查詢 符合 條件的資料*/
Task<ParallelQuery<Product>> tk2 = tk1.ContinueWith<ParallelQuery<Product>>((tk) =>
{
Console.WriteLine("開始執行 tk2 任務", products.Count);
Console.WriteLine("tk2 任務中 資料結果集數量為:{0}", tk.Result.Count());
var result = tk.Result.Where(p => p.Category.Contains("1") && p.Category.Contains("2"));
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
/*建立tk3 任務,在執行tk1任務完成 基于tk1的結果查詢 符合 條件的資料*/
Task<ParallelQuery<Product>> tk3 = tk1.ContinueWith<ParallelQuery<Product>>((tk) =>
{
Console.WriteLine("開始執行 tk3 任務", products.Count);
Console.WriteLine("tk3 任務中 資料結果集數量為:{0}", tk.Result.Count());
var result = tk.Result.Where(p => p.SellPrice > 1111 && p.SellPrice < 222222);
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
tk1.Start();
Task.WaitAll(tk1, tk2, tk3);
Console.WriteLine("tk2任務結果輸出,篩選後記錄總數為:{0}", tk2.Result.Count());
Console.WriteLine("tk3任務結果輸出,篩選後記錄總數為:{0}", tk3.Result.Count());
tk1.Dispose();
tk2.Dispose();
tk3.Dispose();
cts.Dispose();
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
如代碼所示tk1,tk2,tk3三個任務,tk2,tk3任務的運作需要基于tk1任務的結果,是以,參數中指定了TaskContinuationOptions.OnlyOnRanToCompletion,通過這種方式,每個被串聯的任務都會等待之前的任務完成之後才開始執行,tk2,tk3在tk1執行完成後,這兩個任務的PLINQ查詢可以并行運作,并将會可能地使用多個邏輯核心。
取消PLINQ WithCancellation
通過WithCancellation取消目前PLINQ正在執行的查詢操作,代碼如下:

class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
/*向集合中添加多條資料*/
Parallel.For(0, 600000, (num) =>
{
products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
});
CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
/*建立tk1 任務 查詢 符合 條件的資料*/
Task<ParallelQuery<Product>> tk1 = new Task<ParallelQuery<Product>>((ct) =>
{
var result = products.AsParallel();
try
{
Console.WriteLine("開始執行 tk1 任務", products.Count);
Console.WriteLine("tk1 任務中 資料結果集數量為:{0}", products.Count);
result = products.AsParallel().WithCancellation(token).Where(p => p.Name.Contains("1") && p.Name.Contains("2"));
}
catch (AggregateException ex)
{
foreach (Exception e in ex.InnerExceptions)
{
Console.WriteLine("tk3 錯誤:{0}", e.Message);
}
}
return result;
}, cts.Token);
/*建立tk2 任務,在執行tk1任務完成 基于tk1的結果查詢 符合 條件的資料*/
Task<ParallelQuery<Product>> tk2 = tk1.ContinueWith<ParallelQuery<Product>>((tk) =>
{
var result = tk.Result;
try
{
Console.WriteLine("開始執行 tk2 任務", products.Count);
Console.WriteLine("tk2 任務中 資料結果集數量為:{0}", tk.Result.Count());
result = tk.Result.WithCancellation(token).Where(p => p.Category.Contains("1") && p.Category.Contains("2"));
}
catch (AggregateException ex)
{
foreach (Exception e in ex.InnerExceptions)
{
Console.WriteLine("tk3 錯誤:{0}", e.Message);
}
}
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
/*建立tk3 任務,在執行tk1任務完成 基于tk1的結果查詢 符合 條件的資料*/
Task<ParallelQuery<Product>> tk3 = tk1.ContinueWith<ParallelQuery<Product>>((tk) =>
{
var result = tk.Result;
try
{
Console.WriteLine("開始執行 tk3 任務", products.Count);
Console.WriteLine("tk3 任務中 資料結果集數量為:{0}", tk.Result.Count());
result = tk.Result.WithCancellation(token).Where(p => p.SellPrice > 1111 && p.SellPrice < 222222);
}
catch (AggregateException ex)
{
foreach (Exception e in ex.InnerExceptions)
{
Console.WriteLine("tk3 錯誤:{0}", e.Message);
}
}
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
tk1.Start();
try
{
Thread.Sleep(10);
cts.Cancel();//取消任務
Task.WaitAll(tk1, tk2, tk3);
Console.WriteLine("tk2任務結果輸出,篩選後記錄總數為:{0}", tk2.Result.Count());
Console.WriteLine("tk3任務結果輸出,篩選後記錄總數為:{0}", tk3.Result.Count());
}
catch (AggregateException ex)
{
foreach (Exception e in ex.InnerExceptions)
{
Console.WriteLine("錯誤:{0}", e.Message);
}
}
tk1.Dispose();
tk2.Dispose();
tk3.Dispose();
cts.Dispose();
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
指定查詢時所需的并行度 WithDegreeOfParallelism
預設情況下,PLINQ總是會試圖利用所有的可用邏輯核心達到最佳性能,在程式中我們可以利用WithDegreeOfParallelism方法指定一個不同最大并行度。

/*tk1任務 采用所有可用處理器*/
result = products.AsParallel().WithCancellation(token).WithDegreeOfParallelism(Environment.ProcessorCount).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2"));
/*tk1任務 采用1個可用處理器*/
result = products.AsParallel().WithCancellation(token).WithDegreeOfParallelism(1).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2"));
好處:如果計算機有8個可用的邏輯核心,PLINQ查詢最多運作4個并發任務,這樣可用使用Parallel.Invoke 加載多個帶有不同并行度的PLINQ查詢,有一些PLINQ查詢的可擴充性有限,是以這些選項可用讓您充分利用額外的核心。
使用ForAll 并行周遊結果
下面貼代碼:

class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
/*向集合中添加多條資料*/
Parallel.For(0, 1000, (num) =>
{
products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
});
products.AsParallel().Where(P => P.Name.Contains("1") && P.Name.Contains("2") && P.Name.Contains("3")).ForAll(product =>
{
Console.WriteLine("Name:{0}", product.Name);
});
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
ForAll是并行,foreach是串行,如果需要以特定的順序處理資料,那麼必須使用上述串行循環或方法。
WithMergeOptions
通過WithMergeOptions擴充方法提示PLINQ應該優先使用哪種方式合并并行結果片段,如下:
下面貼代碼檢視下差異:

class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
Console.WriteLine("目前計算機處理器數:{0}", Environment.ProcessorCount);
ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
/*向集合中添加多條資料*/
Parallel.For(0, 600000, (num) =>
{
products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
});
Stopwatch sw = new Stopwatch();
Thread.Sleep(1000);
sw.Restart();
int count = 0;
Task tk1 = Task.Factory.StartNew(() =>
{
var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.AutoBuffered).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2"));
count = result.Count();
});
Task.WaitAll(tk1);
sw.Stop();
Console.WriteLine("ParallelMergeOptions.AutoBuffered 耗時:{0},數量:{1}", sw.ElapsedMilliseconds, count);
sw.Restart();
int count1 = 0;
Task tk2 = Task.Factory.StartNew(() =>
{
var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.Default).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2"));
count1 = result.Count();
});
Task.WaitAll(tk2);
sw.Stop();
Console.WriteLine("ParallelMergeOptions.Default 耗時:{0},數量:{1}", sw.ElapsedMilliseconds, count1);
sw.Restart();
int count2 = 0;
Task tk3 = Task.Factory.StartNew(() =>
{
var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.FullyBuffered).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2"));
count2 = result.Count();
});
Task.WaitAll(tk3);
sw.Stop();
Console.WriteLine("ParallelMergeOptions.FullyBuffered 耗時:{0},數量:{1}", sw.ElapsedMilliseconds, count2);
sw.Restart();
int count3 = 0;
Task tk4 = Task.Factory.StartNew(() =>
{
var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2"));
count3 = result.Count();
});
Task.WaitAll(tk4);
sw.Stop();
Console.WriteLine("ParallelMergeOptions.NotBuffered 耗時:{0},數量:{1}", sw.ElapsedMilliseconds, count3);
tk4.Dispose();
tk3.Dispose();
tk2.Dispose();
tk1.Dispose();
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
需要注意的是:每一個選項都有其優點和缺點,是以一定奧測量顯示第一個結果的時間以及完成整個查詢所需要的時間,這點很重要 。
使用PLINQ執行MapReduce算法 ILookup IGrouping
mapreduce ,也稱為Map/reduce 或者Map&Reduce ,是一種非常流行的架構,能夠充分利用并行化處理巨大的資料集,MapReduce的基本思想非常簡單:将資料處理問題分解為以下兩個獨立且可以并行執行的操作:
映射(Map)-對資料源進行操作,為每一個資料項計算出一個鍵值對。運作的結果是一個鍵值對的集合,根據鍵進行分組。
規約(Reduce)-對映射操作産生的根據鍵進行分組的所有鍵值對進行操作,對每一個組執行歸約操作,這個操作可以傳回一個或多個值。
下面貼代碼,友善大家了解,但是該案列所展示的并不是一個純粹的MapReduce算法實作:

class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<string> list = new ConcurrentQueue<string>();
list.Enqueue("A");
list.Enqueue("B");
list.Enqueue("C");
list.Enqueue("D");
list.Enqueue("A");
list.Enqueue("D");
Console.WriteLine("Select.......");
list.AsParallel().Select(p => new
{
Name = p,
Count = 1
}).ForAll((p) =>
{
Console.WriteLine("{0}\t{1}", p.Name, p.Count);
});
Console.WriteLine("ILookup.......");
/*map操作生成的鍵值對由一個單詞和數量1組成,該代碼意在将每個單詞作為鍵并将1作為值加入*/
ILookup<string, int> map = list.AsParallel().ToLookup(p => p, k => 1);
foreach (var v in map)
{
Console.Write(v.Key);
foreach (int val in v)
Console.WriteLine("\t{0}", val);
}
/*reduce操作單詞出現的次數*/
var reduce = from IGrouping<string, int> reduceM in map.AsQueryable()
select new
{
key = reduceM.Key,
count = reduceM.Count()
};
Console.WriteLine("IGrouping.......");
foreach (var v in reduce)
{
Console.Write(v.key);
Console.WriteLine("\t{0}", v.count);
}
Console.ReadLine();
}
}
關于PLINQ:聲明式資料并行就寫到這,主要是PLINQ下的查詢注意項和查詢調優的一些擴充方法。如有問題,歡迎指正。
作者:釋迦苦僧 出處:http://www.cnblogs.com/woxpp/p/3951096.html 本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接。
作者:釋迦苦僧
出處:http://www.cnblogs.com/woxpp
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接。
生活不易,五行缺金,求打點