多線程2
- 同步
-
- 作用
- 條件變量及其接口
-
- 初始化
-
- 靜态初始化
- 動态初始化
- 等待接口
- 喚醒接口
- 銷毀接口
- 代碼實作
- 參數為什麼需要互斥鎖
- 在調用該接口時,pthread_cond_wait函數的實作邏輯是什麼
- 如果一個線程在等待的時候被喚醒了,需要做什麼事情、
- 生産者與消費者模型
-
- 123規則
- 優點
- 建立線程安全隊列
同步
作用
讓多個執行流在通路臨界資源的時候是合理通路
條件變量及其接口
本質是:一個PCB等待隊列
條件變量 = PCB等待隊列 + 一堆接口
PCB等待隊列:當線程發現資源不可用的時候,調用變量接口将自己放到PCB等待隊列,等待被喚醒
條件變量的類型:pthread_cond_t
初始化
靜态初始化
動态初始化
cond:待要初始化的“條件變量”的變量
一般情況下,傳遞一個pthread_cond_t類型變量的位址
attr:一般情況下直接給NULL,采用預設屬性
等待接口
cond:條件變量
mutex:互斥鎖
作用:如果一個執行流調用了該接口,就會将執行流對應的PCB放到參數cond的PCB等待隊列當中
喚醒接口
作用:通知(喚醒)PCB等待隊列當中的線程,如果被通知的線程接收到了,則從PCB等待隊列當中出隊操作,正常執行代碼
注意:至少喚醒一個PCB等待隊列當中的線程
作用:同signal一緻
注意:喚醒所有PCB等待隊列當中的線程
銷毀接口
銷毀動态初始化的條件變量
代碼實作
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4
5 int g_bowl = 1;
6
7 pthread_mutex_t g_lock;
8
9 //eat
W> 10 void* MyThreadA(void* arg)
11 {
12 while(1)
13 {
14 pthread_mutex_lock(&g_lock);
W> 15 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
16 g_bowl--;
17 pthread_mutex_unlock(&g_lock);
18 }
19 return NULL;
20 }
21
22 //make
W> 23 void* MyThreadB(void* arg)
24 {
25 while(1)
26 {
27 pthread_mutex_lock(&g_lock);
28 g_bowl++;
W> 29 printf("i make %d, i am %p\n", g_bowl, pthread_self());
30 pthread_mutex_unlock(&g_lock);
31 }
32 return NULL;
33 }
34
35 int main()
36 {
37 pthread_mutex_init(&g_lock, NULL);
38 pthread_t tid_A, tid_B;
39 pthread_create(&tid_A, NULL, MyThreadA, NULL);
40 pthread_create(&tid_B, NULL, MyThreadB, NULL);
41
42 pthread_join(tid_A, NULL);
43 pthread_join(tid_B, NULL);
44
45 pthread_mutex_destroy(&g_lock);
46 return 0;
47 }
這是一個吃面和做面的代碼,一個工作線程A代表吃面的人,一個工作線程B代表做面的人,運作一下

