原文: C#多線程程式設計系列(四)- 使用線程池 目錄
- 1.1 簡介
- 1.2 線上程池中調用委托
- 1.3 向線程池中放入異步操作
- 1.4 線程池與并行度
- 1.5 實作一個取消選項
- 1.6 線上程池中使用等待事件處理器及逾時
- 1.7 使用計時器
- 1.8 使用BackgroundWorker元件
- 參考書籍
- 筆者水準有限,如果錯誤歡迎各位批評指正!
在本章中,主要介紹線程池(ThreadPool)的使用;在C#中它叫
System.Threading.ThreadPool
,在使用線程池之前首先我們得明白一個問題,那就是為什麼要使用線程池。其主要原因是建立一個線程的代價是昂貴的,建立一個線程會消耗很多的系統資源。
那麼線程池是如何解決這個問題的呢?線程池在初始時會自動建立一定量的線程供程式調用,使用時,開發人員并不直接配置設定線程,而是将需要做的工作放入線程池工作隊列中,由線程池配置設定已有的線程進行處理,等處理完畢後線程不是被銷毀,而是重新回到線程池中,這樣節省了建立線程的開銷。
但是在使用線程池時,需要注意以下幾點,這将非常重要。
- 線程池不适合處理長時間運作的作業,或者處理需要與其它線程同步的作業。
- 避免将線程池中的工作線程配置設定給I/O首先的任務,這種任務應該使用TPL模型。
- 如非必須,不要手動設定線程池的最小線程數和最大線程數,CLR會自動的進行線程池的擴張和收縮,手動幹預往往讓性能更差。
本節展示的是如何線上程池中如何異步的執行委托,然後将介紹一個叫異步程式設計模型(Asynchronous Programming Model,簡稱APM)的異步程式設計方式。
在本節及以後,為了降低代碼量,在引用程式集聲明位置預設添加了
using static System.Console
和
using static System.Threading.Thead
聲明,這樣聲明可以讓我們在程式中少些一些意義不大的調用語句。
示範代碼如下所示,使用了普通建立線程和APM方式來執行同一個任務。
static void Main(string[] args)
{
int threadId = 0;
RunOnThreadPool poolDelegate = Test;
var t = new Thread(() => Test(out threadId));
t.Start();
t.Join();
WriteLine($"手動建立線程 Id: {threadId}");
// 使用APM方式 進行異步調用 異步調用會使用線程池中的線程
IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "委托異步調用");
r.AsyncWaitHandle.WaitOne();
// 擷取異步調用結果
string result = poolDelegate.EndInvoke(out threadId, r);
WriteLine($"Thread - 線程池工作線程Id: {threadId}");
WriteLine(result);
Console.ReadLine();
}
// 建立帶一個參數的委托類型
private delegate string RunOnThreadPool(out int threadId);
private static void Callback(IAsyncResult ar)
{
WriteLine("Callback - 開始運作Callback...");
WriteLine($"Callback - 回調傳遞狀态: {ar.AsyncState}");
WriteLine($"Callback - 是否為線程池線程: {CurrentThread.IsThreadPoolThread}");
WriteLine($"Callback - 線程池工作線程Id: {CurrentThread.ManagedThreadId}");
}
private static string Test(out int threadId)
{
string isThreadPoolThread = CurrentThread.IsThreadPoolThread ? "ThreadPool - ": "Thread - ";
WriteLine($"{isThreadPoolThread}開始運作...");
WriteLine($"{isThreadPoolThread}是否為線程池線程: {CurrentThread.IsThreadPoolThread}");
Sleep(TimeSpan.FromSeconds(2));
threadId = CurrentThread.ManagedThreadId;
return $"{isThreadPoolThread}線程池工作線程Id: {threadId}";
}
運作結果如下圖所示,其中以Thread開頭的為手動建立的線程輸出的資訊,而TheadPool為開始線程池任務輸出的資訊,Callback為APM模式運作任務結束後,執行的回調方法,可以清晰的看到,Callback的線程也是線程池的工作線程。

