天天看點

C#并行程式設計-Task

菜鳥學習并行程式設計,參考《C#并行程式設計進階教程.PDF》,如有錯誤,歡迎指正。

目錄

  • C#并行程式設計-相關概念
  • C#并行程式設計-Parallel
  • C#并行程式設計-Task
  • C#并行程式設計-并發集合
  • C#并行程式設計-線程同步原語
  • C#并行程式設計-PLINQ:聲明式資料并行

任務簡介

TPL引入新的基于任務的程式設計模型,通過這種程式設計模型可以發揮多核的功效,提升應用程式的性能,不需要編寫底層複雜且重量級的線程代碼。

但需要注意:任務并不是線程(任務運作的時候需要使用線程,但并不是說任務取代了線程,任務代碼是使用底層的線程(軟體線程,排程在特定的硬體線程或邏輯核心上)運作的,任務與線程之間并沒有一對一的關系。)

建立一個新的任務時,排程器(排程器依賴于底層的線程池引擎)會使用工作竊取隊列找到一個最合适的線程,然後将任務加入隊列,任務所包含的代碼會在一個線程中運作。如圖:

C#并行程式設計-Task

System.Threading.Tasks.Task

一個Task表示一個異步操作,Task提供了很多方法和屬性,通過這些方法和屬性能夠對Task的執行進行控制,并且能夠獲得其狀态資訊。

Task的建立和執行都是獨立的,是以可以對關聯操作的執行擁有完全的控制權。

使用Parallel.For、Parallel.ForEach的循環疊代的并行執行,TPL會在背景建立System.Threading.Tasks.Task的執行個體。

使用Parallel.Invoke時,TPL也會建立與調用的委托數目一緻的System.Threading.Tasks.Task的執行個體。

注意項

程式中添加很多異步的操作作為Task執行個體加載的時候,為了充分利用運作時所有可用的邏輯核心,任務排程器會嘗試的并行的運作這些任務,也會嘗試在所有的可用核心上對工作進行負載均衡。

但在實際的編碼過程當中,并不是所有的代碼片段都能夠友善的用任務來運作,因為任務會帶來額外的開銷,盡管這種開銷比添加線程所帶來的開銷要小,但是仍然需要将這個開銷考慮在内。

Task狀态與生命周期

一個Task執行個體隻會完成其生命周期一次,當Task到達它的3種肯呢過的最終狀态之一是,就無法回到之前的任何狀态

C#并行程式設計-Task
C#并行程式設計-Task
C#并行程式設計-Task

下面貼代碼,詳解見注釋,友善大家了解Task的狀态:

C#并行程式設計-Task
C#并行程式設計-Task
class Program
    {
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            /*  建立一個任務 不調用 不執行  狀态為Created */
            Task tk = new Task(() =>
            {
            });
            Console.WriteLine(tk.Status.ToString());

            /*  建立一個任務 執行  狀态為 WaitingToRun */
            Task tk1 = new Task(() =>
            {
            });
            tk1.Start();/*對于安排好的任務,就算調用Start方法也不會立馬啟動 此時任務的狀态為WaitingToRun*/
            Console.WriteLine(tk1.Status.ToString());

            /*  建立一個主任務 */
            Task mainTask = new Task(() =>
            {
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 30000);
            });
            /*  将子任務加入到主任務完成之後執行 */
            Task subTask = mainTask.ContinueWith((t1) =>
            {
            });
            /*  啟動主任務 */
            mainTask.Start();
            /*  此時子任務狀态為 WaitingForActivation */
            Console.WriteLine(subTask.Status.ToString());


            /*  建立一個任務 執行 後 等待一段時間 并行未結束的情況下 狀态為 Running */
            Task tk2 = new Task(() =>
            {
                SpinWait.SpinUntil(() => false, 30000);
            });
            tk2.Start();/*對于安排好的任務,就算調用Start方法也不會立馬啟動*/
            SpinWait.SpinUntil(() => false, 300);
            Console.WriteLine(tk2.Status.ToString());


            /*  建立一個任務 然後取消該任務 狀态為Canceled */
            CancellationTokenSource cts = new CancellationTokenSource();
            Task tk3 = new Task(() =>
            {
                for (int i = 0; i < int.MaxValue; i++)
                {
                    if (!cts.Token.IsCancellationRequested)
                    {
                        cts.Token.ThrowIfCancellationRequested();
                    }
                }
            }, cts.Token);
            tk3.Start();/*啟動任務*/
            SpinWait.SpinUntil(() => false, 100);
            cts.Cancel();/*取消該任務執行 但并非立馬取消 是以對于Canceled狀态也不會立馬生效*/
            SpinWait.SpinUntil(() => false, 1000);
            Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
            SpinWait.SpinUntil(() => false, 1000);
            Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
            SpinWait.SpinUntil(() => false, 1000);
            Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);

            /*建立一個任務 讓它成功的運作完成 會得到 RanToCompletion 狀态*/
            Task tk4 = new Task(() =>
            {
                SpinWait.SpinUntil(() => false, 10);
            });
            tk4.Start();
            SpinWait.SpinUntil(() => false, 300);
            Console.WriteLine(tk4.Status.ToString());

            /*建立一個任務 讓它運作失敗 會得到 Faulted 狀态*/
            Task tk5 = new Task(() =>
            {
                throw new Exception();
            });
            tk5.Start();
            SpinWait.SpinUntil(() => false, 300);
            Console.WriteLine(tk5.Status.ToString());

            Console.ReadLine();
        }
    }

    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }      

