天天看點

Boost Thread學習筆記三

下面先對condition_impl進行簡要分析。

condition_impl在其構造函數中會建立兩個Semaphore(信号量):m_gate、m_queue,及一個Mutex(互斥體,跟boost::mutex類似,但boost::mutex是基于CriticalSection<臨界區>的):m_mutex,其中:

m_queue

相當于目前所有等待線程的等待隊列,構造函數中調用CreateSemaphore來建立Semaphore時,lMaximumCount參數被指定為(std::numeric_limits<long>::max)(),即便如此,condition的實作者為了防止出現大量等待線程的情況(以至于超過了long的最大值),線上程因執行condition::wait進入等待狀态時會先:

WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE);

以等待被喚醒,但很難想象什麼樣的應用需要處理這麼多線程。

m_mutex

用于内部同步的控制。

但對于m_gate我很奇怪,我仔細研究了一下condition_imp的實作,還是不明白作者引入m_gate這個變量的用意何在,既然已經有了用于同步控制的m_mutex,再引入一個m_gate實在讓我有點不解。

以下是condition::wait調用的do_wait方法簡化後的代碼:

1 template <typename M>

2 void do_wait(M& mutex)

3 {

4     m_impl.enter_wait();

5     lock_ops::unlock(mutex, state);    //對傳入的scoped_lock對象解鎖,以便别的線程可以對其進行加鎖,并執行某些處理,否則,本線程等待的condition永遠不會發生(因為沒有線程可以獲得通路資源的權利以使condition發生)

6     m_impl.do_wait();    //執行等待操作,等待其它線程執行notify_one或notify_all操作以獲得

7     lock_ops::lock(mutex, state);    //重新對scoped_lock對象加鎖,獲得獨占通路資源的權利

8 }

condition::timed_wait的實作方法與此類似,而notify_one、notify_all僅将調用請求轉發給m_impl,就不多講了。

雖然condition的内部實作比較複雜,但使用起來還是比較友善的。下面是一個使用condition的多Producer-多Consumer同步的例子:

  1 #include <boost/thread/thread.hpp>

  2 #include <boost/thread/mutex.hpp>

  3 #include <boost/thread/condition.hpp>

  4 #include <boost/thread/xtime.hpp>

  5 

  6 #include <iostream>

  7 #include <time.h> // for time()

  8 

  9 #include <Windows.h>    // for Sleep, change it for other platform, we can use

 10                         // boost::thread::sleep, but it's too inconvenient.

 11 

 12 typedef boost::mutex::scoped_lock scoped_lock;

 13 boost::mutex io_mutex;

 14 

 15 class Product

 16 {

 17     int num;

 18 public:

 19     Product(int num) : num(num) {}

 20 

 21     friend std::ostream& operator<< (std::ostream& os, Product& product)

 22     {

 23         return os << product.num;

 24     }

 25 };

 26 

 27 class Mediator

 28 {

 29 private:

 30     boost::condition cond;

 31     boost::mutex mutex;

 32 

 33     Product** pSlot;    // product buffer/slot

 34     unsigned int slotCount,    // buffer size

 35         productCount; // current product count

 36     bool stopFlag;    // should all thread stop or not

 37 

 38 public:

 39     Mediator(const int slotCount) : slotCount(slotCount), stopFlag(false), productCount(0)

 40     {

 41         pSlot = new Product*[slotCount];

 42     }

 43 

 44     virtual ~Mediator()

 45     {

 46         for (int i = 0; i < static_cast<int>(productCount); i++)

 47         {

 48             delete pSlot[i];

 49         }

 50         delete [] pSlot;

 51     }

 52 

 53     bool Stop() const { return stopFlag; }

 54     void Stop(bool) { stopFlag = true; }

 55 

 56     void NotifyAll()    // notify all blocked thread to exit

 57     {

 58         cond.notify_all();

 59     }

 60 

 61     bool Put( Product* pProduct)

 62     {

 63         scoped_lock lock(mutex);

 64         if (productCount == slotCount)

 65         {

 66             {

 67                 scoped_lock lock(io_mutex);

 68                 std::cout << "Buffer is full. Waiting

Boost Thread學習筆記三

" << std::endl;

 69             }

 70             while (!stopFlag && (productCount == slotCount))

 71                 cond.wait(lock);

 72         }

 73         if (stopFlag) // it may be notified by main thread to quit.

 74             return false;

 75 

 76         pSlot[ productCount++ ] = pProduct;

 77         cond.notify_one();    // this call may cause *pProduct to be changed if it wakes up a consumer

 78 

 79         return true;

 80     }

 81 

 82     bool Get(Product** ppProduct)

 83     {

 84         scoped_lock lock(mutex);

 85         if (productCount == 0)

 86         {

 87             {

 88                 scoped_lock lock(io_mutex);

 89                 std::cout << "Buffer is empty. Waiting

Boost Thread學習筆記三

 90             }

 91             while (!stopFlag && (productCount == 0))

 92                 cond.wait(lock);

 93         }

 94         if (stopFlag) // it may be notified by main thread to quit.

 95         {

 96             *ppProduct = NULL;

 97             return false;

 98         }

 99 

100         *ppProduct = pSlot[--productCount];

101         cond.notify_one();

102 

103         return true;

104     }

105 };

