天天看點

深入淺出多線程系列之十一:生産者/消費者隊列

上次我們使用AutoResetEvent實作了一個生産/消費者隊列。這一次我們要使用Wait和Pulse方法來實作一個更強大的版本,它允許多個消費者,每一個消費者都在自己的線程中運作。

我們使用數組來跟蹤線程。

Thread[] _workers;

通過跟蹤線程可以讓我們在所有的線程都結束後再結束我們的隊列任務。 

每一個消費者線程都執行一個叫做Consume的方法,在一個for循環中,我們可以建立和啟動線程。例如:

<a></a>

       public PCQueue(int workerCount)

        {

            _workers = new Thread[workerCount];

            for (int i = 0; i &lt; workerCount; i++)

                (_workers[i] = new Thread(Consume)).Start();

        }

上次我們使用的是一個字元串來代表任務,這次我們使用Action委托,它的定義如下:

Public delegate void Action();

為了表示一系列的任務,我們使用Queue&lt;T&gt; 集合,例如:

Queue&lt;Action&gt; _itemQ = new Queue&lt;Action&gt;();

在我們調用生産(EnqueueItem)和消費(Consume)方法前,還是完整的看一看代碼吧:

class PCQueue

    {

        readonly object _locker = new object();

        Thread[] _workers;

        Queue&lt;Action&gt; _itemQ = new Queue&lt;Action&gt;(); //儲存任務的隊列

        public PCQueue(int workerCount)

        public void Shutdown(bool waitForWorkers)

           //為每一個線程插入一個null item,可以是每一個worker 退出

            foreach (Thread worker in _workers)

                EnqueueItem(null);

           //等待所有的線程退出。

            if (waitForWorkers)

                foreach (Thread worker in _workers)

                    worker.Join();

        public void EnqueueItem(Action item)

            lock (_locker)

            {

                _itemQ.Enqueue(item);

                Monitor.Pulse(_locker); //通知等待隊列中的線程

            }

        void Consume()

            while (true)

                Action item;

                lock (_locker)

                {

                    while (_itemQ.Count == 0)

                    {

                        Monitor.Wait(_locker); //釋放鎖,并阻止目前線程,直到其他線程發送pulse信号。                    }

                    item = _itemQ.Dequeue();

                }

                if (item == null) return; //退出的信号

                item();

    }

我們可以有一個退出政策,插入一個null item作為consumer退出的信号。如果我們想要快速的退出,可以使用一個獨立的”cancel” 标記,因為我們支援多個consumers,是以我們必須為每一個consumer插入一個null item。

下面是Main方法。使用兩個consumer線程,然後讓這兩個consumers執行10個委托。

        public static void Main()

            PCQueue q = new PCQueue(2);

            Console.WriteLine("Enqueuing 10 items...");

            for (int i = 0; i &lt; 10; i++)

                int itemNumber = i;

                q.EnqueueItem(() =&gt;

                        Thread.Sleep(1000); //模拟耗時的工作

                        Console.WriteLine(" Task " + itemNumber);

                    });

            q.Shutdown(true); //等待關閉

            Console.WriteLine();

            Console.WriteLine("Workers complete!");

下面讓我們細緻的看一看EnqueueItem方法:

public void EnqueueItem(Action item)

因為我們的隊列_itemQ被多線程環境使用,是以在對_itemQ進行讀取的時候需要加鎖lock.

因為我們插入了一個新的任務,我們必須修改阻塞條件,也就是調用pulse方法,來喚醒調用了wait方法的線程。

出于對效率的考慮,當插入一個Item的時候使用Pulse來代替PulseAll方法,因為大部分時候每一個Item隻需要一個consumer來執行。如果你有一個冰淇淋,你不可能叫30個睡眠的孩子都起來吃它,同樣,對于一個item,同時喚醒30個consumers一點好處都沒有。

讓我們再看看Consumer方法。

我們希望當沒什麼事情做的時候,線程阻塞就可以了,換句話說,隊列中沒有item的時候,線程就應該阻塞。是以我們的阻塞條件是_itemQ.Count ==0;

                Action item;

while循環退出的時候也意味着_itemQ 至少有一個item。我們必須在釋放鎖之前調用你哦個dequeue方法來擷取item,考慮下下面的代碼:

                //現在在這裡可能被搶占,_itemQ可能被修改

在item被Dequeued後,我們就應該立即釋放鎖了,如果我們在執行task的時候,一直持有鎖,則會沒有必要的阻塞其他線程來擷取任務。

Wait Timeouts

在調用Wait方法的時候可以傳遞一個毫秒或Timespan的時間來設定逾時。如果Wait逾時了,那麼Wait方法就會傳回false。

帶有逾時功能的Wait方法的主要步驟:

釋放鎖。

阻塞 直到 pulsed 或者逾時。

重新擷取鎖。

逾時就好像CLR 在逾時到了的時候自動的調用了 pulse方法一樣。

下面是使用逾時的Wait的主要代碼:

         lock(_locker)

         while(&lt;阻塞條件&gt;)

                   Monitor.Wait(_locker,&lt;逾時時間&gt;);

Monitor.Wait 方法傳回一個bool值來代表是調用了pulse還是已經逾時了。

如果是true: 代表調用了pulse。

如果是false:代表逾時了。

這對記錄日志很有用。

本文轉自LoveJenny部落格園部落格,原文連結:http://www.cnblogs.com/LoveJenny/archive/2011/06/01/2060857.html,如需轉載請自行聯系原作者

繼續閱讀