View Code

C#并行程式設計-Task

使用任務來對代碼進行并行化

使用Parallel.Invoke可以并行加載多個方法,使用Task執行個體也能完成同樣的工作,下面貼代碼:

C#并行程式設計-Task
C#并行程式設計-Task
class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);});
            Task tk2 = new Task(() => SetProduct(2));
            tk1.Start();
            tk2.Start();
          
          
            Console.ReadLine();
        }
        static void SetProduct(int index)
        {
            Parallel.For(0, 10000, (i) =>
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                queue.Enqueue(model);
            });
            Console.WriteLine("SetProduct {0} 執行完成", index);
        }
    } 
    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }      
C#并行程式設計-Task

等待任務完成Task.WaitAll

Task.WaitAll 方法,這個方法是同步執行的,在Task作為參數被接受,所有Task結束其執行前,主線程不會繼續執行下一條指令,下面貼代碼

C#并行程式設計-Task
C#并行程式設計-Task
class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); });
            Task tk2 = new Task(() => SetProduct(2));
            tk1.Start();
            tk2.Start();
            /*等待任務執行完成後再輸出 ====== */
            Task.WaitAll(tk1, tk2);
            Console.WriteLine("等待任務執行完成後再輸出 ======");

            Task tk3 = new Task(() => { SetProduct(1); SetProduct(3); });
            Task tk4 = new Task(() => SetProduct(2));
            tk3.Start();
            tk4.Start();
            /*等待任務執行前輸出 ====== */
            Console.WriteLine("等待任務執行前輸出 ======");
            Task.WaitAll(tk3, tk4);


            Console.ReadLine();
        }
        static void SetProduct(int index)
        {
            Parallel.For(0, 10000, (i) =>
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                queue.Enqueue(model);
            });
            Console.WriteLine("SetProduct {0} 執行完成", index);
        }
    }      
C#并行程式設計-Task

Task.WaitAll 限定等待時長

C#并行程式設計-Task
C#并行程式設計-Task
queue = new ConcurrentQueue<Product>();
            Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);});
            Task tk2 = new Task(() => SetProduct(2));
            tk1.Start();
            tk2.Start();

            /*如果tk1 tk2 沒能在10毫秒内完成 則輸出 *****  */
            if (!Task.WaitAll(new Task[] { tk1, tk2 }, 10))
            {
                Console.WriteLine("******");
            }
          
            Console.ReadLine();      

如圖10毫秒沒有完成任務,則輸出了****

C#并行程式設計-Task

通過取消标記取消任務

通過取消标記來中斷Task執行個體的執行。 CancellationTokenSource,CancellationToken下的IsCanceled屬性标志目前是否已經被取消,取消任務,任務也不一定會馬上取消,下面貼代碼:

C#并行程式設計-Task
C#并行程式設計-Task
class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            System.Threading.CancellationTokenSource token = new CancellationTokenSource();
            Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
            Task tk2 = Task.Factory.StartNew(() => SetProduct(token.Token));
            Thread.Sleep(10);
            /*取消任務操作*/
            token.Cancel();
            try
            {
                /*等待完成*/
                Task.WaitAll(new Task[] { tk1, tk2 });
            }
            catch (AggregateException ex)
            {
                /*如果目前的任務正在被取消,那麼還會抛出一個TaskCanceledException異常,這個異常包含在AggregateException異常中*/
                Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
                Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
            }

            Thread.Sleep(2000);
            Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
            Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
            Console.ReadLine();
        }
        static void SetProduct(System.Threading.CancellationToken ct)
        {
            /* 每一次循環疊代,都會有新的代碼調用 ThrowIfCancellationRequested 
             * 這行代碼能夠對 OpreationCanceledException 異常進行觀察
             * 并且這個異常的标記與Task執行個體關聯的那個标記進行比較,如果兩者相同 ,而且IsCancelled屬性為True,那麼Task執行個體就知道存在一個要求取消的請求,并且會将狀态轉變為Canceled狀态,中斷任務執行。  
             * 如果目前的任務正在被取消,那麼還會抛出一個TaskCanceledException異常,這個異常包含在AggregateException異常中
            /*檢查取消标記*/
            ct.ThrowIfCancellationRequested();
            for (int i = 0; i < 50000; i++)
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                queue.Enqueue(model);
            
                ct.ThrowIfCancellationRequested();
            }
            Console.WriteLine("SetProduct   執行完成");
        }
    }
    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }      

