天天看點

從零開始建構一個Reactor模式的網絡庫(一) 線程同步Mutex和Condition

最近在學習陳碩大神的muduo庫,感覺寫的很專業,以及有一些比較“進階”的技巧和設計方式,自己寫會比較困難。

于是打算自己寫一個簡化版本的Reactor模式網絡庫,就取名叫mini吧,同樣隻基于Linux平台,不使用boost庫,去掉一些比較複雜的部分,隻實作比較基本的功能。

寫作的過程中,參考了https://github.com/chenshuo/muduo(原始版本的實作),以及https://github.com/AlexStocks/muduo(去掉boost庫的依賴,改用C++11)     

就先從用于線程同步的互斥鎖和條件變量的封裝開始吧,基礎部分還會包括一個很簡單的日志類、線程封裝和簡單的線程池。

Linux環境下線程同步的方式有很多,互斥鎖、讀寫鎖、自旋鎖、條件變量、屏障等都可以作為同步的方式,muduo庫使用的是互斥鎖+條件變量的方式,原因也很簡單,就是簡單易用,同時也不失高效性。

為了通用性,使用的都是POSIX的同步原語以及線程實作。

首先是對互斥量的封裝:

1 #ifndef MUTEX_H
 2 #define MUTEX_H
 3 #include <pthread.h>
 4 namespace mini
 5 {
 6 //used as class member
 7 class MutexLock
 8 {
 9 public:
10     MutexLock()
11     {
12         pthread_mutex_init(&mutex_,NULL);
13     }
14     ~MutexLock()
15     {
16         pthread_mutex_destroy(&mutex_);
17     }
18     void lock()
19     {
20         pthread_mutex_lock(&mutex_);
21     }
22     void unlock()
23     {
24         pthread_mutex_unlock(&mutex_);
25     }
26 
27     pthread_mutex_t* getPthreadMutex()
28     {
29         return &mutex_;
30     }
31 
32 private:
33     pthread_mutex_t mutex_;
34 };
35 //used as RAII obj
36 class MutexLockGuard
37 {
38 public:
39     MutexLockGuard(MutexLock& mutex)
40         :mutex_(mutex)
41     {
42         mutex_.lock();
43     }
44     ~MutexLockGuard()
45     {
46         mutex_.unlock();
47     }
48 
49 private:
50     MutexLock& mutex_;
51 };
52 }
53 
54 #endif // MUTEX_H      

MutexLock是對pthread_mutex的簡單封裝,包括初始化、加鎖、解鎖以及銷毀,主要用作類的成員變量(比如Condition類、ThreadPool類等)。

MutexLockGuard是一個RAII類,構造時自動加鎖,析構時自動解鎖,一般用在整個過程都需要加鎖的塊内(比如一個作用于臨界區的函數),可以避免忘記解鎖引起的死鎖。

然後是對于條件變量的封裝:

1 #ifndef CONDITION_H
 2 #define CONDITION_H
 3 #include "Mutex.h"
 4 #include <pthread.h>
 5 namespace mini
 6 {
 7 class Condition{
 8 public:
 9     Condition(MutexLock& mutex)
10         :mutex_(mutex)
11     {
12         pthread_cond_init(&cond_,NULL);
13     }
14 
15     ~Condition()
16     {
17         pthread_cond_destroy(&cond_);
18     }
19 
20     void wait()
21     {
22         pthread_cond_wait(&cond_,mutex_.getPthreadMutex());
23     }
24 
25     void notify()
26     {
27         pthread_cond_signal(&cond_);
28     }
29 
30     void notifyAll()
31     {
32         pthread_cond_broadcast(&cond_);
33     }
34 
35 private:
36     MutexLock& mutex_;//reference, not hold
37     pthread_cond_t cond_;
38 };
39 }
40 
41 #endif // CONDITION_H      

Condition類是對pthread_cond的封裝,因為條件變量本來就要與mutex配合使用,故而持有一個MutexLock的引用,主要操作是lock()、notify()和notifyAll()。

下面通過一個簡單的例子來看看這兩個類的使用:

1 #include <iostream>
 2 #include "Condition.h"
 3 #include <unistd.h>
 4 
 5 using namespace std;
 6 using namespace mini;
 7 
 8 mini::MutexLock mutex;
 9 mini::Condition cond(mutex);
10 
11 int count=0;
12 
13 
14 void* threadFuncAdd(void*)
15 {
16     sleep(1);
17     cout<<"ThreadAdd run!"<<endl;
18     while(count<=1000)
19         count++;
20     cout<<"ThreadAdd finish!"<<endl;
21     cond.notify();
22 }
23 void* threadFuncPrint(void*)
24 {
25     mutex.lock();
26     while(count<=1000)
27     {
28         cout<<"ThreadPrint wait!"<<endl;
29         cond.wait();
30     }
31 
32     cout<<"ThreadPrint wake up!"<<endl;
33 }
34 
35 
36 int main()
37 {
38     //count=0;
39     pthread_t p1,p2;
40     pthread_create(&p1,NULL,threadFuncAdd,NULL);
41     pthread_create(&p2,NULL,threadFuncPrint,NULL);
42 
43     sleep(2);
44 
45     return 0;
46 }      

簡單來說,主線程建立了兩個線程,一個用來對count進行自加,一個用來等待count值到達100并輸出一句話。為了確定print線程會先等待條件,讓add線程睡眠了1s。

從零開始建構一個Reactor模式的網絡庫(一) 線程同步Mutex和Condition

結果與預期一緻,threadprint先進入等待狀态,然後threadadd開始執行,并在count自加到1000後,notify() threadprint,threadprint被喚醒。

繼續閱讀