天天看點

并行程式設計之資料并行1:簡單例子 2:編寫具有線程本地變量的 Parallel.For 循環 3:編寫具有線程本地變量的 Parallel.ForEach 循環 4:取消 Parallel.For 或 ForEach Loop 5:停止或中斷 Parallel.For 循環 6:并行中的異常處理

任務并行庫 (TPL) 是 .NET Framework 4 版的 http://msdn.microsoft.com/zh-cn/library/system.threading.aspx System.Threading 和 http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.aspx System.Threading.Tasks 命名空間中的一組公共類型和 API。System.Threadings.Tasks 命名空間提供可簡化并發和異步代碼編寫工作的類型。主要類型為 http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.task.aspx Task(表示可以等待和取消的異步操作)和 http://msdn.microsoft.com/zh-cn/library/dd321424.aspx Task<(Of <(TResult>)>)(可以傳回值的任務)。Factory 類提供用于建立和啟動任務的靜态方法, http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.taskscheduler.aspx

TaskScheduler 類提供預設線程排程基礎結構。

TPL 的目的在于簡化向應用程式中添加并行性和并發性的過程,進而提高開發人員的工作效率。 TPL 會動态地按比例調節并發程度,以便最有效地使用所有可用的處理器。此外,TPL 還處理工作分區、

http://msdn.microsoft.com/zh-cn/library/system.threading.threadpool.aspx

ThreadPool 上的線程排程、取消支援、狀态管理以及其他低級别的細節操作。通過使用 TPL,您可以在将精力集中于程式要完成的工作,同時最大程度地提高代碼的性能。

從 .NET Framework 4 開始,TPL 是編寫多線程代碼和并行代碼的首選方法。例如,如果某個循環在每次疊代時隻執行少量工作,或它在很多次疊代時都不運作,那麼并行化的開銷可能導緻代碼運作更慢。

1:簡單例子

資料并行是指對源集合或數組中的元素同時(即并行)執行相同操作的情況。資料并行是指對源集合或數組中的元素同時(即并行)執行相同操作的情況。

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallel.aspx

System.Threading.Tasks..::.Parallel 類中 For 和 ForEach 方法的若幹重載支援使用強制性文法的資料并行。在資料并行操作中,将對源集合進行分區,以便多個線程能夠同時對不同的片段進行操作。TPL 支援通過

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallel.aspx

System.Threading.Tasks..::.Parallel 類實作的資料并行。此類提供

http://msdn.microsoft.com/zh-cn/library/ch45axte.aspx

for 和

http://msdn.microsoft.com/zh-cn/library/ttw7t8t6.aspx

foreach 循環(Visual Basic 中為 For 和 For Each)基于方法的并行實作。為 ParallelFor()()() 或 ParallelForEach()()() 循環編寫循環邏輯與編寫順序循環非常類似。您不必建立線程或隊列工作項。在基本的循環中,您不必采用鎖。TPL 将為您處理所有低級别工作。下面的代碼示例示範一個簡單的 foreach 循環及其并行等效項。

public void Method1()
        {
            List<string> sourceCollection = new List<string>() { "111", "222", "333" };
            // Sequential version            
            foreach (var item in sourceCollection)
            {
                Process(item);
            }

            // Parallel equivalent
            Parallel.ForEach(sourceCollection, item => Process(item));
        }

        private void Process(string item)
        {
            item += " done";
        }

      

當并行循環運作時,TPL 将對資料源進行分區,以便循環能夠同時對多個部分進行操作。在背景,任務計劃程式将根據系統資源和工作負荷來對任務進行分區。如有可能,計劃程式會在工作負荷變得不平衡的情況下在多個線程和處理器之間重新配置設定工作。

ParallelFor()()() 和 ParallelForEach()()() 方法都有若幹重載,利用這些重載可以停止或中斷循環執行、監視其他線程上循環的狀态、維護線程本地狀态、完成線程本地對象、控制并發程度,等等。啟用此功能的幫助器類型包括

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.aspx

ParallelLoopState、

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.paralleloptions.aspx

ParallelOptions 以及

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopresult.aspx

ParallelLoopResult、

http://msdn.microsoft.com/zh-cn/library/system.threading.cancellationtoken.aspx

CancellationToken 和

http://msdn.microsoft.com/zh-cn/library/system.threading.cancellationtokensource.aspx

CancellationTokenSource。

2:編寫具有線程本地變量的 Parallel.For 循環

此示例示範如何使用線程本地變量來存儲和檢索由

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallel.for.aspx