在上文中,使用
BeginOperationName/EndOperationName
方法和.Net中的
IAsyncResult
對象的方式被稱為異步程式設計模型(或APM模式),這樣的方法被稱為異步方法。使用委托的
BeginInvoke
方法來運作該委托,
BeginInvoke
接收一個回調函數,該回調函數會在任務處理完成後背調用,并且可以傳遞一個使用者自定義的狀态給回調函數。
現在這種APM程式設計方式用的越來越少了,更推薦使用任務并行庫(Task Parallel Library,簡稱TPL)來組織異步API。
本節将介紹如何将異步操作放入線程池中執行,并且如何傳遞參數給線程池中的線程。本節中主要用到的是
ThreadPool.QueueUserWorkItem()
方法,該方法可将需要運作的任務通過委托的形式傳遞給線程池中的線程,并且允許傳遞參數。
使用比較簡單,示範代碼如下所示。示範了線程池使用中如何傳遞方法和參數,最後需要注意的是使用了
Lambda
表達式和它的閉包機制。
static void Main(string[] args)
{
const int x = 1;
const int y = 2;
const string lambdaState = "lambda state 2";
// 直接将方法傳遞給線程池
ThreadPool.QueueUserWorkItem(AsyncOperation);
Sleep(TimeSpan.FromSeconds(1));
// 直接将方法傳遞給線程池 并且 通過state傳遞參數
ThreadPool.QueueUserWorkItem(AsyncOperation, "async state");
Sleep(TimeSpan.FromSeconds(1));
// 使用Lambda表達式将任務傳遞給線程池 并且通過 state傳遞參數
ThreadPool.QueueUserWorkItem(state =>
{
WriteLine($"Operation state: {state}");
WriteLine($"工作線程 id: {CurrentThread.ManagedThreadId}");
Sleep(TimeSpan.FromSeconds(2));
}, "lambda state");
// 使用Lambda表達式将任務傳遞給線程池 通過 **閉包** 機制傳遞參數
ThreadPool.QueueUserWorkItem(_ =>
{
WriteLine($"Operation state: {x + y}, {lambdaState}");
WriteLine($"工作線程 id: {CurrentThread.ManagedThreadId}");
Sleep(TimeSpan.FromSeconds(2));
}, "lambda state");
ReadLine();
}
private static void AsyncOperation(object state)
{
WriteLine($"Operation state: {state ?? "(null)"}");
WriteLine($"工作線程 id: {CurrentThread.ManagedThreadId}");
Sleep(TimeSpan.FromSeconds(2));
}
運作結果如下圖所示。
在本節中,主要是使用普通建立線程和使用線程池内的線程在任務量比較大的情況下有什麼差別,我們模拟了一個場景,建立了很多不同的線程,然後分别使用普通建立線程方式和線程池方式看看有什麼不同。
static void Main(string[] args)
{
const int numberOfOperations = 500;
var sw = new Stopwatch();
sw.Start();
UseThreads(numberOfOperations);
sw.Stop();
WriteLine($"使用線程執行總用時: {sw.ElapsedMilliseconds}");
sw.Reset();
sw.Start();
UseThreadPool(numberOfOperations);
sw.Stop();
WriteLine($"使用線程池執行總用時: {sw.ElapsedMilliseconds}");
Console.ReadLine();
}
static void UseThreads(int numberOfOperations)
{
using (var countdown = new CountdownEvent(numberOfOperations))
{
WriteLine("通過建立線程排程工作");
for (int i = 0; i < numberOfOperations; i++)
{
var thread = new Thread(() =>
{
Write($"{CurrentThread.ManagedThreadId},");
Sleep(TimeSpan.FromSeconds(0.1));
countdown.Signal();
});
thread.Start();
}
countdown.Wait();
WriteLine();
}
}
static void UseThreadPool(int numberOfOperations)
{
using (var countdown = new CountdownEvent(numberOfOperations))
{
WriteLine("使用線程池開始工作");
for (int i = 0; i < numberOfOperations; i++)
{
ThreadPool.QueueUserWorkItem(_ =>
{
Write($"{CurrentThread.ManagedThreadId},");
Sleep(TimeSpan.FromSeconds(0.1));
countdown.Signal();
});
}
countdown.Wait();
WriteLine();
}
}
執行結果如下,可見使用原始的建立線程執行,速度非常快。隻花了2秒鐘,但是建立了500多個線程,而使用線程池相對來說比較慢,花了9秒鐘,但是隻建立了很少的線程,為作業系統節省了線程和記憶體空間,但花了更多的時間。
在之前的文章中有提到,如果需要終止一個線程的執行,那麼可以使用
Abort()
方法,但是有諸多的原因并不推薦使用
Abort()
方法。
這裡推薦的方式是使用協作式取消(cooperative cancellation),這是一種可靠的技術來安全取消不再需要的任務。其主要用到
CancellationTokenSource
CancellationToken
兩個類,具體用法見下面示範代碼。
以下延時代碼主要是實作了使用
CancellationToken
CancellationTokenSource
來實作任務的取消。但是任務取消後可以進行三種操作,分别是:直接傳回、抛出
ThrowIfCancellationRequesed
異常和執行回調。詳細請看代碼。
static void Main(string[] args)
{
// 使用CancellationToken來取消任務 取消任務直接傳回
using (var cts = new CancellationTokenSource())
{
CancellationToken token = cts.Token;
ThreadPool.QueueUserWorkItem(_ => AsyncOperation1(token));
Sleep(TimeSpan.FromSeconds(2));
cts.Cancel();
}
// 取消任務 抛出 ThrowIfCancellationRequesed 異常
using (var cts = new CancellationTokenSource())
{
CancellationToken token = cts.Token;
ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token));
Sleep(TimeSpan.FromSeconds(2));
cts.Cancel();
}
// 取消任務 并 執行取消後的回調函數
using (var cts = new CancellationTokenSource())
{
CancellationToken token = cts.Token;
token.Register(() => { WriteLine("第三個任務被取消,執行回調函數。"); });
ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token));
Sleep(TimeSpan.FromSeconds(2));
cts.Cancel();
}
ReadLine();
}
static void AsyncOperation1(CancellationToken token)
{
WriteLine("啟動第一個任務.");
for (int i = 0; i < 5; i++)
{
if (token.IsCancellationRequested)
{
WriteLine("第一個任務被取消.");
return;
}
Sleep(TimeSpan.FromSeconds(1));
}
WriteLine("第一個任務運作完成.");
}
static void AsyncOperation2(CancellationToken token)
{
try
{
WriteLine("啟動第二個任務.");
for (int i = 0; i < 5; i++)
{
token.ThrowIfCancellationRequested();
Sleep(TimeSpan.FromSeconds(1));
}
WriteLine("第二個任務運作完成.");
}
catch (OperationCanceledException)
{
WriteLine("第二個任務被取消.");
}
}
static void AsyncOperation3(CancellationToken token)
{
WriteLine("啟動第三個任務.");
for (int i = 0; i < 5; i++)
{
if (token.IsCancellationRequested)
{
WriteLine("第三個任務被取消.");
return;
}
Sleep(TimeSpan.FromSeconds(1));
}
WriteLine("第三個任務運作完成.");
}
運作結果如下所示,符合預期結果。
本節将介紹如何線上程池中使用等待任務和如何進行逾時處理,其中主要用到
ThreadPool.RegisterWaitForSingleObject()
方法,該方法允許傳入一個
WaitHandle
對象,和需要執行的任務、逾時時間等。通過使用這個方法,可完成線程池情況下對逾時任務的處理。
示範代碼如下所示,運作了兩次使用
ThreadPool.RegisterWaitForSingleObject()
編寫逾時代碼的
RunOperations()
方法,但是所傳入的逾時時間不同,是以造成一個必然逾時和一個不會逾時的結果。
static void Main(string[] args)
{
// 設定逾時時間為 5s WorkerOperation會延時 6s 肯定會逾時
RunOperations(TimeSpan.FromSeconds(5));
// 設定逾時時間為 7s 不會逾時
RunOperations(TimeSpan.FromSeconds(7));
}
static void RunOperations(TimeSpan workerOperationTimeout)
{
using (var evt = new ManualResetEvent(false))
using (var cts = new CancellationTokenSource())
{
WriteLine("注冊逾時操作...");
// 傳入同步事件 逾時處理函數 和 逾時時間
var worker = ThreadPool.RegisterWaitForSingleObject(evt
, (state, isTimedOut) => WorkerOperationWait(cts, isTimedOut)
, null
, workerOperationTimeout
, true);
WriteLine("啟動長時間運作操作...");
ThreadPool.QueueUserWorkItem(_ => WorkerOperation(cts.Token, evt));
Sleep(workerOperationTimeout.Add(TimeSpan.FromSeconds(2)));
// 取消注冊等待的操作
worker.Unregister(evt);
ReadLine();
}
}
static void WorkerOperation(CancellationToken token, ManualResetEvent evt)
{
for (int i = 0; i < 6; i++)
{
if (token.IsCancellationRequested)
{
return;
}
Sleep(TimeSpan.FromSeconds(1));
}
evt.Set();
}
static void WorkerOperationWait(CancellationTokenSource cts, bool isTimedOut)
{
if (isTimedOut)
{
cts.Cancel();
WriteLine("工作操作逾時并被取消.");
}
else
{
WriteLine("工作操作成功.");
}
}
運作結果如下圖所示,與預期結果相符。
計時器是FCL提供的一個類,叫
System.Threading.Timer
,可要結果與建立周期性的異步操作。該類使用比較簡單。
以下的示範代碼使用了定時器,并設定了定時器延時啟動時間和周期時間。
static void Main(string[] args)
{
WriteLine("按下Enter鍵,結束定時器...");
DateTime start = DateTime.Now;
// 建立定時器
_timer = new Timer(_ => TimerOperation(start), null
, TimeSpan.FromSeconds(1)
, TimeSpan.FromSeconds(2));
try
{
Sleep(TimeSpan.FromSeconds(6));
_timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4));
ReadLine();
}
finally
{
//實作了IDispose接口 要及時釋放
_timer.Dispose();
}
}
static Timer _timer;
static void TimerOperation(DateTime start)
{
TimeSpan elapsed = DateTime.Now - start;
WriteLine($"離 {start} 過去了 {elapsed.Seconds} 秒. " +
$"定時器線程池 線程 id: {CurrentThread.ManagedThreadId}");
}
運作結果如下所示,可見定時器根據所設定的周期時間循環的調用
TimerOperation()
本節主要介紹
BackgroundWorker
元件的使用,該元件實際上被用于Windows窗體應用程式(Windows Forms Application,簡稱 WPF)中,通過它實作的代碼可以直接與UI控制器互動,更加自認和好用。
示範代碼如下所示,使用
BackgroundWorker
來實作對資料進行計算,并且讓其支援報告工作進度,支援取消任務。
static void Main(string[] args)
{
var bw = new BackgroundWorker();
// 設定可報告進度更新
bw.WorkerReportsProgress = true;
// 設定支援取消操作
bw.WorkerSupportsCancellation = true;
// 需要做的工作
bw.DoWork += Worker_DoWork;
// 工作處理進度
bw.ProgressChanged += Worker_ProgressChanged;
// 工作完成後處理函數
bw.RunWorkerCompleted += Worker_Completed;
bw.RunWorkerAsync();
WriteLine("按下 `C` 鍵 取消工作");
do
{
if (ReadKey(true).KeyChar == 'C')
{
bw.CancelAsync();
}
}
while (bw.IsBusy);
}
static void Worker_DoWork(object sender, DoWorkEventArgs e)
{
WriteLine($"DoWork 線程池 線程 id: {CurrentThread.ManagedThreadId}");
var bw = (BackgroundWorker)sender;
for (int i = 1; i <= 100; i++)
{
if (bw.CancellationPending)
{
e.Cancel = true;
return;
}
if (i % 10 == 0)
{
bw.ReportProgress(i);
}
Sleep(TimeSpan.FromSeconds(0.1));
}
e.Result = 42;
}
static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e)
{
WriteLine($"已完成{e.ProgressPercentage}%. " +
$"處理線程 id: {CurrentThread.ManagedThreadId}");
}
static void Worker_Completed(object sender, RunWorkerCompletedEventArgs e)
{
WriteLine($"完成線程池線程 id: {CurrentThread.ManagedThreadId}");
if (e.Error != null)
{
WriteLine($"異常 {e.Error.Message} 發生.");
}
else if (e.Cancelled)
{
WriteLine($"操作已被取消.");
}
else
{
WriteLine($"答案是 : {e.Result}");
}
}
運作結果如下所示。
在本節中,使用了C#中的另外一個文法,叫事件(event)。當然這裡的事件不同于之前線上程同步章節中提到的事件,這裡是觀察者設計模式的展現,包括事件源、訂閱者和事件處理程式。是以,除了異步APM模式意外,還有基于事件的異步模式(Event-based Asynchronous Pattern,簡稱 EAP)。
本文主要參考了以下幾本書,在此對這些作者表示由衷的感謝你們提供了這麼好的資料。
- 《CLR via C#》
- 《C# in Depth Third Edition》
- 《Essential C# 6.0》
- 《Multithreading with C# Cookbook Second Edition》
- 《C#多線程程式設計實戰》
源碼下載下傳點選連結
示例源碼下載下傳