1、Parallel Class
1.1、For方法
1.2、ForEach方法
1.3、Invoke方法
2、并发控制疑问?
2.1、使用Lock锁
2.2、使用PLINQ——用AsParallel
2.3、使用PLINQ——用ParallelEnumerable
2.4、使用Interlocked操作
2.5、使用Parallel.For的有Thread-Local变量重载函数
性能比较
using System.Threading.Tasks;
class Test
{
static int N = 1000;
static void TestMethod()
{
// Using a named method.
Parallel.For(0, N, Method2);
// Using an anonymous method.
Parallel.For(0, N, delegate(int i)
{
// Do Work.
});
// Using a lambda expression.
Parallel.For(0, N, i =>
}
static void Method2(int i)
// Do work.
}
上面这个例子简单易懂,上篇我们就是用的Parallel.For,这里就不解释了。其实Parallel类的方法主要分为下面三类:
For方法
ForEach方法
Invoke方法
在里面执行的for循环可能并行地运行,它有12个重载。这12个重载中Int32参数和Int64参数的方法各为6个,下面以Int32为例列出:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication2
class Program
// Demonstrated features:
// CancellationTokenSource
// Parallel.For()
// ParallelOptions
// ParallelLoopResult
// Expected results:
// An iteration for each argument value (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) is executed.
// The order of execution of the iterations is undefined.
// The iteration when i=2 cancels the loop.
// Some iterations may bail out or not start at all; because they are temporally executed in unpredictable order,
// it is impossible to say which will start/complete and which won't.
// At the end, an OperationCancelledException is surfaced.
// Documentation:
// http://msdn.microsoft.com/en-us/library/system.threading.cancellationtokensource(VS.100).aspx
static void Main(string[] args)
CancellationTokenSource cancellationSource = new CancellationTokenSource();
ParallelOptions options = new ParallelOptions();
options.CancellationToken = cancellationSource.Token;
try
{
ParallelLoopResult loopResult = Parallel.For(
0,
10,
options,
(i, loopState) =>
{
Console.WriteLine("Start Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);
// Simulate a cancellation of the loop when i=2
if (i == 2)
{
cancellationSource.Cancel();
}
// Simulates a long execution
for (int j = 0; j < 10; j++)
Thread.Sleep(1 * 200);
// check to see whether or not to continue
if (loopState.ShouldExitCurrentIteration) return;
Console.WriteLine("Finish Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);
}
);
if (loopResult.IsCompleted)
{
Console.WriteLine("All iterations completed successfully. THIS WAS NOT EXPECTED.");
}
}
// No exception is expected in this example, but if one is still thrown from a task,
// it will be wrapped in AggregateException and propagated to the main thread.
catch (AggregateException e)
Console.WriteLine("Parallel.For has thrown an AggregateException. THIS WAS NOT EXPECTED.\n{0}", e);
// Catching the cancellation exception
catch (OperationCanceledException e)
Console.WriteLine("An iteration has triggered a cancellation. THIS WAS EXPECTED.\n{0}", e.ToString());
}
提供的每个动作可能并行地执行,它有2个重载。
例如下面代码执行了三个操作(来自MSDN):
static void Main()
Parallel.Invoke(
BasicAction, // Param #0 - static method
() => // Param #1 - lambda expression
Console.WriteLine("Method=beta, Thread={0}", Thread.CurrentThread.ManagedThreadId);
},
delegate() // Param #2 - in-line delegate
Console.WriteLine("Method=gamma, Thread={0}", Thread.CurrentThread.ManagedThreadId);
// No exception is expected in this example, but if one is still thrown from a task,
// it will be wrapped in AggregateException and propagated to the main thread.
Console.WriteLine("An action has thrown an exception. THIS WAS UNEXPECTED.\n{0}", e.InnerException.ToString());
static void BasicAction()
Console.WriteLine("Method=alpha, Thread={0}", Thread.CurrentThread.ManagedThreadId);
有人提出以下疑问:“如果For里面的东西,对于顺序敏感的话,会不会有问题。并行处理的话,说到底应该是多线程。如果需要Lock住什么东西的话,应该怎么做呢?例如这个例子不是对数组填充,是对文件操作呢?对某个资源操作呢?”
{
int loops=0;
while (loops <= 100)
long sum = 0;
Parallel.For(1, 1001, delegate(long i)
sum += i;
});
System.Console.WriteLine(sum);
loops++;
在上述代码中,为了校验正确性我进行了重复做了100次,得出如下结果:
<a target="_blank" href="http://blog.51cto.com/attachment/201008/163420107.png"></a>
图1、100次的前面部分结果
我们知道500500才是正确的答案,这说明Parallel.For不能保证对sum正确的并发执行,对此我们应该加上适当的控制,并借机来回答上面提出的如何加锁的问题。下面有几种方案可以解决这个问题:
这个我就不多解释了,直接上代码:
int loops = 0;
object moniter = new object();
long sum = 0;
lock (moniter) { sum += i; }
我们加上lock锁之后就会得出正确的结果。
关于PLINQ,以后将会介绍到,这里不会详细介绍,感兴趣的自行查阅资料。代码如下:
long sum = 0;
sum = Enumerable.Range(0, 1001).AsParallel().Sum();
运行可以得到正确的结果。
这个也不多说,直接上代码,因为关于PLINQ将在以后详细介绍,感兴趣的自行查阅资料。
sum = ParallelEnumerable.Range(0, 1001).Sum();
运行同样可以得到正确结果。
代码如下:
Interlocked.Add(ref sum, i);
运行可以得到正确结果。
这个方法已经在1.2中介绍,这里直接上代码,代码如下:
int sum = 0;
Parallel.For(0, 1001, () => 0, (i, state,subtotal) =>
subtotal += i;
return subtotal;
},
partial => Interlocked.Add(ref sum, partial));
运行可得正确结果。
上面的解决方案那个比较好呢?请大家各抒己见!关于这个我已经测试了一下。
PS:感觉写这篇的时候很累,思绪也很乱,不知大家对这篇还满意(⊙_⊙)?有什么地方需要改进,或者说不易理解,或者哪个地方错了!
本文转自Saylor87 51CTO博客,原文链接:http://blog.51cto.com/skynet/365694,如需转载请自行联系原作者