天天看點

淺談.NET下的多線程和并行計算(四)線程同步基礎下

回顧一下上次,我們讨論了lock/AutoResetEvent/ManualResetEvent以及Semaphore。這些用于線程同步的結構叫做同步基元。同步基元從類型上可以分為鎖定/通知/聯鎖三種。lock顯然鎖定方式,而且是獨占鎖定,也就是在鎖釋放之前不能由其它線程獲得。Semaphore也是一種鎖定,隻不過不是獨占鎖,可以指定多少個線程通路代碼塊。AutoResetEvent和ManualResetEvent當然就是通知方式了,前者在通行之後自動重置,後者需要手動重置。我們還看到了即使使用同步機制不一定能確定線程按照我們規劃的去執行,因為從根本上來說,作業系統的線程排程我們是沒有辦法預測的,除非使用阻塞或鎖等待等方式,否則我們很難去預測兩個無關的線程究竟哪個先得到執行(即使設定了優先級),而且在使用這些同步機制的時候我們也要考慮到性能問題,如果多線程程式做的不好的話很可能會比單線程執行效率還低,比如我們開啟了多個線程互相阻塞等待并沒有任何的并行運算,比如在一個多線程環境彙總我們鎖的範圍很大,導緻多線程環境變為了一個單線程環境,有關性能問題以後再讨論,這次我們來看看其它的一些同步基元。

本文的例子基于上文定義的一些基本靜态對象:

static int result = 0;
static object locker = new object();
static EventWaitHandle are = new AutoResetEvent(false);
static EventWaitHandle mre = new ManualResetEvent(false);      

使用lock保護共享資源不被多個線程同時修改是常見的做法,其實lock本質上基于Monitor,而使用Monitor本身可以帶來更豐富的特性,比如可以設定超過某個等待時間段就不繼續等待:

for (int i = 0; i < 10; i++)
{
    new Thread(() =>
    {
        if (Monitor.TryEnter(locker, 2000))
        {
            Thread.Sleep(1000);
            Console.WriteLine(DateTime.Now.ToString("mm:ss"));
            Monitor.Exit(locker);
        }
    }).Start();
}      

在這段代碼中我們開啟10個線程嘗試申請locker獨占鎖,通過輸出結果可以看出,由于我們設定了2秒逾時,程式隻輸出了三次:

淺談.NET下的多線程和并行計算(四)線程同步基礎下

在第一個線程擷取鎖之後,一秒後釋放,第二個線程擷取,一秒後又釋放,第三個線程最後擷取到,之後的線程都超過了2秒等待時間,TryEnter傳回false,線程結束。

除了TryEnter之外,Monitor還有一組有用的方法,Wait和Pulse(PulseAll)。一般情況下,我們的線程占據了獨占鎖之後進行一些線程安全的資源通路,然後退出鎖。使用Wait我們可以讓目前線程阻塞并且暫時釋放鎖,一直到有其它線程通知(Pulse)阻塞的(一個或者多個)線程鎖狀态已改變為止:

for (int i = 0; i < 2; i++)
{
    Thread reader = new Thread(() =>
    {
        Console.WriteLine(string.Format("reader #{0} started", Thread.CurrentThread.ManagedThreadId));
        while (true)
        {
            lock (locker)
            {
                if (data.Count == 0)
                {
                    Console.WriteLine(string.Format("#{0} can not get result, wait", Thread.CurrentThread.ManagedThreadId));
                    Monitor.Wait(locker);
                    Console.WriteLine(string.Format("#{0} get result: {1}", Thread.CurrentThread.ManagedThreadId, data.Dequeue()));
                }
            }
        }
    });
    reader.Start();
}

Thread writer = new Thread(() =>
{
    Console.WriteLine(string.Format("writer #{0} started", Thread.CurrentThread.ManagedThreadId));
    while (true)
    {
        lock (locker)
        {
            int s = DateTime.Now.Second;
            Console.WriteLine(string.Format("#{0} set result: {1}", Thread.CurrentThread.ManagedThreadId, s));
            data.Enqueue(s);
            Console.WriteLine("notify thread");
            Monitor.Pulse(locker);
        }
        Thread.Sleep(1000);
    }
});
writer.Start();      
在這裡,data定義如下:      
static Queue<int> data = new Queue<int>();      

