多线程2
- 同步
-
- 作用
- 条件变量及其接口
-
- 初始化
-
- 静态初始化
- 动态初始化
- 等待接口
- 唤醒接口
- 销毁接口
- 代码实现
- 参数为什么需要互斥锁
- 在调用该接口时,pthread_cond_wait函数的实现逻辑是什么
- 如果一个线程在等待的时候被唤醒了,需要做什么事情、
- 生产者与消费者模型
-
- 123规则
- 优点
- 创建线程安全队列
同步
作用
让多个执行流在访问临界资源的时候是合理访问
条件变量及其接口
本质是:一个PCB等待队列
条件变量 = PCB等待队列 + 一堆接口
PCB等待队列:当线程发现资源不可用的时候,调用变量接口将自己放到PCB等待队列,等待被唤醒
条件变量的类型:pthread_cond_t
初始化
静态初始化
动态初始化
cond:待要初始化的“条件变量”的变量
一般情况下,传递一个pthread_cond_t类型变量的地址
attr:一般情况下直接给NULL,采用默认属性
等待接口
cond:条件变量
mutex:互斥锁
作用:如果一个执行流调用了该接口,就会将执行流对应的PCB放到参数cond的PCB等待队列当中
唤醒接口
作用:通知(唤醒)PCB等待队列当中的线程,如果被通知的线程接收到了,则从PCB等待队列当中出队操作,正常执行代码
注意:至少唤醒一个PCB等待队列当中的线程
作用:同signal一致
注意:唤醒所有PCB等待队列当中的线程
销毁接口
销毁动态初始化的条件变量
代码实现
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4
5 int g_bowl = 1;
6
7 pthread_mutex_t g_lock;
8
9 //eat
W> 10 void* MyThreadA(void* arg)
11 {
12 while(1)
13 {
14 pthread_mutex_lock(&g_lock);
W> 15 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
16 g_bowl--;
17 pthread_mutex_unlock(&g_lock);
18 }
19 return NULL;
20 }
21
22 //make
W> 23 void* MyThreadB(void* arg)
24 {
25 while(1)
26 {
27 pthread_mutex_lock(&g_lock);
28 g_bowl++;
W> 29 printf("i make %d, i am %p\n", g_bowl, pthread_self());
30 pthread_mutex_unlock(&g_lock);
31 }
32 return NULL;
33 }
34
35 int main()
36 {
37 pthread_mutex_init(&g_lock, NULL);
38 pthread_t tid_A, tid_B;
39 pthread_create(&tid_A, NULL, MyThreadA, NULL);
40 pthread_create(&tid_B, NULL, MyThreadB, NULL);
41
42 pthread_join(tid_A, NULL);
43 pthread_join(tid_B, NULL);
44
45 pthread_mutex_destroy(&g_lock);
46 return 0;
47 }
这是一个吃面和做面的代码,一个工作线程A代表吃面的人,一个工作线程B代表做面的人,运行一下

