天天看點

一起談.NET技術,4.0中的并行計算和多線程詳解(一)

  并行計算部分

  沿用微軟的寫法,System.Threading.Tasks.::.Parallel類,提供對并行循環和區域的支援。 我們會用到的方法有For,ForEach,Invoke。

  首先我們初始化一個List用于循環,這裡我們循環10次。(後面的代碼都會按這個标準進行循環)

Code

            Program.Data = new List<int>();

            for (int i = 0; i < 10; i++)

            {

                Data.Add(i);

            }

  下面我們定義4個方法,分别為for,foreach,并行For,并行ForEach。并測試他們的運作時長。

        /// <summary>

        /// 是否顯示執行過程

        /// </summary>

        public bool ShowProcessExecution = false;

        /// 這是普通循環for

        private void Demo1()

        {

            List<int> data = Program.Data;

            DateTime dt1 = DateTime.Now;

            for (int i = 0; i < data.Count; i++)

                Thread.Sleep(500);

                if (ShowProcessExecution)

                    Console.WriteLine(data[i]);

            DateTime dt2 = DateTime.Now;

            Console.WriteLine("普通循環For運作時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);

        }

        /// 這是普通循環foreach

        private void Demo2()

            foreach (var i in data)

                    Console.WriteLine(i);

        /// 這是并行計算For

        private void Demo3()

            Parallel.For(0, data.Count, (i) =>

            });

            Console.WriteLine("并行運算For運作時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);

        /// 這是并行計算ForEach

        private void Demo4()

            Parallel.ForEach(data, (i) =>

            Console.WriteLine("并行運算ForEach運作時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);

  下面是運作結果:

一起談.NET技術,4.0中的并行計算和多線程詳解(一)

  這裡我們可以看出并行循環在執行效率上的優勢了。

  結論1:在對一個數組内的每一個項做單獨處理時,完全可以選擇并行循環的方式來提升執行效率。

  原理1:并行計算的線程開啟是緩步開啟的,線程數量1,2,4,8緩步提升。(不詳,PLinq最多64個線程,可能這也是64)

  當在進行循環時,偶爾會需要中斷循環或跳出循環。下面是兩種跳出循環的方法Stop和Break,LoopState是循環狀态的參數。

        /// 中斷Stop

        private void Demo5()

            Parallel.For(0, data.Count, (i, LoopState) =>

                if (data[i] > 5)

                    LoopState.Stop();

                Console.WriteLine(data[i]);

            Console.WriteLine("Stop執行結束。");

        /// 中斷Break

        private void Demo6()

            Parallel.ForEach(data, (i, LoopState) =>

                if (i > 5)

                    LoopState.Break();

                Console.WriteLine(i);

            Console.WriteLine("Break執行結束。");

  執行結果如下:

一起談.NET技術,4.0中的并行計算和多線程詳解(一)

  結論2:使用Stop會立即停止循環,使用Break會執行完畢所有符合條件的項。

  上面的應用場景其實并不是非常多見,畢竟隻是為了周遊一個數組内的資源,我們更多的時候是為了周遊資源,找到我們所需要的。那麼請繼續看。

  下面是我們一般會想到的寫法:

        private void Demo7()

            List<int> data = new List<int>();

            Parallel.For(0, Program.Data.Count, (i) =>

                if (Program.Data[i] % 2 == 0)

                    data.Add(Program.Data[i]);

            Console.WriteLine("執行完成For.");

        private void Demo8()

            Parallel.ForEach(Program.Data, (i) =>

            Console.WriteLine("執行完成ForEach.");

  看起來應該是沒有問題的,但是我們多次運作後會發現,偶爾會出現錯誤如下:

一起談.NET技術,4.0中的并行計算和多線程詳解(一)

  這是因為List是非線程安全的類,我們需要使用System.Collections.Concurrent命名空間下的類型來用于并行循環體内。

說明

BlockingCollection<T>

為實作 IProducerConsumerCollection<T> 的線程安全集合提供阻止和限制功能。

ConcurrentBag<T>

表示對象的線程安全的無序集合。

ConcurrentDictionary<TKey, TValue>

表示可由多個線程同時通路的鍵值對的線程安全集合。

ConcurrentQueue<T>

表示線程安全的先進先出 (FIFO) 集合。

ConcurrentStack<T>

表示線程安全的後進先出 (LIFO) 集合。

OrderablePartitioner<TSource>

表示将一個可排序資料源拆分成多個分區的特定方式。

Partitioner

提供針對數組、清單和可枚舉項的常見分區政策。

Partitioner<TSource>

表示将一個資料源拆分成多個分區的特定方式。

  那麼我們上面的代碼可以修改為,加了了ConcurrentQueue和ConcurrentStack的最基本的操作。

        /// 并行循環操作集合類,集合内隻取5個對象

            ConcurrentQueue<int> data = new ConcurrentQueue<int>();

                    data.Enqueue(Program.Data[i]);//将對象加入到隊列末尾

            int R;

            while (data.TryDequeue(out R))//傳回隊列中開始處的對象

                Console.WriteLine(R);

        /// 并行循環操作集合類

            ConcurrentStack<int> data = new ConcurrentStack<int>();

                    data.Push(Program.Data[i]);//将對象壓入棧中

            while (data.TryPop(out R))//彈出棧頂對象

  ok,這裡傳回一個序列的問題也解決了。

  結論3:在并行循環内重複操作的對象,必須要是thread-safe(線程安全)的。集合類的線程安全對象全部在System.Collections.Concurrent命名空間下。

  使用循環的時候經常也會用到疊代,那麼在并行循環中叫做 含有局部變量的循環 。下面的代碼中詳細的解釋,這裡就不啰嗦了。

        /// 具有線程局部變量的For循環

        private void Demo9()

            long total = 0;

            //這裡定義傳回值為long類型友善下面各個參數的解釋

            Parallel.For<long>(0,           // For循環的起點

                data.Count,                 // For循環的終點

                () => 0,                    // 初始化局部變量的方法(long),既為下面的subtotal的初值

                (i, LoopState, subtotal) => // 為每個疊代調用一次的委托,i是目前索引,LoopState是循環狀态,subtotal為局部變量名

                {

                    subtotal += data[i];    // 修改局部變量

                    return subtotal;        // 傳遞參數給下一個疊代

                },

                (finalResult) => Interlocked.Add(ref total, finalResult) //對每個線程結果執行的最後操作,這裡是将所有的結果相加

                );

            Console.WriteLine(total);

        /// 具有線程局部變量的ForEach循環

        private void Demo10()

            Parallel.ForEach<int, long>(data, // 要循環的集合對象

                () => 0,                      // 初始化局部變量的方法(long),既為下面的subtotal的初值

                (i, LoopState, subtotal) =>   // 為每個疊代調用一次的委托,i是目前元素,LoopState是循環狀态,subtotal為局部變量名

                    subtotal += i;            // 修改局部變量

                    return subtotal;          // 傳遞參數給下一個疊代

  結論4:并行循環中的疊代,确實很傷人。代碼太難了解了。

  上面介紹完了For和ForEach的并行計算盛宴,微軟也沒忘記在Linq中加入并行計算。下面介紹Linq中的并行計算。

  4.0中在System.Linq命名空間下加入了下面幾個新的類:

ParallelEnumerable

提供一組用于查詢實作 ParallelQuery{TSource} 的對象的方法。這是 Enumerable 的并行等效項。

ParallelQuery

表示并行序列。

ParallelQuery<TSource>

  原理2:PLinq最多會開啟64個線程

  原理3:PLinq會自己判斷是否可以進行并行計算,如果不行則會以順序模式運作。

  原理4:PLinq會在昂貴的并行算法或成本較低的順序算法之間進行選擇,預設情況下它選擇順序算法。

  在ParallelEnumerable中提供的并行化的方法:

ParallelEnumerable 運算符

AsParallel()

PLINQ 的入口點。指定如果可能,應并行化查詢的其餘部分。

AsSequential()

指定查詢的其餘部分應像非并行 LINQ 查詢一樣按順序運作。

AsOrdered()

指定 PLINQ 應保留查詢的其餘部分的源序列排序,直到例如通過使用 orderby 子句更改排序為止。

AsUnordered()

指定查詢的其餘部分的 PLINQ 不需要保留源序列的排序。

WithCancellation()

指定 PLINQ 應定期監視請求取消時提供的取消标記和取消執行的狀态。

WithDegreeOfParallelism()

指定 PLINQ 應當用來并行化查詢的處理器的最大數目。

WithMergeOptions()

提供有關 PLINQ 應當如何(如果可能)将并行結果合并回到使用線程上的一個序列的提示。

WithExecutionMode()

指定 PLINQ 應當如何并行化查詢(即使預設行為是按順序運作查詢)。

ForAll()

多線程枚舉方法,與循環通路查詢結果不同,它允許在不首先合并回到使用者線程的情況下并行處理結果。

Aggregate() 重載

對于 PLINQ 唯一的重載,它啟用對線程本地分區的中間聚合以及一個用于合并所有分區結果的最終聚合函數。

  下面是PLinq的簡單代碼:

        /// PLinq簡介

        private void Demo11()

            var source = Enumerable.Range(1, 10000);

            //查詢結果按source中的順序排序

            var evenNums = from num in source.AsParallel().AsOrdered()

                       where num % 2 == 0

                       select num;

            //ForAll的使用

            ConcurrentBag<int> concurrentBag = new ConcurrentBag<int>();

            var query = from num in source.AsParallel()

                        where num % 10 == 0

                        select num;

            query.ForAll((e) => concurrentBag.Add(e * e));

  上面代碼中使用了ForAll,ForAll和foreach的差別如下:

一起談.NET技術,4.0中的并行計算和多線程詳解(一)

  PLinq的東西很繁雜,但是都隻是幾個簡單的方法,熟悉下方法就好了。

繼續閱讀