輸出結果如下:

淺談.NET下的多線程和并行計算(四)線程同步基礎下

在這裡,我們模拟了兩個讀取線程和一個寫入線程,寫入線程每一秒寫入目前的秒到隊列,讀取線程不斷從隊列讀取一個值。讀取線程中判斷如果隊列沒值的話就讓出獨占鎖并且阻塞目前線程,然後寫入線程拿到了獨占鎖寫入值,并且發出通知,讓排隊的第一個讀取線程得到恢複,由于使用了Pulse()隻能通知一個線程,是以可以發現兩個讀取線程依次有一次機會從隊列中讀取值。

在本文一開始提到了,同步基元還有一種叫做聯鎖(互鎖)的結構,可以以比較高的性能,對線程共享的變量進行原子操作,它比使用lock來的性能高而且簡潔:

Stopwatch sw = Stopwatch.StartNew();
Thread t1 = new Thread(() =>
{
    for (int j = 0; j < 500; j++)
    {
        Interlocked.Increment(ref result);
        Thread.Sleep(10);
    }
});
Thread t2 = new Thread(() =>
{
    for (int j = 0; j < 500; j++)
    {
        Interlocked.Add(ref result, 9);
        Thread.Sleep(10);
    }
});
t1.Start();
t2.Start();
t1.Join();
t2.Join();
Console.WriteLine(sw.ElapsedMilliseconds);
Console.WriteLine(result);      

運作結果如下:

淺談.NET下的多線程和并行計算(四)線程同步基礎下

第一個線程進行了500次累加操作,第二個線程進行了500次加法操作,使得最後result的值為10*500=5000,總共消耗的時間為5秒多一點。

上次我們介紹了AutoResetEvent和ManualResetEvent,WaitHandle提供了兩個靜态方法WaitAll和WaitAny可以讓我們實作等待多個EventWaitHandle都完成或等待其中任意一個完成:

Stopwatch sw = Stopwatch.StartNew();
ManualResetEvent[] wh = new ManualResetEvent[10];
for (int i = 0; i < 10; i++)
{
    wh[i] = new ManualResetEvent(false);
    new Thread((j) =>
    {
        int d = ((int)j + 1) * 100;
        Thread.Sleep(d);
        Interlocked.Exchange(ref result, d);
        wh[(int)j].Set();
    }).Start(i);
}
WaitHandle.WaitAny(wh);
Console.WriteLine(sw.ElapsedMilliseconds);
Console.WriteLine(result);      

程式輸出如下:

淺談.NET下的多線程和并行計算(四)線程同步基礎下

在這裡我們使用了10個ManualResetEvent關聯到10個線程,這些線程的執行時間分别為100毫秒/200毫秒一直到1000毫秒,由于主線程隻等其中的一個信号量發出,是以在100毫秒後就輸出了結果(但是注意到程式在1秒後完成,因為這些線程預設都是前台線程)。如果我們把WaitAny改為WaitAll的話結果如下:

淺談.NET下的多線程和并行計算(四)線程同步基礎下

上文中我們用ManualResetEvent實作過發出信号讓多人響應,這次我們實作的是多個信号讓單人響應。

在實際應用的多線程環境中,我們通常會有很多的線程來讀取緩存中的值,但是隻會有1/10甚至1/10000的線程去修改緩存,對于這種情況如果我們使用lock不分讀寫都對緩存對象進行鎖定的話,相當于多線程環境在用緩存的時候變為了但線程,做個實驗

(假設我們定義了static List<int> list = new List<int>();):

Stopwatch sw = Stopwatch.StartNew();
ManualResetEvent[] wh = new ManualResetEvent[30];
for (int i = 1; i <= 20; i++)
{
    wh[i - 1] = new ManualResetEvent(false);
    new Thread((j) =>
    {
        lock (locker)
        {
            var sum = list.Count;
            Thread.Sleep(100);
            wh[(int)j].Set();
        }

    }).Start(i - 1);
}
for (int i = 21; i <= 30; i++)
{
    wh[i - 1] = new ManualResetEvent(false);
    new Thread((j) =>
    {
        lock (locker)
        {
            list.Add(1);
            Thread.Sleep(100);
            wh[(int)j].Set();
        }
    }).Start(i - 1);
}
WaitHandle.WaitAll(wh);
Console.WriteLine(sw.ElapsedMilliseconds);
Console.WriteLine(list.Count);      
淺談.NET下的多線程和并行計算(四)線程同步基礎下

