天天看點

.NET(C#) Internals: 以一個數組填充的例子初步了解.NET 4.0中的并行(二)

1、Parallel Class

1.1、For方法

1.2、ForEach方法

1.3、Invoke方法

2、并發控制疑問?

2.1、使用Lock鎖

2.2、使用PLINQ——用AsParallel

2.3、使用PLINQ——用ParallelEnumerable

2.4、使用Interlocked操作

2.5、使用Parallel.For的有Thread-Local變量重載函數

性能比較

using System.Threading.Tasks;     

class Test  

{  

    static int N = 1000;  

    static void TestMethod()  

    {  

        // Using a named method.  

        Parallel.For(0, N, Method2);  

        // Using an anonymous method.  

        Parallel.For(0, N, delegate(int i)  

        {  

            // Do Work.  

        });  

        // Using a lambda expression.  

        Parallel.For(0, N, i =>  

    }  

    static void Method2(int i)  

        // Do work.  

上面這個例子簡單易懂,上篇我們就是用的Parallel.For,這裡就不解釋了。其實Parallel類的方法主要分為下面三類:

For方法

ForEach方法

Invoke方法

在裡面執行的for循環可能并行地運作,它有12個重載。這12個重載中Int32參數和Int64參數的方法各為6個,下面以Int32為例列出:

using System;  

using System.Collections.Generic;  

using System.Linq;  

using System.Text;  

using System.Threading;  

using System.Threading.Tasks;  

namespace ConsoleApplication2  

    class Program  

        // Demonstrated features:  

        //        CancellationTokenSource  

        //         Parallel.For()  

        //        ParallelOptions  

        //        ParallelLoopResult  

        // Expected results:  

        //         An iteration for each argument value (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) is executed.  

        //        The order of execution of the iterations is undefined.  

        //        The iteration when i=2 cancels the loop.  

        //        Some iterations may bail out or not start at all; because they are temporally executed in unpredictable order,   

        //          it is impossible to say which will start/complete and which won't.  

        //        At the end, an OperationCancelledException is surfaced.  

        // Documentation:  

        //        http://msdn.microsoft.com/en-us/library/system.threading.cancellationtokensource(VS.100).aspx  

        static void Main(string[] args)  

            CancellationTokenSource cancellationSource = new CancellationTokenSource();  

            ParallelOptions options = new ParallelOptions();  

            options.CancellationToken = cancellationSource.Token;  

            try 

            {  

                ParallelLoopResult loopResult = Parallel.For(  

                    0,  

                    10,  

                    options,  

                    (i, loopState) =>  

                    {  

                        Console.WriteLine("Start Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);  

                        // Simulate a cancellation of the loop when i=2  

                        if (i == 2)  

                        {  

                            cancellationSource.Cancel();  

                        }  

                        // Simulates a long execution  

                        for (int j = 0; j < 10; j++)  

                            Thread.Sleep(1 * 200);  

                            // check to see whether or not to continue  

                            if (loopState.ShouldExitCurrentIteration) return;  

                        Console.WriteLine("Finish Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);  

                    }  

                );  

                if (loopResult.IsCompleted)  

                {  

                    Console.WriteLine("All iterations completed successfully. THIS WAS NOT EXPECTED.");  

                }  

            }  

                // No exception is expected in this example, but if one is still thrown from a task,  

                // it will be wrapped in AggregateException and propagated to the main thread.  

            catch (AggregateException e)  

                Console.WriteLine("Parallel.For has thrown an AggregateException. THIS WAS NOT EXPECTED.\n{0}", e);  

                // Catching the cancellation exception  

            catch (OperationCanceledException e)  

                Console.WriteLine("An iteration has triggered a cancellation. THIS WAS EXPECTED.\n{0}", e.ToString());  

        }  

提供的每個動作可能并行地執行,它有2個重載。

例如下面代碼執行了三個操作(來自MSDN):

        static void Main()  

                Parallel.Invoke(  

                    BasicAction,    // Param #0 - static method  

                    () =>            // Param #1 - lambda expression  

                        Console.WriteLine("Method=beta, Thread={0}", Thread.CurrentThread.ManagedThreadId);  

                    },  

                    delegate()        // Param #2 - in-line delegate  

                        Console.WriteLine("Method=gamma, Thread={0}", Thread.CurrentThread.ManagedThreadId);  

            // No exception is expected in this example, but if one is still thrown from a task,  

            // it will be wrapped in AggregateException and propagated to the main thread.  

                Console.WriteLine("An action has thrown an exception. THIS WAS UNEXPECTED.\n{0}", e.InnerException.ToString());  

        static void BasicAction()  

            Console.WriteLine("Method=alpha, Thread={0}", Thread.CurrentThread.ManagedThreadId);  

有人提出以下疑問:“如果For裡面的東西,對于順序敏感的話,會不會有問題。并行處理的話,說到底應該是多線程。如果需要Lock住什麼東西的話,應該怎麼做呢?例如這個例子不是對數組填充,是對檔案操作呢?對某個資源操作呢?”

    {          

            int loops=0;  

            while (loops <= 100)  

                long sum = 0;                  

                Parallel.For(1, 1001, delegate(long i)  

                    sum += i;  

                });  

                System.Console.WriteLine(sum);  

                loops++;  

在上述代碼中,為了校驗正确性我進行了重複做了100次,得出如下結果:

<a target="_blank" href="http://blog.51cto.com/attachment/201008/163420107.png"></a>

圖1、100次的前面部分結果

我們知道500500才是正确的答案,這說明Parallel.For不能保證對sum正确的并發執行,對此我們應該加上适當的控制,并借機來回答上面提出的如何加鎖的問題。下面有幾種方案可以解決這個問題:

這個我就不多解釋了,直接上代碼:

            int loops = 0;  

            object moniter = new object();  

                long sum = 0;  

                    lock (moniter) { sum += i; }  

我們加上lock鎖之後就會得出正确的結果。

關于PLINQ,以後将會介紹到,這裡不會詳細介紹,感興趣的自行查閱資料。代碼如下:

                long sum = 0;       

                sum = Enumerable.Range(0, 1001).AsParallel().Sum();  

運作可以得到正确的結果。

這個也不多說,直接上代碼,因為關于PLINQ将在以後詳細介紹,感興趣的自行查閱資料。

                sum = ParallelEnumerable.Range(0, 1001).Sum();   

運作同樣可以得到正确結果。

代碼如下:

                    Interlocked.Add(ref sum, i);  

運作可以得到正确結果。

這個方法已經在1.2中介紹,這裡直接上代碼,代碼如下:

                int sum = 0;  

                Parallel.For(0, 1001, () =&gt; 0, (i, state,subtotal) =&gt;  

                    subtotal += i;  

                    return subtotal;  

                },  

                partial =&gt; Interlocked.Add(ref sum, partial));  

運作可得正确結果。

上面的解決方案那個比較好呢?請大家各抒己見!關于這個我已經測試了一下。

PS:感覺寫這篇的時候很累,思緒也很亂,不知大家對這篇還滿意(⊙_⊙)?有什麼地方需要改進,或者說不易了解,或者哪個地方錯了!

     本文轉自Saylor87 51CTO部落格,原文連結:http://blog.51cto.com/skynet/365694,如需轉載請自行聯系原作者

繼續閱讀