106 

107 class Producer

108 {

109 private:

110     Mediator* pMediator;

111     static unsigned int num;

112     unsigned int id;    // Producer id

113 

114 public:

115     Producer(Mediator* pMediator) : pMediator(pMediator) { id = num++; }

116 

117     void operator() ()

118     {

119         Product* pProduct;

120         srand( (unsigned)time( NULL ) + id );    // each thread need to srand differently

121         while (!pMediator->Stop())

122         {

123             pProduct = new Product( rand() % 100 );

124             // must print product info before call Put, as Put may wake up a consumer

125             // and cause *pProuct to be changed

126             {

127                 scoped_lock lock(io_mutex);

128                 std::cout << "Producer[" << id << "] produces Product["

129                     << *pProduct << "]" << std::endl;

130             }

131             if (!pMediator->Put(pProduct))    // this function only fails when it is notified by main thread to exit

132                 delete pProduct;

133 

134             Sleep(100);

135         }

136     }

137 };

138 

139 unsigned int Producer::num = 1;

140 

141 class Consumer

142 {

143 private:

144     Mediator* pMediator;

145     static unsigned int num;

146     unsigned int id;    // Consumer id

147 

148 public:

149     Consumer(Mediator* pMediator) : pMediator(pMediator) { id = num++; }

150 

151     void operator() ()

152     {

153         Product* pProduct = NULL;

154         while (!pMediator->Stop())

155         {

156             if (pMediator->Get(&pProduct))

157             {

158                 scoped_lock lock(io_mutex);

159                 std::cout << "Consumer[" << id << "] is consuming Product["

160                     << *pProduct << "]" << std::endl;

161                 delete pProduct;

162             }

163 

164             Sleep(100);

165         }

166     }

167 };

168 

169 unsigned int Consumer::num = 1;

170 

171 int main()

172 {

173     Mediator mediator(2);    // we have only 2 slot to put products

174 

175     // we have 2 producers

176     Producer producer1(&mediator);

177     boost::thread thrd1(producer1);

178     Producer producer2(&mediator);

179     boost::thread thrd2(producer2);

180     // and we have 3 consumers

181     Consumer consumer1(&mediator);

182     boost::thread thrd3(consumer1);

183     Consumer consumer2(&mediator);

184     boost::thread thrd4(consumer2);

185     Consumer consumer3(&mediator);

186     boost::thread thrd5(consumer3);

187 

188     // wait 1 second

189     Sleep(1000);

190     // and then try to stop all threads

191     mediator.Stop(true);

192     mediator.NotifyAll();

193 

194     // wait for all threads to exit

195     thrd1.join();

196     thrd2.join();

197     thrd3.join();

198     thrd4.join();

199     thrd5.join();

200 

201     return 0;

202 }

繼續閱讀