好家夥都吃到負數去了,修改一下
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 1//建立一個宏,後邊修改宏即可
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 pthread_cond_t g_cond;//定義變量
11
12 //eat
W> 13 void* MyThreadA(void* arg)
14 {
15 while(1)
16 {
17 pthread_mutex_lock(&g_lock);
18 //加完鎖後需要判斷是否能夠吃
19 //沒有面則等待,有則吃
20 if(g_bowl < 1)//沒有面的情況
21 {
22 //等待
23 pthread_cond_wait(&g_cond, &g_lock);
24 }
25 //有面吃就列印
W> 26 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
27 g_bowl--;
28 pthread_mutex_unlock(&g_lock);
29 //吃完面後通知(喚醒)做面的人
30 pthread_cond_signal(&g_cond);
31 }
32 return NULL;
33 }
34
35 //make
W> 36 void* MyThreadB(void* arg)
37 {
38 while(1)
39 {
40 pthread_mutex_lock(&g_lock);
41 //加完鎖後需要判斷是否繼續做
42 //有面則等待,沒面做面
43 if(g_bowl >= 1)//有面的情況
44 {
45 //等待
46 pthread_cond_wait(&g_cond, &g_lock);
47 }
48 //沒有面進入下一步做面
49 g_bowl++;
W> 50 printf("i make %d, i am %p\n", g_bowl, pthread_self());
51 pthread_mutex_unlock(&g_lock);
52 //通知吃面
53 pthread_cond_signal(&g_cond);
54 }
55 return NULL;
56 }
57
58 int main()
59 {
60 pthread_mutex_init(&g_lock, NULL);
61
62 pthread_cond_init(&g_cond, NULL);//初始化
63
64 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
65 for(int i = 0; i < THREAD_NUM; i++)
66 {
67 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
68 if(ret < 0)
69 {
70 perror("pthread_create fail\n");
71 exit(0);
72 }
73 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
74 if(ret < 0)
75 {
76 perror("pthread_create fail\n");
77 exit(0);
78 }
79 }
80
81 for(int i = 0; i < THREAD_NUM; i++)
82 {
83 pthread_join(tid_A[i], NULL);
84 pthread_join(tid_B[i], NULL);
85 }
86
87 pthread_mutex_destroy(&g_lock);
88
89 pthread_cond_destroy(&g_cond);//釋放
90
91 return 0;
92 }
現在再看結果就正常了,做一份吃一份
參數為什麼需要互斥鎖
傳遞互斥鎖的原因是由于需要在pthread_cond_wait函數内部進行解鎖,解鎖之後,其他的執行流就能獲得這把互斥鎖
否則:如果在調用phread_cond_wait的線程進行等待時不釋放互斥鎖,那其他線程就無法擷取到互斥鎖,程式就沒有辦法向前繼續運作了
在調用該接口時,pthread_cond_wait函數的實作邏輯是什麼
1、放到PCB等待隊列
2、釋放互斥鎖
3、等待被喚醒
如果一個線程在等待的時候被喚醒了,需要做什麼事情、
1、移出PCB等待隊列
2、搶互斥鎖
搶到了:pthread_cond_wait函數傳回了
沒搶到:pthread_cond_wait沒有傳回,在等待搶鎖
更改一下剛才的程式,現在變成兩個人吃面,兩個人做面
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 2//增加吃面的人到兩個
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 pthread_cond_t g_cond;
11
12 //eat
W> 13 void* MyThreadA(void* arg)
14 {
15 while(1)
16 {
17 pthread_mutex_lock(&g_lock);
18 if(g_bowl < 1)
19 {
20 pthread_cond_wait(&g_cond, &g_lock);
21 }
W> 22 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
23 g_bowl--;
24 pthread_mutex_unlock(&g_lock);
25 pthread_cond_signal(&g_cond);
26 }
27 return NULL;
28 }
29
30 //make
W> 31 void* MyThreadB(void* arg)
32 {
33 while(1)
34 {
35 pthread_mutex_lock(&g_lock);
36 if(g_bowl >= 1)
37 {
38 pthread_cond_wait(&g_cond, &g_lock);
39 }
40 g_bowl++;
W> 41 printf("i make %d, i am %p\n", g_bowl, pthread_self());
42 pthread_mutex_unlock(&g_lock);
43 pthread_cond_signal(&g_cond);
44 }
45 return NULL;
46 }
47
48 int main()
49 {
50 pthread_mutex_init(&g_lock, NULL);
51
52 pthread_cond_init(&g_cond, NULL);
53
54 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
55 for(int i = 0; i < THREAD_NUM; i++)
56 {
57 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
58 if(ret < 0)
59 {
60 perror("pthread_create fail\n");
61 exit(0);
62 }
63 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
64 if(ret < 0)
65 {
66 perror("pthread_create fail\n");
67 exit(0);
68 }
69 }
70
71 for(int i = 0; i < THREAD_NUM; i++)
72 {
73 pthread_join(tid_A[i], NULL);
74 pthread_join(tid_B[i], NULL);
75 }
76
77 pthread_mutex_destroy(&g_lock);
78
79 pthread_cond_destroy(&g_cond);
80
81 return 0;
82 }
修改一下條件
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 2//增加吃面的人到兩個
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 pthread_cond_t g_cond;
11
12 //eat
W> 13 void* MyThreadA(void* arg)
14 {
15 while(1)
16 {
17 pthread_mutex_lock(&g_lock);
18 if(g_bowl == 0)//修改條件
19 {
20 pthread_cond_wait(&g_cond, &g_lock);
21 }
W> 22 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
23 g_bowl--;
24 pthread_mutex_unlock(&g_lock);
25 pthread_cond_signal(&g_cond);
26 }
27 return NULL;
28 }
29
30 //make
W> 31 void* MyThreadB(void* arg)
32 {
33 while(1)
34 {
35 pthread_mutex_lock(&g_lock);
36 if(g_bowl >= 1)//修改
37 {
38 pthread_cond_wait(&g_cond, &g_lock);
39 }
40 g_bowl++;
W> 41 printf("i make %d, i am %p\n", g_bowl, pthread_self());
42 pthread_mutex_unlock(&g_lock);
43 pthread_cond_signal(&g_cond);
44 }
45 return NULL;
46 }
47
48 int main()
49 {
50 pthread_mutex_init(&g_lock, NULL);
51
52 pthread_cond_init(&g_cond, NULL);
53
54 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
55 for(int i = 0; i < THREAD_NUM; i++)
56 {
57 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
58 if(ret < 0)
59 {
60 perror("pthread_create fail\n");
61 exit(0);
62 }
63 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
64 if(ret < 0)
65 {
66 perror("pthread_create fail\n");
67 exit(0);
68 }
69 }
70
71 for(int i = 0; i < THREAD_NUM; i++)
72 {
73 pthread_join(tid_A[i], NULL);
74 pthread_join(tid_B[i], NULL);
75 }
76
77 pthread_mutex_destroy(&g_lock);
78
79 pthread_cond_destroy(&g_cond);
80
81 return 0;
82 }
好像更大了
①假設有面,首先做面人1号拿到了互斥鎖,檢視是否有面,有面進行解鎖,将自己放到PCB等待隊列當中等待
②此時搶鎖的是兩個吃面的人以及做面人2号,三人都有可能拿到鎖,假設吃面人1号拿到了鎖,吃面人1号判斷碗裡有面,于是吃面,釋放互斥鎖,通知等待隊列
③此時鎖被釋放,吃面2号和做面2号本身就處于搶鎖邏輯,吃面1号釋放完後也開始搶鎖,做面1号被喚醒也開始搶鎖,四人搶鎖
④假設做面2号拿到了鎖,碗裡沒有面,做面,解鎖,通知
⑤做面2号解鎖後又傳回搶鎖狀态,依舊是四人搶鎖,吃面1号和吃面2号拿到鎖是正常邏輯,但做面1号拿到鎖後又開始做面,做面1号通過了if語句,繼續++,
面從1碗變成了2碗
此時需要重複判斷,判斷自己拿到鎖之前是否有同類型的資源拿到了鎖,并判斷是否有“面”的存在,如果有,則跳過
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 2//增加吃面的人到兩個
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 pthread_cond_t g_cond;
11
12 //eat
W> 13 void* MyThreadA(void* arg)
14 {
15 while(1)
16 {
17 pthread_mutex_lock(&g_lock);
18 while(g_bowl == 0)//将if改成while
19 {
20 pthread_cond_wait(&g_cond, &g_lock);
21 }
W> 22 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
23 g_bowl--;
24 pthread_mutex_unlock(&g_lock);
25 pthread_cond_signal(&g_cond);
26 }
27 return NULL;
28 }
29
30 //make
W> 31 void* MyThreadB(void* arg)
32 {
33 while(1)
34 {
35 pthread_mutex_lock(&g_lock);
36 while(g_bowl >= 1)//将if改成while
37 {
38 pthread_cond_wait(&g_cond, &g_lock);
39 }
40 g_bowl++;
W> 41 printf("i make %d, i am %p\n", g_bowl, pthread_self());
42 pthread_mutex_unlock(&g_lock);
43 pthread_cond_signal(&g_cond);
44 }
45 return NULL;
46 }
47
48 int main()
49 {
50 pthread_mutex_init(&g_lock, NULL);
51
52 pthread_cond_init(&g_cond, NULL);
53
54 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
55 for(int i = 0; i < THREAD_NUM; i++)
56 {
57 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
58 if(ret < 0)
59 {
60 perror("pthread_create fail\n");
61 exit(0);
62 }
63 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
64 if(ret < 0)
65 {
66 perror("pthread_create fail\n");
67 exit(0);
68 }
69 }
70
71 for(int i = 0; i < THREAD_NUM; i++)
72 {
73 pthread_join(tid_A[i], NULL);
74 pthread_join(tid_B[i], NULL);
75 }
76
77 pthread_mutex_destroy(&g_lock);
78
79 pthread_cond_destroy(&g_cond);
80
81 return 0;
82 }
看到輸出結果是正常的,但是整個卡住了
MyThreadA代表的是吃面的人,MyThreadB代表的是做面的人,但他們現在都處于pthread_cond_wait也就是等待隊列當中
是以程式無法執行
①假設碗裡有1份面,此時做面1号拿到了鎖,判斷有面,解鎖,通知吃面,進入等待隊列,三人搶鎖
②假設做面2号拿到鎖,判斷有面,解鎖,進入等待隊列,剩下兩個吃面的搶鎖
③假設吃面2号拿到鎖,判斷有面,吃面,解鎖,通知做面,進入等待隊列,假設喚醒的是做面1号,吃面1号和做面1号搶鎖
④假設吃面1号搶到了鎖,判斷沒面,解鎖,進入等待隊列,此時等待隊列當中有兩個吃面的以及做面2号
⑤假設做面1号拿到了鎖,沒面,做面,解鎖,通知吃面,進入等待隊列,假設通知到了吃面2号
⑥吃面2号出列,拿到了鎖,有面,吃面,通知做面,進入等待隊列,假設通知到了吃面1号
⑦吃面1号出列,拿到鎖,沒有面,進入了等待隊列
⑧此時四個程序全在等待隊列當中,程式無法繼續進行
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 2//增加吃面的人到兩個
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 //增加條件變量
11 pthread_cond_t g_cond_eat;
12 pthread_cond_t g_cond_make;
13
14 //eat
W> 15 void* MyThreadA(void* arg)
16 {
17 while(1)
18 {
19 pthread_mutex_lock(&g_lock);
20 while(g_bowl == 0)
21 {
22 pthread_cond_wait(&g_cond_eat, &g_lock);
23 //吃面的人發現沒有面了,将自己放在吃的等待隊列當中
24 }
W> 25 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
26 g_bowl--;
27 pthread_mutex_unlock(&g_lock);
28 //發現沒有面了需要通知做面的人
29 pthread_cond_signal(&g_cond_make);
30 }
31 return NULL;
32 }
33
34 //make
W> 35 void* MyThreadB(void* arg)
36 {
37 while(1)
38 {
39 pthread_mutex_lock(&g_lock);
40 while(g_bowl >= 1)//将if改成while
41 {
42 //做面的人發現有面了将自己放在做面的等待隊列當中
43 pthread_cond_wait(&g_cond_make, &g_lock);
44 }
45 g_bowl++;
W> 46 printf("i make %d, i am %p\n", g_bowl, pthread_self());
47 pthread_mutex_unlock(&g_lock);
48 pthread_cond_signal(&g_cond_eat);
49 //通知吃面的人吃面
50 }
51 return NULL;
52 }
53
54 int main()
55 {
56 pthread_mutex_init(&g_lock, NULL);
57
58 pthread_cond_init(&g_cond_eat, NULL);//初始化吃面
59 pthread_cond_init(&g_cond_make, NULL);//初始化做面
60
61 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
62 for(int i = 0; i < THREAD_NUM; i++)
63 {
64 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
65 if(ret < 0)
66 {
67 perror("pthread_create fail\n");
68 exit(0);
69 }
70 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
71 if(ret < 0)
72 {
73 perror("pthread_create fail\n");
74 exit(0);
75 }
76 }
77
78 for(int i = 0; i < THREAD_NUM; i++)
79 {
80 pthread_join(tid_A[i], NULL);
81 pthread_join(tid_B[i], NULL);
82 }
83
84 pthread_mutex_destroy(&g_lock);
85
86 pthread_cond_destroy(&g_cond_eat);//吃面釋放
87 pthread_cond_destroy(&g_cond_make);//做面釋放
88
89 return 0;
90 }
這下就沒什麼問題了
生産者與消費者模型
123規則
1、一個線程安全隊列
隊列:先進先出
線程安全:目前這個隊列在被其他線程操作的時候,出隊和入隊保證是原子性的
同一時刻隻能有同一個人進行入隊,同一時刻也隻能有一個人出隊,入隊和出隊這兩個操作是互斥的,即同一時刻隻能有一個人來操作隊列
2、兩種角色的線程
消費者線程:從線程安全隊列當中擷取元素,進行處理
生産者線程:生産元素放到線程安全隊列當中進行處理
3、三種關系
消費者與消費者互斥
生産者與生産者互斥
消費者與生産者互斥+同步
優點
1、支援忙閑不均,可以提高程式運作效率
2、隊列提供了一個緩沖區的作用,可以緩沖待要處理的元素
建立線程安全隊列
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4
5 #include <queue> 隊列的頭檔案
6
7 #define THREAD_NUM 1
8
9 using namespace std;
10
11 class RingQueue 用類封裝建立安全隊列
12 {
13 public:
14 RingQueue() 構造
15 {
16 capacity_ = 10; 容量定為10
17
18 pthread_mutex_init(&lock_, NULL); 初始化
19 pthread_cond_init(&cons_cond_, NULL);
20 pthread_cond_init(&prod_cond_, NULL);
21 }
22
23 ~RingQueue() 析構
24 {
25 pthread_mutex_destroy(&lock_); 銷毀
26 pthread_cond_destroy(&cons_cond_);
27 pthread_cond_destroy(&prod_cond_);
28 }
29
30 void Push(int data) 往隊列中放(生産者生産)
31 {
32 pthread_mutex_lock(&lock_); 加鎖
33 while(que_.size() >= capacity_)
34 {
35 如果隊列滿了,此時不應該插入,放到生産者的等待隊列當中去
36 pthread_cond_wait(&prod_cond_, &lock_);
37 }
38 que_.push(data); 未滿則入隊
39 pthread_mutex_unlock(&lock_); 解鎖
40 pthread_cond_signal(&cons_cond_); 通知消費者
41 }
42
43 void Pop(int* data) 出隊(消費者進行消費)
44 {
45 pthread_mutex_lock(&lock_); 加鎖
46 while(que_.empty())
47 {
48 如果目前隊列為空,則放到消費者的等待隊列當中
49 pthread_cond_wait(&cons_cond_, &lock_);
50 }
51 *data = que_.front(); 如果有,先擷取
52 que_.pop(); 再出隊
53 pthread_mutex_unlock(&lock_); 解鎖
54 pthread_cond_signal(&prod_cond_); 通知生産者生産
55 }
56
57 private: 建立成員變量
58 queue<int> que_; 假設元素類型都是整形
59 size_t capacity_; 定義一個無符号變量作為容量
60
61 pthread_mutex_t lock_; 互斥鎖
62 pthread_cond_t cons_cond_; 消費者的成員變量
63 pthread_cond_t prod_cond_; 生産者的成員變量
64 };
65
66 void* ConsumeStart(void* arg)
67 {
68 RingQueue* rq = (RingQueue*)arg;
69 while(1)
70 {
71 int data;
72 rq->Pop(&data);
W> 73 printf("i consume %d, i am %p\n", data, pthread_self());
74 }
75 return NULL;
76 }
77
78 void* ProductStart(void* arg)
79 {
80 RingQueue* rq = (RingQueue*)arg;
81 int data = 1;
82 while(1)
83 {
W> 84 printf("i product %d, i am %p\n", data, pthread_self());
85 rq->Push(data++);
86 }
87 return NULL;
88 }
89
90 int main() 建立線程
91 {
92 RingQueue* rq = new RingQueue();
93 if(rq == NULL)
94 {
95 return 0;
96 }
97
98 分别建立消費者和生産者線程
99 pthread_t cond[THREAD_NUM], prod[THREAD_NUM];
100 for(int i = 0; i < THREAD_NUM; i++)
101 {
102 int ret = pthread_create(&cond[i], NULL, ConsumeStart, (void*)rq);
103 if(ret < 0)
104 {
105 perror("pthread_create fail\n");
106 return 0;
107 }
108 ret = pthread_create(&prod[i], NULL, ProductStart, (void*)rq);
109 if(ret < 0)
110 {
111 perror("pthread_create fail\n");
112 return 0;
113 }
114 }
115
116 for(int i = 0; i < THREAD_NUM; i++) 等待
117 {
118 pthread_join(cond[i], NULL);
119 pthread_join(prod[i], NULL);
120 }
121
122 delete rq; 釋放
123
124 return 0;
125 }