For 循環建立的每個單獨任務中的狀态。通過使用線程本地資料,您可以避免将大量的通路同步為共享狀态的開銷。在任務的所有疊代完成之前,您将計算和存儲值,而不是寫入每個疊代上的共享資源。然後,您可以将最終結果一次性寫入共享資源,或将其傳遞到另一個方法。

//int[] nums = Enumerable.Range(0, 1000000).ToArray();
            int[] nums = { 5, 6 };
            long total = 0;
            Parallel.For<long>(0, nums.Length, () => 1, (i, loopState, subtotal) =>
            {
                subtotal += nums[i];
                return subtotal;
            },
                (x) => Interlocked.Add(ref total, x)
            );
            MessageBox.Show(total.ToString());

      

我們來看Paralle.For方法中各個參數表示的意思:

(其中<long>是指傳回值類型)

參數 意義
開始索引
nums.Length 結束索引(不含)
() => 1 使用者傳回每個線程的本地資料初始狀态的函數委托。在這裡是将total初始化為1

(i, loop, subtotal) =>

{

subtotal += nums[i];

return subtotal;

}

将為每次疊代調用一次的委托,其中

i:循環變量,自動加1

loopState:為ParallelLoopState

Subtotal:傳回值,為long

(x) => Interlocked.Add(ref total, x) 對于每個線程的本地狀态執行一個最終狀态的委托。在這裡是計算了total

3:編寫具有線程本地變量的 Parallel.ForEach 循環

下面的代碼示範如何編寫使用線程本地變量的

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallel.foreach.aspx

ForEach 方法。當 ForEach 循環執行時,它會将其源集合劃分為多個分區。每個分區都将獲得自己的"線程本地"變量的副本。(術語"線程本地"在此處不太準确,因為在某些情況下兩個分區可能在同一線程上運作。)

若要在

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallel.foreach.aspx

ForEach 循環中使用線程本地變量,您必須使用采用兩個 type 參數的方法版本。第一個參數指定源元素的類型,第二個參數指定線程本地變量的類型。

//int[] nums = Enumerable.Range(0, 1000000).ToArray();
            int[] nums = { 5, 6 };
            long total = 0;
            Parallel.ForEach<int, long>(nums, () => 1, (i, loopState, subtotal) => 
            {
                subtotal += nums[i]; 
                return subtotal;
            },
                (x) => Interlocked.Add(ref total, x)
            );
            MessageBox.Show(total.ToString());

      

我們來看Paralle.ForEach方法中各個參數表示的意思:

(其中<int, long>中int是指資料源類型,long是指傳回值類型)

nums 資料源數組
(i, loopState, subtotal) =>

4:取消 Parallel.For 或 ForEach Loop

int[] nums = Enumerable.Range(0, 10000000).ToArray();
            CancellationTokenSource cts = new CancellationTokenSource();

            ParallelOptions po = new ParallelOptions();
            po.CancellationToken = cts.Token;
            po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;

            Task.Factory.StartNew(() =>
            {
                if (Console.ReadKey().KeyChar == 'c')
                    cts.Cancel();
                Console.WriteLine("press any key to exit");
                Console.ReadKey();
            });

            try
            {
                Parallel.ForEach(nums, po, (num) =>
                {
                    double d = Math.Sqrt(num);
                    Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
                    po.CancellationToken.ThrowIfCancellationRequested();
                });
            }
            catch (OperationCanceledException e)
            {
                Console.WriteLine(e.Message);
            }

      

5:停止或中斷 Parallel.For 循環

"中斷"表示完成目前線程上目前疊代之前的所有線程上的所有疊代,然後退出循環。""停止"表示在友善的情況下盡快停止所有疊代。

namespace StopOrBreak
{
    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Test
    {
        static void Main()
        {
            StopLoop();
            BreakAtThreshold();

            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }

        private static void StopLoop()
        {
            Console.WriteLine("Stop loop...");
            double[] source = MakeDemoSource(1000, 1);
            ConcurrentStack<double> results = new ConcurrentStack<double>();

            // i is the iteration variable. loopState is a 
            // compiler-generated ParallelLoopState
            Parallel.For(0, source.Length, (i, loopState) =>
            {
                // Take the first 100 values that are retrieved
                // from anywhere in the source.
                if (i < 100)
                {
                    // Accessing shared object on each iteration
                    // is not efficient. See remarks.
                    double d = Compute(source[i]);
                    results.Push(d);
                }
                else
                {
                    loopState.Stop();
                    return;
                }

            } // Close lambda expression.
            ); // Close Parallel.For

            Console.WriteLine("Results contains {0} elements", results.Count());
        }


        static void BreakAtThreshold()
        {
            double[] source = MakeDemoSource(10000, 1.0002);
            ConcurrentStack<double> results = new ConcurrentStack<double>();

            // Store all values below a specified threshold.
            Parallel.For(0, source.Length, (i, loopState) =>
            {
                double d = Compute(source[i]);
                results.Push(d);
                if (d > .2)
                {
                    // Might be called more than once!
                    loopState.Break();
                    Console.WriteLine("Break called at iteration {0}. d = {1} ", i, d);
                    Thread.Sleep(1000);
                }
            });

            Console.WriteLine("results contains {0} elements", results.Count());
        }

        static double Compute(double d)
        {
            //Make the processor work just a little bit.
            return Math.Sqrt(d);
        }


        // Create a contrived array of monotonically increasing
        // values for demonstration purposes. 
        static double[] MakeDemoSource(int size, double valToFind)
        {
            double[] result = new double[size];
            double initialval = .01;
            for (int i = 0; i < size; i++)
            {
                initialval *= valToFind;
                result[i] = initialval;
            }

            return result;
        }
    }

}

      