Task異常處理 當很多任務并行運作的時候,可能會并行發生很多異常。Task執行個體能夠處理一組一組的異常,這些異常有System.AggregateException類處理

C#并行程式設計-Task
C#并行程式設計-Task
class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            System.Threading.CancellationTokenSource token = new CancellationTokenSource();
            Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
            Thread.Sleep(2000);
            if (tk1.IsFaulted)
            {
                /*  循環輸出異常    */
                foreach (Exception ex in tk1.Exception.InnerExceptions)
                {
                    Console.WriteLine("tk1 Exception:{0}", ex.Message);
                }
            }
            Console.ReadLine();
        }

        static void SetProduct(System.Threading.CancellationToken ct)
        {
            for (int i = 0; i < 5; i++)
            {
                throw new Exception(string.Format("Exception Index {0}", i));
            }
            Console.WriteLine("SetProduct   執行完成");
        }
    }      
C#并行程式設計-Task

Task傳回值  Task<TResult>

C#并行程式設計-Task
C#并行程式設計-Task
class Program
    {
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            Task<List<Product>> tk1 = Task<List<Product>>.Factory.StartNew(() => SetProduct());
            Task.WaitAll(tk1);
            Console.WriteLine(tk1.Result.Count);
            Console.WriteLine(tk1.Result[0].Name);
            Console.ReadLine();
        }
        static List<Product> SetProduct()
        {
            List<Product> result = new List<Product>();
            for (int i = 0; i < 500; i++)
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                result.Add(model);
            }
            Console.WriteLine("SetProduct   執行完成");
            return result;
        }
    }      
C#并行程式設計-Task

通過延續串聯多個任務

ContinueWith:建立一個目标Task完成時,異步執行的延續程式,await,如代碼所示:

C#并行程式設計-Task
C#并行程式設計-Task
class Program
    {
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            /*建立任務t1*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("執行 t1 任務");
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);

            });
            /*建立任務t2   t2任務的執行 依賴與t1任務的執行完成*/
            Task t2 = t1.ContinueWith((t) =>
            {
                Console.WriteLine("執行 t2 任務"); 
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);

            });    
            /*建立任務t3   t3任務的執行 依賴與t2任務的執行完成*/
            Task t3 = t2.ContinueWith((t) =>
            {
                Console.WriteLine("執行 t3 任務");
            });
            Console.ReadLine();
        }
    }      
C#并行程式設計-Task

TaskContinuationOptions

TaskContinuationOptions參數,可以控制延續另一個任的任務排程和執行的可選行為。下面看代碼:

C#并行程式設計-Task
C#并行程式設計-Task
class Program
    {
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            /*建立任務t1*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("執行 t1 任務");
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);
                throw new Exception("異常");
            });

            /*建立任務t2   t2任務的執行 依賴與t1任務的執行完成*/
            Task t2 = t1.ContinueWith((t) =>
            {
                Console.WriteLine(t.Status);
                Console.WriteLine("執行 t2 任務");
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);

                /*定義 TaskContinuationOptions 行為為 NotOnFaulted 在 t1 任務抛出異常後,t1 的任務狀态為 Faulted , 則t2 不會執行裡面的方法 但是需要注意的是t3任務*/
                /*t2在不符合條件時 傳回Canceled狀态狀态讓t3任務執行*/
            }, TaskContinuationOptions.NotOnFaulted);
            /*建立任務t3   t3任務的執行 依賴與t2任務的執行完成*/

            /*t2在不符合條件時 傳回Canceled狀态狀态讓t3任務執行*/
            Task t3 = t2.ContinueWith((t) =>
            {
                Console.WriteLine(t.Status);
                Console.WriteLine("執行 t3 任務");
            });

            Console.ReadLine();
        }
    }      
C#并行程式設計-Task

TaskContinuationOptions 屬性有很多,如下所示

C#并行程式設計-Task

 關于并行程式設計中的Task就寫到這,如有問題,請指正。

作者:釋迦苦僧 出處:http://www.cnblogs.com/woxpp/p/3928788.html

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接。

作者:釋迦苦僧

出處:http://www.cnblogs.com/woxpp

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接。

生活不易,五行缺金,求打點

C#并行程式設計-Task

繼續閱讀