好家伙都吃到负数去了,修改一下
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 1//创建一个宏,后边修改宏即可
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 pthread_cond_t g_cond;//定义变量
11
12 //eat
W> 13 void* MyThreadA(void* arg)
14 {
15 while(1)
16 {
17 pthread_mutex_lock(&g_lock);
18 //加完锁后需要判断是否能够吃
19 //没有面则等待,有则吃
20 if(g_bowl < 1)//没有面的情况
21 {
22 //等待
23 pthread_cond_wait(&g_cond, &g_lock);
24 }
25 //有面吃就打印
W> 26 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
27 g_bowl--;
28 pthread_mutex_unlock(&g_lock);
29 //吃完面后通知(唤醒)做面的人
30 pthread_cond_signal(&g_cond);
31 }
32 return NULL;
33 }
34
35 //make
W> 36 void* MyThreadB(void* arg)
37 {
38 while(1)
39 {
40 pthread_mutex_lock(&g_lock);
41 //加完锁后需要判断是否继续做
42 //有面则等待,没面做面
43 if(g_bowl >= 1)//有面的情况
44 {
45 //等待
46 pthread_cond_wait(&g_cond, &g_lock);
47 }
48 //没有面进入下一步做面
49 g_bowl++;
W> 50 printf("i make %d, i am %p\n", g_bowl, pthread_self());
51 pthread_mutex_unlock(&g_lock);
52 //通知吃面
53 pthread_cond_signal(&g_cond);
54 }
55 return NULL;
56 }
57
58 int main()
59 {
60 pthread_mutex_init(&g_lock, NULL);
61
62 pthread_cond_init(&g_cond, NULL);//初始化
63
64 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
65 for(int i = 0; i < THREAD_NUM; i++)
66 {
67 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
68 if(ret < 0)
69 {
70 perror("pthread_create fail\n");
71 exit(0);
72 }
73 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
74 if(ret < 0)
75 {
76 perror("pthread_create fail\n");
77 exit(0);
78 }
79 }
80
81 for(int i = 0; i < THREAD_NUM; i++)
82 {
83 pthread_join(tid_A[i], NULL);
84 pthread_join(tid_B[i], NULL);
85 }
86
87 pthread_mutex_destroy(&g_lock);
88
89 pthread_cond_destroy(&g_cond);//释放
90
91 return 0;
92 }
现在再看结果就正常了,做一份吃一份
参数为什么需要互斥锁
传递互斥锁的原因是由于需要在pthread_cond_wait函数内部进行解锁,解锁之后,其他的执行流就能获得这把互斥锁
否则:如果在调用phread_cond_wait的线程进行等待时不释放互斥锁,那其他线程就无法获取到互斥锁,程序就没有办法向前继续运行了
在调用该接口时,pthread_cond_wait函数的实现逻辑是什么
1、放到PCB等待队列
2、释放互斥锁
3、等待被唤醒
如果一个线程在等待的时候被唤醒了,需要做什么事情、
1、移出PCB等待队列
2、抢互斥锁
抢到了:pthread_cond_wait函数返回了
没抢到:pthread_cond_wait没有返回,在等待抢锁
更改一下刚才的程序,现在变成两个人吃面,两个人做面
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 2//增加吃面的人到两个
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 pthread_cond_t g_cond;
11
12 //eat
W> 13 void* MyThreadA(void* arg)
14 {
15 while(1)
16 {
17 pthread_mutex_lock(&g_lock);
18 if(g_bowl < 1)
19 {
20 pthread_cond_wait(&g_cond, &g_lock);
21 }
W> 22 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
23 g_bowl--;
24 pthread_mutex_unlock(&g_lock);
25 pthread_cond_signal(&g_cond);
26 }
27 return NULL;
28 }
29
30 //make
W> 31 void* MyThreadB(void* arg)
32 {
33 while(1)
34 {
35 pthread_mutex_lock(&g_lock);
36 if(g_bowl >= 1)
37 {
38 pthread_cond_wait(&g_cond, &g_lock);
39 }
40 g_bowl++;
W> 41 printf("i make %d, i am %p\n", g_bowl, pthread_self());
42 pthread_mutex_unlock(&g_lock);
43 pthread_cond_signal(&g_cond);
44 }
45 return NULL;
46 }
47
48 int main()
49 {
50 pthread_mutex_init(&g_lock, NULL);
51
52 pthread_cond_init(&g_cond, NULL);
53
54 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
55 for(int i = 0; i < THREAD_NUM; i++)
56 {
57 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
58 if(ret < 0)
59 {
60 perror("pthread_create fail\n");
61 exit(0);
62 }
63 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
64 if(ret < 0)
65 {
66 perror("pthread_create fail\n");
67 exit(0);
68 }
69 }
70
71 for(int i = 0; i < THREAD_NUM; i++)
72 {
73 pthread_join(tid_A[i], NULL);
74 pthread_join(tid_B[i], NULL);
75 }
76
77 pthread_mutex_destroy(&g_lock);
78
79 pthread_cond_destroy(&g_cond);
80
81 return 0;
82 }
修改一下条件
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 2//增加吃面的人到两个
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 pthread_cond_t g_cond;
11
12 //eat
W> 13 void* MyThreadA(void* arg)
14 {
15 while(1)
16 {
17 pthread_mutex_lock(&g_lock);
18 if(g_bowl == 0)//修改条件
19 {
20 pthread_cond_wait(&g_cond, &g_lock);
21 }
W> 22 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
23 g_bowl--;
24 pthread_mutex_unlock(&g_lock);
25 pthread_cond_signal(&g_cond);
26 }
27 return NULL;
28 }
29
30 //make
W> 31 void* MyThreadB(void* arg)
32 {
33 while(1)
34 {
35 pthread_mutex_lock(&g_lock);
36 if(g_bowl >= 1)//修改
37 {
38 pthread_cond_wait(&g_cond, &g_lock);
39 }
40 g_bowl++;
W> 41 printf("i make %d, i am %p\n", g_bowl, pthread_self());
42 pthread_mutex_unlock(&g_lock);
43 pthread_cond_signal(&g_cond);
44 }
45 return NULL;
46 }
47
48 int main()
49 {
50 pthread_mutex_init(&g_lock, NULL);
51
52 pthread_cond_init(&g_cond, NULL);
53
54 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
55 for(int i = 0; i < THREAD_NUM; i++)
56 {
57 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
58 if(ret < 0)
59 {
60 perror("pthread_create fail\n");
61 exit(0);
62 }
63 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
64 if(ret < 0)
65 {
66 perror("pthread_create fail\n");
67 exit(0);
68 }
69 }
70
71 for(int i = 0; i < THREAD_NUM; i++)
72 {
73 pthread_join(tid_A[i], NULL);
74 pthread_join(tid_B[i], NULL);
75 }
76
77 pthread_mutex_destroy(&g_lock);
78
79 pthread_cond_destroy(&g_cond);
80
81 return 0;
82 }
好像更大了
①假设有面,首先做面人1号拿到了互斥锁,查看是否有面,有面进行解锁,将自己放到PCB等待队列当中等待
②此时抢锁的是两个吃面的人以及做面人2号,三人都有可能拿到锁,假设吃面人1号拿到了锁,吃面人1号判断碗里有面,于是吃面,释放互斥锁,通知等待队列
③此时锁被释放,吃面2号和做面2号本身就处于抢锁逻辑,吃面1号释放完后也开始抢锁,做面1号被唤醒也开始抢锁,四人抢锁
④假设做面2号拿到了锁,碗里没有面,做面,解锁,通知
⑤做面2号解锁后又返回抢锁状态,依旧是四人抢锁,吃面1号和吃面2号拿到锁是正常逻辑,但做面1号拿到锁后又开始做面,做面1号通过了if语句,继续++,
面从1碗变成了2碗
此时需要重复判断,判断自己拿到锁之前是否有同类型的资源拿到了锁,并判断是否有“面”的存在,如果有,则跳过
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 2//增加吃面的人到两个
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 pthread_cond_t g_cond;
11
12 //eat
W> 13 void* MyThreadA(void* arg)
14 {
15 while(1)
16 {
17 pthread_mutex_lock(&g_lock);
18 while(g_bowl == 0)//将if改成while
19 {
20 pthread_cond_wait(&g_cond, &g_lock);
21 }
W> 22 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
23 g_bowl--;
24 pthread_mutex_unlock(&g_lock);
25 pthread_cond_signal(&g_cond);
26 }
27 return NULL;
28 }
29
30 //make
W> 31 void* MyThreadB(void* arg)
32 {
33 while(1)
34 {
35 pthread_mutex_lock(&g_lock);
36 while(g_bowl >= 1)//将if改成while
37 {
38 pthread_cond_wait(&g_cond, &g_lock);
39 }
40 g_bowl++;
W> 41 printf("i make %d, i am %p\n", g_bowl, pthread_self());
42 pthread_mutex_unlock(&g_lock);
43 pthread_cond_signal(&g_cond);
44 }
45 return NULL;
46 }
47
48 int main()
49 {
50 pthread_mutex_init(&g_lock, NULL);
51
52 pthread_cond_init(&g_cond, NULL);
53
54 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
55 for(int i = 0; i < THREAD_NUM; i++)
56 {
57 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
58 if(ret < 0)
59 {
60 perror("pthread_create fail\n");
61 exit(0);
62 }
63 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
64 if(ret < 0)
65 {
66 perror("pthread_create fail\n");
67 exit(0);
68 }
69 }
70
71 for(int i = 0; i < THREAD_NUM; i++)
72 {
73 pthread_join(tid_A[i], NULL);
74 pthread_join(tid_B[i], NULL);
75 }
76
77 pthread_mutex_destroy(&g_lock);
78
79 pthread_cond_destroy(&g_cond);
80
81 return 0;
82 }
看到输出结果是正常的,但是整个卡住了
MyThreadA代表的是吃面的人,MyThreadB代表的是做面的人,但他们现在都处于pthread_cond_wait也就是等待队列当中
所以程序无法执行
①假设碗里有1份面,此时做面1号拿到了锁,判断有面,解锁,通知吃面,进入等待队列,三人抢锁
②假设做面2号拿到锁,判断有面,解锁,进入等待队列,剩下两个吃面的抢锁
③假设吃面2号拿到锁,判断有面,吃面,解锁,通知做面,进入等待队列,假设唤醒的是做面1号,吃面1号和做面1号抢锁
④假设吃面1号抢到了锁,判断没面,解锁,进入等待队列,此时等待队列当中有两个吃面的以及做面2号
⑤假设做面1号拿到了锁,没面,做面,解锁,通知吃面,进入等待队列,假设通知到了吃面2号
⑥吃面2号出列,拿到了锁,有面,吃面,通知做面,进入等待队列,假设通知到了吃面1号
⑦吃面1号出列,拿到锁,没有面,进入了等待队列
⑧此时四个进程全在等待队列当中,程序无法继续进行
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4 #include <stdlib.h>
5
6 #define THREAD_NUM 2//增加吃面的人到两个
7
8 int g_bowl = 1;
9 pthread_mutex_t g_lock;
10 //增加条件变量
11 pthread_cond_t g_cond_eat;
12 pthread_cond_t g_cond_make;
13
14 //eat
W> 15 void* MyThreadA(void* arg)
16 {
17 while(1)
18 {
19 pthread_mutex_lock(&g_lock);
20 while(g_bowl == 0)
21 {
22 pthread_cond_wait(&g_cond_eat, &g_lock);
23 //吃面的人发现没有面了,将自己放在吃的等待队列当中
24 }
W> 25 printf("i eat %d, i am %p\n", g_bowl, pthread_self());
26 g_bowl--;
27 pthread_mutex_unlock(&g_lock);
28 //发现没有面了需要通知做面的人
29 pthread_cond_signal(&g_cond_make);
30 }
31 return NULL;
32 }
33
34 //make
W> 35 void* MyThreadB(void* arg)
36 {
37 while(1)
38 {
39 pthread_mutex_lock(&g_lock);
40 while(g_bowl >= 1)//将if改成while
41 {
42 //做面的人发现有面了将自己放在做面的等待队列当中
43 pthread_cond_wait(&g_cond_make, &g_lock);
44 }
45 g_bowl++;
W> 46 printf("i make %d, i am %p\n", g_bowl, pthread_self());
47 pthread_mutex_unlock(&g_lock);
48 pthread_cond_signal(&g_cond_eat);
49 //通知吃面的人吃面
50 }
51 return NULL;
52 }
53
54 int main()
55 {
56 pthread_mutex_init(&g_lock, NULL);
57
58 pthread_cond_init(&g_cond_eat, NULL);//初始化吃面
59 pthread_cond_init(&g_cond_make, NULL);//初始化做面
60
61 pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
62 for(int i = 0; i < THREAD_NUM; i++)
63 {
64 int ret = pthread_create(&tid_A[i], NULL, MyThreadA, NULL);
65 if(ret < 0)
66 {
67 perror("pthread_create fail\n");
68 exit(0);
69 }
70 ret = pthread_create(&tid_B[i], NULL, MyThreadB, NULL);
71 if(ret < 0)
72 {
73 perror("pthread_create fail\n");
74 exit(0);
75 }
76 }
77
78 for(int i = 0; i < THREAD_NUM; i++)
79 {
80 pthread_join(tid_A[i], NULL);
81 pthread_join(tid_B[i], NULL);
82 }
83
84 pthread_mutex_destroy(&g_lock);
85
86 pthread_cond_destroy(&g_cond_eat);//吃面释放
87 pthread_cond_destroy(&g_cond_make);//做面释放
88
89 return 0;
90 }
这下就没什么问题了
生产者与消费者模型
123规则
1、一个线程安全队列
队列:先进先出
线程安全:当前这个队列在被其他线程操作的时候,出队和入队保证是原子性的
同一时刻只能有同一个人进行入队,同一时刻也只能有一个人出队,入队和出队这两个操作是互斥的,即同一时刻只能有一个人来操作队列
2、两种角色的线程
消费者线程:从线程安全队列当中获取元素,进行处理
生产者线程:生产元素放到线程安全队列当中进行处理
3、三种关系
消费者与消费者互斥
生产者与生产者互斥
消费者与生产者互斥+同步
优点
1、支持忙闲不均,可以提高程序运行效率
2、队列提供了一个缓冲区的作用,可以缓冲待要处理的元素
创建线程安全队列
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <pthread.h>
4
5 #include <queue> 队列的头文件
6
7 #define THREAD_NUM 1
8
9 using namespace std;
10
11 class RingQueue 用类封装创建安全队列
12 {
13 public:
14 RingQueue() 构造
15 {
16 capacity_ = 10; 容量定为10
17
18 pthread_mutex_init(&lock_, NULL); 初始化
19 pthread_cond_init(&cons_cond_, NULL);
20 pthread_cond_init(&prod_cond_, NULL);
21 }
22
23 ~RingQueue() 析构
24 {
25 pthread_mutex_destroy(&lock_); 销毁
26 pthread_cond_destroy(&cons_cond_);
27 pthread_cond_destroy(&prod_cond_);
28 }
29
30 void Push(int data) 往队列中放(生产者生产)
31 {
32 pthread_mutex_lock(&lock_); 加锁
33 while(que_.size() >= capacity_)
34 {
35 如果队列满了,此时不应该插入,放到生产者的等待队列当中去
36 pthread_cond_wait(&prod_cond_, &lock_);
37 }
38 que_.push(data); 未满则入队
39 pthread_mutex_unlock(&lock_); 解锁
40 pthread_cond_signal(&cons_cond_); 通知消费者
41 }
42
43 void Pop(int* data) 出队(消费者进行消费)
44 {
45 pthread_mutex_lock(&lock_); 加锁
46 while(que_.empty())
47 {
48 如果当前队列为空,则放到消费者的等待队列当中
49 pthread_cond_wait(&cons_cond_, &lock_);
50 }
51 *data = que_.front(); 如果有,先获取
52 que_.pop(); 再出队
53 pthread_mutex_unlock(&lock_); 解锁
54 pthread_cond_signal(&prod_cond_); 通知生产者生产
55 }
56
57 private: 创建成员变量
58 queue<int> que_; 假设元素类型都是整形
59 size_t capacity_; 定义一个无符号变量作为容量
60
61 pthread_mutex_t lock_; 互斥锁
62 pthread_cond_t cons_cond_; 消费者的成员变量
63 pthread_cond_t prod_cond_; 生产者的成员变量
64 };
65
66 void* ConsumeStart(void* arg)
67 {
68 RingQueue* rq = (RingQueue*)arg;
69 while(1)
70 {
71 int data;
72 rq->Pop(&data);
W> 73 printf("i consume %d, i am %p\n", data, pthread_self());
74 }
75 return NULL;
76 }
77
78 void* ProductStart(void* arg)
79 {
80 RingQueue* rq = (RingQueue*)arg;
81 int data = 1;
82 while(1)
83 {
W> 84 printf("i product %d, i am %p\n", data, pthread_self());
85 rq->Push(data++);
86 }
87 return NULL;
88 }
89
90 int main() 创建线程
91 {
92 RingQueue* rq = new RingQueue();
93 if(rq == NULL)
94 {
95 return 0;
96 }
97
98 分别创建消费者和生产者线程
99 pthread_t cond[THREAD_NUM], prod[THREAD_NUM];
100 for(int i = 0; i < THREAD_NUM; i++)
101 {
102 int ret = pthread_create(&cond[i], NULL, ConsumeStart, (void*)rq);
103 if(ret < 0)
104 {
105 perror("pthread_create fail\n");
106 return 0;
107 }
108 ret = pthread_create(&prod[i], NULL, ProductStart, (void*)rq);
109 if(ret < 0)
110 {
111 perror("pthread_create fail\n");
112 return 0;
113 }
114 }
115
116 for(int i = 0; i < THREAD_NUM; i++) 等待
117 {
118 pthread_join(cond[i], NULL);
119 pthread_join(prod[i], NULL);
120 }
121
122 delete rq; 释放
123
124 return 0;
125 }