ParallelFor()()() 或 [Overload:System.Threading.Tasks.Parallel.Parallel.ForEach`1] 循環中,不能使用與順序循環中相同的

http://msdn.microsoft.com/zh-cn/library/adbctzc4.aspx

break 或

http://msdn.microsoft.com/zh-cn/library/t2at9t47.aspx

Exit 語句,這是因為這些語言構造對于循環是有效的,而并行"循環"實際上是方法,不是循環。相反,可以使用

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.stop.aspx

Stop 或

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.break.aspx

Break 方法。

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallel.for.aspx

Parallel..::.For 的一些重載接受 Action<int, ParallelLoopState>(在 Visual Basic 中為 Action(Of Integer, ParallelLoopState))作為輸入參數。

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.aspx

ParallelLoopState 對象由運作時在背景建立,您可以在 lambda 表達式中為它指定您喜歡的任何名稱。

在下面的示例中,該方法隻需要源序列中的 100 個值,檢索出哪些元素并不重要。在此案例中,使用

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.stop.aspx

Stop 方法,因為它将告知循環的所有疊代(包括那些在其他線程上的目前疊代之前開始的疊代)在友善的情況下盡快停止。

在第二個方法中,将檢索所有元素,直到源序列中指定的索引。在此案例中,調用

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.break.aspx

Break,這是因為當到達一個線程上的索引時,源中前面的元素有可能尚未處理。中斷會導緻其他線程放棄對後續片段的工作(如果它們正忙于任何這樣的工作),并在退出循環之前處理完所有以前的元素。

在調用

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.stop.aspx http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.break.aspx

Break 後,循環中的其他線程可能會繼續運作一段時間(這不受應用程式開發人員的控制),了解這一點很重要。可以使用

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.isstopped.aspx

ParallelLoopState..::.IsStopped 屬性檢查是否已在另一個線程上停止該循環。在下面的示例中,如果

http://msdn.microsoft.com/zh-cn/library/system.threading.tasks.parallelloopstate.isstopped.aspx

IsStopped 為 true,則不會再有資料寫入到集合中。

6:并行中的異常處理

private void temp2_Click(object sender, RoutedEventArgs e)
        {
            byte[] data = new byte[5000];
            Random r = new Random();
            r.NextBytes(data);
            try
            {
                ProcessDataInParallel(data);
            }
            catch (AggregateException ae)
            {
                foreach (var ex in ae.InnerExceptions)
                {
                    if (ex is ArgumentException)
                        Console.WriteLine(ex.Message);
                    else
                        throw ex;
                }
            }
        }

        private static void ProcessDataInParallel(byte[] data)
        {
            var exceptions = new ConcurrentQueue<Exception>();
            Parallel.ForEach(data, d =>
            {
                try
                {
                    if (d < 0x3)
                        throw new ArgumentException(String.Format("value is {0:x}. Elements must be greater than 0x3.", d));
                    else
                        Console.Write(d + " ");
                }
                catch (Exception e) { exceptions.Enqueue(e); }
            });
            if (exceptions.Count > 0) throw new AggregateException(exceptions);
        }


      
并行程式設計之資料并行1:簡單例子 2:編寫具有線程本地變量的 Parallel.For 循環 3:編寫具有線程本地變量的 Parallel.ForEach 循環 4:取消 Parallel.For 或 ForEach Loop 5:停止或中斷 Parallel.For 循環 6:并行中的異常處理

本文基于

Creative Commons Attribution 2.5 China Mainland License

釋出,歡迎轉載,演繹或用于商業目的,但是必須保留本文的署名

http://www.cnblogs.com/luminji

(包含連結)。如您有任何疑問或者授權方面的協商,請給我留言。