我們同時開了30個線程,其中20個讀10個寫,主線程等待它們全部執行完畢後輸出時間,可以發現這30個線程用了3秒,寫線程用了10秒獨占鎖可以了解,但是讀線程并沒有任何并發。.NET提供了現成的讀寫鎖ReaderWriterLockSlim類型使得讀操作可以并發

(假設我們定義了static ReaderWriterLockSlim rw = new ReaderWriterLockSlim();):

Stopwatch sw = Stopwatch.StartNew();
ManualResetEvent[] wh = new ManualResetEvent[30];
for (int i = 1; i <= 20; i++)
{
    wh[i - 1] = new ManualResetEvent(false);
    new Thread((j) =>
    {
        rw.EnterReadLock();
        var sum = list.Count;
        Thread.Sleep(100);
        wh[(int)j].Set();
        rw.ExitReadLock();

    }).Start(i - 1);
}
for (int i = 21; i <= 30; i++)
{
    wh[i - 1] = new ManualResetEvent(false);
    new Thread((j) =>
    {
        rw.EnterWriteLock();
        list.Add(1);
        Thread.Sleep(100);
        wh[(int)j].Set();
        rw.ExitWriteLock();
    }).Start(i - 1);
}
WaitHandle.WaitAll(wh);
Console.WriteLine(sw.ElapsedMilliseconds);
Console.WriteLine(list.Count);      

輸出結果就像我們想的一樣完美:

淺談.NET下的多線程和并行計算(四)線程同步基礎下

讀線程并沒有過多等待基本都并發通路。

最後,我們來介紹一下一種比較友善的實作線程同步方法的辦法:

[MethodImpl(MethodImplOptions.Synchronized)]
static void M()
{
    Thread.Sleep(1000);
    Console.WriteLine(DateTime.Now.ToString("mm:ss"));
}

[MethodImpl(MethodImplOptions.Synchronized)]
void N()
{
    Thread.Sleep(1000);
    Console.WriteLine(DateTime.Now.ToString("mm:ss"));
}      

M方法是靜态方法,N方法是執行個體方法,我們都為它們應用了MethodImpl特性并且訓示它們是同步方法(隻能被一個線程同時通路)。然後我們寫如下代碼驗證:

for (int i = 0; i < 10; i++)
{
    new Thread(() =>
    {
        M();
    }).Start();
}      

程式輸出結果如下:

淺談.NET下的多線程和并行計算(四)線程同步基礎下

可以發現,雖然有10個線程同時通路M方法,但是每次隻能有一個線程執行。

再來測試一下N方法:

Program p = new Program();
for (int i = 0; i < 10; i++)
{
    new Thread(() =>
        {
            p.N();
        }).Start();
}      

程式輸出結果和上次一樣:

淺談.NET下的多線程和并行計算(四)線程同步基礎下

但是我們要注意的是本質上,為M靜态方法标注同步是對類進行加鎖,而為N執行個體方法标注同步是對類的執行個體進行加鎖,也就是說如果我們每次都用新的執行個體來調用N的話不能實作同步:

for (int i = 0; i < 10; i++)
{
    new Thread(() =>
    {
        new Program().N();
    }).Start();
}      

結果如下:

淺談.NET下的多線程和并行計算(四)線程同步基礎下

通過這兩篇文章我們基本介紹了線程同步的一些方法和結構,可以發現我們的手段很多,但是要随心所欲讓各種線程同時執行然後彙總資料然後通知其它線程繼續計算然後再彙總資料等等并不是很簡單。弄的不好讓多線程變成但線程弄的不好也可能讓資料變髒。其實在.NET 4.0中有一些新特性簡化這些行為甚至編寫多線程程式都不需要知道這些同步基元,但是這些基礎了解一下還是有好處的。

作者:

lovecindywang

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。

繼續閱讀