第八章 多線程
線程概念
多程序任務處理是同時通過多個程序進行任務,多個pcb擁有多個虛拟位址空間,分别執行不同的代碼,之間互不關聯。而多線程是通過多個pcb共用一個虛拟位址空間,分别執行虛拟位址空間上所對應的多個不同的實體記憶體中的代碼。即一個虛拟位址空間對應多個實體記憶體。
之前我們說linux下pcb是一個程序,但其實linux下線程以程序pcb模拟實作線程,是以linux下pcb是線程;是以linux線程也叫輕量級程序。一個程序可能擁有多個線程,而每個程序勢必有一個主線程,我們在主線程中建立其他線程。那麼一個程序可以了解為一堆線程的集合,我們稱其為線程組,而程序的pid為了不沖突則規定是主線程的pid。
因為linux線程是pcb——是以線程是cpu的基本機關。因為程序是線程組,程式運作起來,資源是配置設定給整個線程組的,是以程序是資源配置設定的基本機關。
程序與線程的對比
一個程序中的線程共用同一個虛拟位址空間,是以線程間通信更加友善;線程的建立/銷毀成本更低;線程間切換排程成本更低;線程的執行粒度更細。
線程之間缺乏通路控制——系統調用,異常針對的是整個程序,健壯性低。
vfork
建立一個子程序共用同一個虛拟位址空間,怕出現調用棧混亂,是以子程序運作完畢或程式替換後父程序才開始運作。而線程也共用同一個虛拟位址空間卻不會發生調用棧混亂的情況,因為每個線程都會有一些獨立的資訊,會為每個線程在虛拟位址空間中單獨配置設定一塊記憶體用來存儲這些獨立的資訊:棧,寄存器,errno,信号屏蔽字,排程優先級。同時線程間也有共享的資料:代碼段,資料段,檔案描述符表,信号處理方式,使用者群組,目前工作目錄。
多線程相比多程序的優點:
1、通信更加友善,靈活。
2、建立/銷毀成本更低。
3、切換排程成本更低。
多線程相比多程序的缺點:
1、缺乏通路控制并且一些系統調用以及錯誤針對整個程序,健壯性/穩定性更低。
多程序/多線程進行多任務處理的優勢
cpu密集型程式
對于讀寫操作比較少,更多的則是計算方面的操作,這類程式盡量少用多線程/程序,因為cpu排程線程/程序會浪費cpu資源。
io密集型程式
對于讀寫操作較多,cpu計算操作較少的程式則應該多使用多程序/線程進行io操作,由此來并行執行程式,減少執行時間。
線程控制
線程建立
作業系統并沒有為使用者提供直接建立線程的系統調用接口,但是有人自己封裝了一套線程庫實作線程控制。
pthread_create
由于
pthread_create
所在的庫
pthread
并不在gcc預設的連結庫中,是以我們在編譯時要加參數
-pthread
或者
-lpthread
讓其連接配接到這個庫中。
/**
* 線程建立
**/
/**
* int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
* void *(*start_routine) (void *), void *arg);
* thread:輸出型參數,擷取新建立的線程id
* attr: 設定線程屬性,通常置空
* start_routine: 線程入口函數
* arg:通過線程入口函數傳遞給線程的參數
**/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
void* thr_start(void* arg)
{
while(1)
{
//pthread_self檢視此線程的tid
printf("i am child---%d\n",pthread_self());
sleep(1);
}
return NULL;
}
int main()
{
pthread_t tid;
int ret = pthread_create(&tid, NULL, thr_start, (void*)"Misaki");
printf("%d\n",tid);
if(ret != 0)//0為成功
{
printf("thread vreate errno!\n");
return -1;
}
while(1)
{
//thread_self檢視自己的線程id
printf("Misaki!%d\n",getpid());
sleep(1);
}
}
[[email protected] 第八章-多線程]$ ./create
-1544186112
Misaki!5429
i am child----1544186112
i am child----1544186112
Misaki!5429
i am child----1544186112
Misaki!5429
i am child----1544186112
Misaki!5429
i am child----1544186112
Misaki!5429
i am child----1544186112
這個建立線程的函數中的傳回值
tid
為線程在虛拟位址空間上所配置設定的屬于自己的獨立空間的首位址,我們以後要靠這個參數來控制線程。一個
tid
唯一的表示一個線程。
線程終止
線上程入口函數中return
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
void* thr_start(void* arg)
{
while(1)
{
printf("i am child\n");
reutrn NULL;
}
return NULL;
}
int main()
{
pthread_t tid;
int ret = pthread_create(&tid, NULL, thr_start, (void*)"Misaki");
printf("%d\n",tid);
if(ret != 0)
{
printf("thread vreate errno!\n");
return -1;
}
while(1)
{
//thread_self檢視自己的線程id
printf("Misaki!%d\n",getpid());
sleep(1);
return 0;
}
}
[[email protected] 第八章-多線程]$ ./exit
2052687616
Misaki!5710
線上程入口函數中
return
會讓線程退出。當在主函數中使用
return
退出主函數的時候這時會導緻程序終止,由此程序中的所有線程都會終止。
pthread_exit()
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
void* thr_start(void* arg)
{
while(1)
{
printf("i am child---%s\n", arg);
sleep(1);
//退出調用這個函數的線程
pthread_exit(0);
}
return NULL;
}
int main()
{
pthread_t tid;
int ret = pthread_create(&tid, NULL, thr_start, (void*)"Misaki");
while(1)
{
printf("i am main!\n");
sleep(1);
}
}
[[email protected] 第八章-多線程]$ ./exit
i am main!
i am child---Misaki
i am main!
i am main!
i am main!
可以看出我們自己建立的線程在執行
pthread_exit()
後退出了。如果我們的主線程調用這個函數會怎樣呢?
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
void* thr_start(void* arg)
{
while(1)
{
printf("i am child---%s\n", arg);
sleep(1);
}
return NULL;
}
int main()
{
pthread_t tid;
int ret = pthread_create(&tid, NULL, thr_start, (void*)"Misaki");
if(ret != 0)
{
printf("thread create error\n");
return -1;
}
while(1)
{
printf("i am main!\n");
sleep(1);
//退出調用這個函數的線程
pthread_exit(0);
}
}
[[email protected] 第八章-多線程]$ ./exit
i am main!
i am child---Misaki
i am child---Misaki
i am child---Misaki
i am child---Misaki
i am child---Misaki
可以看出我們雖然在主線程中調用了退出函數,主線程也确實退出了,但是程序卻并沒有退出,這說明,主線程終止并不會讓程序終止。但是我們要注意線程退出也會成為僵屍線程,但是普通線程退出并不會有過于明顯大的影響。
pthread_cancel
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
void* thr_start(void* arg)
{
while(1)
{
printf("i am child---%s\n", arg);
sleep(1);
}
return NULL;
}
int main()
{
pthread_t tid;
int ret = pthread_create(&tid, NULL, thr_start, (void*)"Misaki");
if(ret != 0)
{
printf("thread create error\n");
return -1;
}
while(1)
{
printf("i am main!\n");
sleep(1);
//退出id = tid的線程
pthread_cancel(tid);
}
}
[[email protected] 第八章-多線程]$ ./exit
i am main!
i am child---Misaki
i am child---Misaki
i am main!
i am main!
i am main!
線程等待
線程等待是為了擷取指定線程的傳回值,和程序等待一樣為了讓系統可以釋放資源,因為一個線程運作起來,預設有一個屬性:
joinable
。這個屬性決定了線程退出後,必須被等待,否則線程資源無法完全釋放,成為僵屍線程,是以我們必須進行線程等待,擷取線程傳回值,允許系統釋放資源。當然線程等待也有一個前提,線程能夠被等待,即
joinable
屬性。
pthread_join()
/**
* int pthread_join(pthread_t thread, void **retval);
* 線程等待,擷取線程退出傳回值。
* thread:要等待的線程id
* retval:輸出型參數,用于擷取退出線程的傳回值
**/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
void* thr_start(void* arg)
{
sleep(3);
return (void*)"Misaki";
}
int main()
{
pthread_t tid;
int ret = pthread_create(&tid, NULL, thr_start, NULL);
if(ret != 0)
{
printf("thread create error\n");
return -1;
}
char* ptr;
pthread_join(tid, (void**)&ptr);
printf("%s\n", ptr);
}
[[email protected] 第八章-多線程]$ ./join
Misaki
如果一個線程是被取消,則傳回值是一個宏:
PTHREAD_CANCELED
,它的值是-1。線程等待
pthread_join
是阻塞函數,一個一個線程沒有推出則會一直等待。
線程分離
将線程的一個屬性從
joinable
設定為
detach
屬性。屬于
detach
屬性的線程,退出後資源直接自動被回收,這類線程不能被等待。
pthread_detach()
如果使用者對一個線程的傳回值不關心,則可以線上程入口函數對線程進行分離。
/**
* int pthread_detach(pthread_t thread);
* 線程分離。
* thread:要分離的線程id
**/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
void* thr_start(void* arg)
{
//分離自己這個線程
//線程的分離對于一個線程來說,任意線程在任意位置調用都可以
// pthread_detach(pthread_self());
return (void*)"Misaki";
}
int main()
{
pthread_t tid;
int ret = pthread_create(&tid, NULL, thr_start, NULL);
if(ret != 0)
{
printf("thread create error\n");
return -1;
}
//分離這個線程
pthread_detach(tid);
char* ptr;
ret = pthread_join(tid, (void**)&ptr);
//如果一個線程無法被等待則傳回值為一個宏EINVAL
if(ret == EINVAL)
{
printf("this thread can not be wait!!\n");
return -1;
}
printf("%s\t%d\n", ptr, ret);
}
[[email protected] 第八章-多線程]$ ./join
this thread can not be wait!!
會發現我們已經分離了我們自己建立的線程,這個線程已經無法被等待了,并且我們無法接收到線程的傳回值。
線程安全
多個線程同時操作臨界資源而不會出現資料二義性就說這個線程是安全的。如果線上程中進行了非原子性操作就可能會導緻線程不安全,這些非原子性操作也叫做不可重入函數,即多個執行流中同時進入函數運作會出現問題的函數。
如何實作線程安全?這就要靠同步與互斥。同步指臨界資源的合理通路,互斥指臨界資源同一時間唯一通路。
互斥
同步和互斥要如何實作呢?我們先從互斥開始讨論。為了保證操作的原子性,在C語言中互斥鎖可以幫助我們保證互斥,使我們的函數變為可重入函數。
互斥鎖
互斥鎖的值隻能為0或1。1表示可以加鎖,加鎖後值-1,操作結束後就會解鎖,解鎖就會将值+1。如果一個操作已經加鎖則值為0,是以當鎖值為0時其他線程則不能加鎖,不能加鎖線程就會陷入等待。
互斥鎖操作步驟:
1、定義互斥鎖變量:
pthread_mutex_t
。
2、初始化互斥鎖變量:
pthread_mutex_init
。
3、加鎖:
pthread_mutex_lock
。
4、解鎖:
pthread_mutex_unlock
。
5、删除鎖:
pthread_mutex_destroy
。
接下來我用互斥鎖将一個不可重入的函數使它可重入進而使多個線程同時運作函數時變得安全。
/*實作互斥鎖的基本使用以及線程安全的基本認識*/
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
int ticket = 100;
//互斥鎖變量不一定非要全局變量,使用的線程都能通路到就行
//互斥鎖變量
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket = 100;
pthread_mutex_t mutex;
void* ticket_scalper(void* arg)
{
int id = (int)arg;
while(1)
{
//加鎖要在臨界資源通路之前
//int pthread_mutex_lock(pthread_mutex_t* mutex);阻塞加鎖
//int pthread_mutex_trylock(pthread_mutex_t* mutex);非阻塞加鎖,加不上鎖就傳回
pthread_mutex_lock(&mutex);
if(ticket > 0)
{
printf("scalper:%d--get a ticket:%d\n", id, ticket);
ticket--;
usleep(1000);
}
else
{
//解鎖
pthread_mutex_unlock(&mutex);
pthread_exit(0);
}
//解鎖
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main()
{
int i = 0;
int ret;
pthread_t tid[4];
//初始化互斥鎖
//int pthread_mutex_init(pthread_mutex_t *restrict mutex,
// const pthread_mutexattr_t *restrict attr);
//
pthread_mutex_init(&mutex, NULL);
//建立線程
for(i = 0; i < 4; i++)
{
ret = pthread_create(&tid[i], NULL, ticket_scalper, (void*)i);
if(ret != 0)
{
perror("thread creat error:");
return -1;
}
}
for(i = 0; i < 4; i++)
{
pthread_join(tid[i], NULL);
}
//銷毀互斥鎖
//int pthread_mutex_destroy(pthread_mutex_t *mutex);
pthread_mutex_destroy(&mutex);
}
[[email protected] thread_2019_9_2_class45]$ ./main
scalper:2--get a ticket:100
scalper:2--get a ticket:99
scalper:2--get a ticket:98
scalper:2--get a ticket:97
scalper:2--get a ticket:96
scalper:2--get a ticket:95
scalper:3--get a ticket:94
scalper:3--get a ticket:93
scalper:3--get a ticket:92
scalper:3--get a ticket:91
scalper:3--get a ticket:90
scalper:3--get a ticket:89
scalper:3--get a ticket:88
scalper:3--get a ticket:87
scalper:3--get a ticket:86
scalper:3--get a ticket:85
scalper:3--get a ticket:84
scalper:3--get a ticket:83
scalper:3--get a ticket:82
scalper:3--get a ticket:81
scalper:3--get a ticket:80
scalper:3--get a ticket:79
scalper:3--get a ticket:78
scalper:3--get a ticket:77
scalper:3--get a ticket:76
scalper:3--get a ticket:75
scalper:3--get a ticket:74
scalper:3--get a ticket:73
scalper:3--get a ticket:72
scalper:3--get a ticket:71
scalper:3--get a ticket:70
scalper:3--get a ticket:69
scalper:3--get a ticket:68
scalper:3--get a ticket:67
scalper:3--get a ticket:66
scalper:3--get a ticket:65
scalper:3--get a ticket:64
scalper:3--get a ticket:63
scalper:3--get a ticket:62
scalper:3--get a ticket:61
scalper:3--get a ticket:60
scalper:3--get a ticket:59
scalper:3--get a ticket:58
scalper:3--get a ticket:57
scalper:3--get a ticket:56
scalper:3--get a ticket:55
scalper:3--get a ticket:54
scalper:3--get a ticket:53
scalper:3--get a ticket:52
scalper:3--get a ticket:51
scalper:3--get a ticket:50
scalper:3--get a ticket:49
scalper:3--get a ticket:48
scalper:3--get a ticket:47
scalper:3--get a ticket:46
scalper:3--get a ticket:45
scalper:3--get a ticket:44
scalper:3--get a ticket:43
scalper:3--get a ticket:42
scalper:3--get a ticket:41
scalper:3--get a ticket:40
scalper:3--get a ticket:39
scalper:3--get a ticket:38
scalper:3--get a ticket:37
scalper:3--get a ticket:36
scalper:3--get a ticket:35
scalper:3--get a ticket:34
scalper:3--get a ticket:33
scalper:3--get a ticket:32
scalper:3--get a ticket:31
scalper:3--get a ticket:30
scalper:3--get a ticket:29
scalper:3--get a ticket:28
scalper:3--get a ticket:27
scalper:3--get a ticket:26
scalper:3--get a ticket:25
scalper:3--get a ticket:24
scalper:3--get a ticket:23
scalper:3--get a ticket:22
scalper:3--get a ticket:21
scalper:3--get a ticket:20
scalper:3--get a ticket:19
scalper:3--get a ticket:18
scalper:3--get a ticket:17
scalper:3--get a ticket:16
scalper:3--get a ticket:15
scalper:3--get a ticket:14
scalper:3--get a ticket:13
scalper:3--get a ticket:12
scalper:3--get a ticket:11
scalper:3--get a ticket:10
scalper:3--get a ticket:9
scalper:3--get a ticket:8
scalper:3--get a ticket:7
scalper:3--get a ticket:6
scalper:3--get a ticket:5
scalper:3--get a ticket:4
scalper:3--get a ticket:3
scalper:3--get a ticket:2
scalper:3--get a ticket:1
這樣就達成了互斥,在一個線程操作臨界資源時,其他線程不會同時幹涉。
死鎖
死鎖是指因為對一些無法加鎖的鎖進行加鎖操作而導緻程式卡死。死鎖是我們一定要在使用鎖時要注意和避免的
死鎖産生的四個必要條件:
1、互斥條件。一個線程操作時其他線程不能操作。
2、不可剝奪條件。一個線程加的鎖别的線程不能釋放。
3、請求與保持條件。一個線程已經有了鎖卻還在請求其他的鎖,但是其他的鎖請求不到第一個鎖也不釋放。
4、環路等待條件。
死鎖産生往往是因為加鎖解鎖的順序不同。要想避免死鎖就要避免死鎖産生的四個必要條件——死鎖檢測算法,銀行家算法。
同步
通過對目前是否滿足對臨界資源的操作條件來判斷線程是否該等待或喚醒這種方式實作對臨界資源通路的合理性。資源産生後才能進行使用,沒有資源則等待資源産生,生産資源後則喚醒等待,這樣則達成同步。然而互斥鎖雖然可以幫助我們完成等待但是無法判斷何時将我們喚醒,不能在合适的事件喚醒,是以便要借助新的東西——條件變量。
條件變量
條件變量的使用流程:
1、定義條件變量:
pthread_cond_t
。
2、初始化條件變量:
pthread_cond_init
。
3、等待或者喚醒:
pthread_cond_wait/pthread_cond_signal
。
4、銷毀條件變量:
pthread_cond_destroy
。
pthread_cond_wait
中一共有三個操作,首先它要讓讓目前線程等待,但是此時有一點,此時的互斥量還處于加鎖狀态其他線程無法操作臨界資源,呢又怎麼做到讓臨界資源達到要求呢?是以他在讓線程等待前要先解除了互斥量的加鎖狀态,并且這兩部操作為原子操作。為什麼要是原子操作?因為如果不是原子操作有可能在解鎖後已經條件滿足而此時線程還未進行等待可能會忽略喚醒。之後線上程被喚醒後
pthread_cond_wait
還會再加鎖保證互斥。這就是三部操作:
解鎖->等待->喚醒後加鎖
。
在每一個條件變量内部都有一個等待隊列,将所有等待的線程排列在上面,如果有其他線程喚醒則逐一喚醒。
接下來我們用互斥鎖加條件變量模拟實作一個顧客去餐館吃飯的情景,但是在這個情境中為了符合設計要注意兩個顧客不能同時吃一碗飯,并且隻有一個鍋是以兩個廚師不能同時做飯。如果沒飯了2個廚師中其中一個做飯,又犯了2個顧客其中一個吃飯。
/*實作條件變量的基本使用*/
/*吃面前提有人吃面,如果沒有線程的面,等待老闆做出來
* 老闆做出來面就要喚醒顧客
* 老闆不會做太多的面,老闆隻會提前做一碗面
* 如果已經有面做出來,但是沒人吃,不會再做(等待)
* 顧客吃完面後,老闆再來一碗(喚醒老闆的等待)*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
//是否右面
int have_noodle = 1;
//為了讓客人與廚師間的不同的同步性,需要定義多個條件變量
pthread_cond_t customer;
pthread_cond_t boss;
pthread_mutex_t mutex;
//老闆做面
void* thr_boss(void* arg)
{
while(1)
{
pthread_mutex_lock(&mutex);
//由于多個顧客,為了避免兩個顧客吃一碗面的情況這裡要循環判斷
while(have_noodle == 1)//有面
{
//等待
//int pthread_cond_timedwait(pthread_cond_t *restrict cond,
// pthread_mutex_t *restrict mutex,
// const struct timespec *restrict abstime);
//限時等待
//cond:條件變量
//mutex:互斥鎖
//abstime:限時等待時長
//時間到後傳回時間超市,停止阻塞
//int pthread_cond_wait(pthread_cond_t *restrict cond,
// pthread_mutex_t *restrict mutex);
//cond:條件變量
//mutex:互斥鎖
//pthread_cond_wait 集合了解鎖後挂起的操作(原子操作,不可被打斷)
//有可能還沒來得及挂起就已經有人喚醒,白喚醒,導緻死等
//是以這裡的wait将三個操作進行了原子性封裝不讓其中斷
//解鎖 -》 等待 -》 被喚醒後加鎖
pthread_cond_wait(&boss, &mutex);
}
//面沒了,要再做
printf("拉面 + 1\n");
have_noodle = 1;
//面好了,喚醒顧客
pthread_cond_signal(&customer);
//解鎖
pthread_mutex_unlock(&mutex);
}
return NULL;
}
//顧客吃面
void* thr_customer(void* arg)
{
while(1)
{
while(have_noodle == 0)
{
//若沒有現成的面等老闆做好
//等待
pthread_cond_wait(&customer, &mutex);
}
//有面了
printf("真好吃!\n");
have_noodle -= 1;
//喚醒廚師再做一碗
pthread_cond_signal(&boss);
//解鎖
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main()
{
pthread_t tid1, tid2;
int ret;
//條件變量初始化
//int pthread_cond_init(pthread_cond_t *restrict cond,
// const pthread_condattr_t *restrict attr);
pthread_cond_init(&boss, NULL);
pthread_cond_init(&customer, NULL);
pthread_mutex_init(&mutex, NULL);
//各建立兩個線程同時工作,相當于兩個廚師兩個客人
//客人間具有互斥性,廚師間也有互斥性,客人與廚師間有同步與互斥性
for(int i = 0; i < 2; i++)
{
ret = pthread_create(&tid1, NULL, thr_boss, NULL);
if(ret != 0)
{
printf("boss error");
return -1;
}
}
for(int i = 0; i < 2; i++)
{
ret = pthread_create(&tid2, NULL, thr_customer, NULL);
if(ret != 0)
{
printf("customer error");
return -1;
}
}
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
//銷毀條件變量
//int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_destroy(&customer);
pthread_cond_destroy(&boss);
//銷毀鎖
pthread_mutex_destroy(&mutex);
}
真好吃!
拉面 + 1
真好吃!
拉面 + 1
真好吃!
拉面 + 1
真好吃!
拉面 + 1
真好吃!
拉面 + 1
真好吃!
拉面 + 1
真好吃!
拉面 + 1
^C真好吃!
在以上這個例子中要注意幾個點:
1、使用者對條件判斷需要使用循環進行判斷(防止角色不符合條件被喚醒之後因為不循環判斷直接操作臨界資源)。這個問題也被稱為虛假喚醒問題。在多核處理器下,
pthread_cond_signal
可能會激活多于一個線程(阻塞在條件變量上的線程)。結果就是,當一個線程調用
pthread_cond_signal()
後,多個調用
pthread_cond_wait()
或其他等待在隊列上的線程傳回。這種效應就會造成虛假喚醒。
2、不同角色的線程因該等待在不同的條件變量上。(防止角色的誤喚醒,導緻程式阻塞)
但是要注意條件變量并不保證安全,是以往往使用條件變量的時候會與互斥鎖共同使用。面生産一碗顧客吃一碗沒有出現異常,是以實作是成功的。這種在多線程情況下有人生産資料有人消費資料利用同步與互斥達到合理與安全的模式十分經典,是以産生了一種固定的設計模型,這就是生産者消費者模型。
生産者與消費者模型
生産者與消費者模型中有兩種角色:生産者與消費者,同時包含三種關系:生産者與生産者之間互斥,消費者與消費者之間互斥,生産者與消費者之間同步與互斥。他們工作在一個場景中,這個場景通常是一個隊列,用來儲存資料。
實作
/**
* 基于互斥鎖與條件變量實作一個線程安全的隊列
* 實作生産者與消費者模型
**/
#include <iostream>
#include <queue>
#include <pthread.h>
#define MAXQ 10
class BlockQueue
{
public:
BlockQueue(int maxq = MAXQ)
:_capacity(maxq)
{
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond_consumer, NULL);
pthread_cond_init(&_cond_productor, NULL);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_consumer);
pthread_cond_destroy(&_cond_productor);
}
bool QueuePush(int data)
{
pthread_mutex_lock(&_mutex);
while(_queue.size() == _capacity)
{
pthread_cond_wait(&_cond_productor, &_mutex);
}
_queue.push(data);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond_consumer);
return true;
}
bool QueuePop(int &data)
{
pthread_mutex_lock(&_mutex);
while(_queue.empty())
{
pthread_cond_wait(&_cond_consumer, &_mutex);
}
data = _queue.front();
_queue.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond_productor);
return true;
}
private:
std::queue<int> _queue;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _cond_productor;
pthread_cond_t _cond_consumer;
};
void* thr_consumer(void* arg)
{
BlockQueue* q = (BlockQueue*)arg;
int data;
while(1)
{
//消費者一直擷取資料進行列印
q->QueuePop(data);
std::cout << "consumer gets a piece of data--" << data << std::endl;
}
}
void* thr_productor(void* arg)
{
BlockQueue* q = (BlockQueue*)arg;
int data = 0;
while(1)
{
//生産者一直添加資料
q->QueuePush(data);
std::cout << "producer produces a data--" << (data++) << std::endl;
}
return NULL;
}
int main()
{
pthread_t ctid[4], ptid[4];
int ret, i;
BlockQueue q;
for(i = 0; i < 4; i++)
{
ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&q);
if(ret != 0)
{
std::cerr << "create thread error" << std::endl;
}
}
for(i = 0; i < 4; i++)
{
ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&q);
if(ret != 0)
{
std::cerr << "create thread error" << std::endl;
}
}
for(i = 0; i < 4; i++)
{
pthread_join(ctid[i], NULL);
pthread_join(ptid[i], NULL);
}
}
consumer gets a piece of data--2111
consumer gets a piece of data--2112
consumer gets a piece of data--2113
consumer gets a piece of data--2114
consumer gets a piece of data--2115
consumer gets a piece of data--2116
consumer gets a piece of data--2117
consumer gets a piece of data--2118
producer produces a data--2119
producer produces a data--2120
producer produces a data--2121
producer produces a data--2122
producer produces a data--2123
producer produces a data--2124
這裡列印之是以看上去亂是因為xshell的顯示跟不上虛拟機計算的速度。
優點
生産者與消費者模型有三個優點:
1、解耦合
2、支援忙閑不均
3、支援并發
一個場所,兩種角色,三種關系。
posix标準信号量
system V
是核心中的計數器,
posix
是線程間的全局計數器。它也有着實作程序/程序間同步與互斥的貢藕功能。
與條件變量的差別
條件變量是通過等待、喚醒操作來讓線程等待在等待隊列上來完成同步,這需要使用者自己進行外部條件判斷并且要搭配互斥鎖一起使用。
信号量是通過自身内部技術實作條件的判斷,不需要搭配互斥鎖,自身已經保證了原子操作。
信号量的工作原理
信号量通過一個計數器實作對資源的計數,并且通過這個計數來判斷目前線程/程序能否對臨界資源進行通路,對臨界資源進行通路之前先發起調用通路信号量進行判斷是否能夠通路。
信号量實作同步:首先資源計數-1,若此時資源計數
>=0
,則可以直接進行通路,調用直接傳回,若信号量内部計數器
<0
表示沒有資源無法通路,調用阻塞(挂起線程);若其他線程生産了一個資源則發起調用,首先資源計數+1,如果此時計數器
<=0
則喚醒等待隊列上的線程,若此時計數器
>0
則什麼都不做。
信号量實作互斥:計數隻有0/1,資源隻有一個,同一時間隻有 一個線程可以通路。首先信号量-1,若此時信号量
<0
則調用阻塞,若
>0
,則調用傳回,對臨界資源進行通路,通路完畢,進行計數+1,喚醒所有線程,所有線程繼續進行搶奪。
同時如果信号量小于0則表示目前阻塞在等待隊列上的線程/程序數,等于0表示資源剛好完全配置設定,大于0則表示多餘資源數。
接口
sem_t//定義信号量。
sem_init(sem_t* sem, int flag, int initval);//初始化,
//flag:0-線程間,!0-程序間
//initval:用于設定初值
sem_wait(sem_t* sem);//,進行判斷是否有資源,<=0則阻塞,>0則-1并調用傳回并。
sem_trywait(sem_t* sem);//非阻塞,沒有資源直接報錯傳回。
sem_timedwait(sem_t* sem);//限時阻塞,等待一段時間,若一直沒有資源則逾時報錯傳回
sem_post(sem_t* sem);//計數+1,并且喚醒等待的線程
sem_destroy(sem_t* sem);//銷毀信号量
應用
用信号量實作線程安全的環形隊列。
/**
* 利用信号量完成線程安全的環形隊列
**/
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <thread>
#define MAXQ 10
class RingQueue
{
public:
RingQueue(int maxq = MAXQ)
:_capacity(maxq)
,_queue(maxq)
,_step_read(0)
,_step_write(0)
{
sem_init(&_lock, 0, 1);
sem_init(&_idle_space, 0, maxq);
sem_init(&_data_space, 0, 0);
}
~RingQueue()
{
sem_destroy(&_lock);
sem_destroy(&_idle_space);
sem_destroy(&_data_space);
}
bool QueuePush(int data)
{
//沒有空閑空間則阻塞
sem_wait(&_idle_space);
//加鎖
sem_wait(&_lock);
_queue[_step_write] = data;
_step_write = (_step_write + 1) % _capacity;
//解鎖
sem_post(&_lock);
//喚醒消費者
sem_post(&_data_space);
return true;
}
bool QueuePop(int& data)
{
sem_wait(&_data_space);
sem_wait(&_lock);
data = _queue[_step_read];
_step_read = (_step_read + 1) % _capacity;
sem_post(&_lock);
sem_post(&_idle_space);
return true;
}
private:
std::vector<int> _queue;//用vector實作環形隊列
int _capacity;//容量
int _step_read;//讀指針
int _step_write;//寫指針
sem_t _lock;//初始計數=1,負責完成互斥
//也需要有兩個等待隊列,分别完成兩個角色間的同步
sem_t _idle_space;//空閑空間節點個數,生産者等待在這裡,完成同步
sem_t _data_space;//資料節點個數,初始=0,消費者等待在這裡,完成同步
};
void thr_producer(RingQueue* q)
{
int data = 0;
while(1)
{
q->QueuePush(data);
std::cout << "push data ----" << data++ << std::endl;
}
}
void thr_consumer(RingQueue* q)
{
int data = 0;
while(1)
{
q->QueuePop(data);
std::cout << "get data ----" << data << std::endl;
}
}
int main()
{
RingQueue q;
std::vector<std::thread> list_con(4);
std::vector<std::thread> list_pro(4);
for(int i = 0; i < 4; i++)
{
list_pro[i] = (std::thread(thr_producer, &q));
}
for(int i = 0; i < 4; i++)
{
list_con[i] = (std::thread(thr_consumer, &q));
}
for(int i = 0; i < 4; i++)
{
list_con[i].join();
list_pro[i].join();
}
}
push data ----4028
get data ----push data ----4102
41004029
3996push data ----4030
push data ----4031
push data ----4032
push data ----push data ----4033
push data ----4034
push data ----4101
push data ----4102
get data ----
push data ----4103
push data ----4104
40974102
get data ----4101
push data ----4103
push data ----get data ----4029
4095
get data ----4031
get data ----4032
get data ----4033
get data ----4034
get data ----4102
get data ----get data ----40304103
push data ----get data ----4104
get data ----4103
get data ----4105
get data ----4104
push data ----3997
push data ----3998
push data ----3999
push data ----4000
push data ----4001
4104get data ----
push data ----4105
push data ----4106
push data ----4107
push data ----4108
push data ----push data ----4035
3997
get data ----3999
get data ----4000
讀寫鎖
讀寫鎖在資料庫中就有着極為重要的應用,這樣才得以讓資料得到共享修改得到合理的儲存而不出現資料的二義性。
特點,原理及應用
讀寫鎖有着自己的特點:寫互斥,讀共享,一個使用者寫時所有其他所有使用者都不能讀和寫;一個使用者讀時其他所有使用者都可以讀但不能寫,是以适用于多讀少寫的應用場景,保證資料不會出現二義性并且保證讀取和寫入的效率。
讀寫鎖内部有兩個計數器,讀者計數與寫者計數。加寫鎖時對兩個技術進行判斷如果任意一個計數器>0,都無法加寫鎖需要等待。加讀鎖時對寫者計數進行判斷,若大于0,則無法加讀鎖需要進行等待。
讀寫鎖通過自旋鎖實作,不滿足條件時自旋等待。自旋鎖的特點是:等待中不停循環對條件進行判斷,是以可以及時響應,但是cpu消耗較高。自旋鎖一般應用在對于挂起等待被喚醒時間相較于資料處理時間可以忽略不記的情況下,這樣更傾向于挂起。
線程池
為什麼要有線程池
假如在一個任務的處理時間中,若線程建立及銷毀的時間占用任務處理的大量比例,則意味着大量任務進行中,資源被浪費線上程的建立與銷毀中。是以産生線程池,建立大量線程,但并不推出線程并不斷把任務交給這些線程處理,避免了大量建立線程/銷毀帶來的時間成本。
線程池中線程數量是有上限的,為了防止出現峰值壓力導緻資源瞬間耗盡程式崩潰。
線程池的實作
/**
* 線程池由兩個部分構成一個是一個任務類
* 另一個部分是一個線程安全的隊列,由此構成任務隊列,
* 再用一組線程從任務隊列中擷取任務來執行
* 由此構成線程池
**/
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>
#include <thread>
#include <string>
#include <time.h>
#include <stdlib.h>
#include <sstream>
#define MAX_THREAD 5
#define MAX_QUEUE 10
typedef void(*handler_t)(int val);
//任務類
//1、決定線程處理的任務,處理什麼資料,怎麼處理都由使用者傳入
class Task
{
private:
int _data;//資料
handler_t _handler;//處理資料的方法,函數指針,用于傳入線程中給線程下達指令
public:
Task(int data, handler_t handler)
:_data(data)
,_handler(handler)
{
}
void SetTask(int data, handler_t handler)
{
_data = data;
_handler = handler;
}
void Run()
{
return _handler(_data);
}
};
//線程池類
class ThreadPool
{
private:
std::queue<Task> _queue;//任務隊列
int _capacity;//線程池最大任務數量
pthread_mutex_t _mutex;//鎖,完成互斥,類似于生産者消費者模型
pthread_cond_t _cond_pro;//條件變量,完成
pthread_cond_t _cond_con;
int _thr_max;//線程池擁有的總線程數
std::vector<std::thread> _thr_list;//線程組,存儲線程操作句柄
bool _quit_flag;//用于控制線程是否退出
int _thr_cur;//線程的數量,線程退出時,判斷目前線程數量
void thr_start()
{
while(1)
{
pthread_mutex_lock(&_mutex);
while(_queue.empty())
{
if(_quit_flag == true)
{
std::cout << "thread exit " << pthread_self() << std::endl;
pthread_mutex_unlock(&_mutex);
_thr_cur--;
return;
}
pthread_cond_wait(&_cond_con, &_mutex);
}
Task tt = _queue.front();
_queue.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond_pro);
//任務處理放到鎖外,防止線程處理任務時間過長,一直加鎖導緻其他線程無法處理其他任務
tt.Run();
}
}
public:
//初始化線程池
ThreadPool(int maxq = MAX_QUEUE, int maxt = MAX_THREAD)
:_capacity(maxq)
,_thr_max(maxt)
,_thr_list(maxt)
,_thr_cur(0)
{
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond_pro, NULL);
pthread_cond_init(&_cond_con, NULL);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_pro);
pthread_cond_destroy(&_cond_con);
}
//初始化線程組
bool PoolInit()
{
for(int i = 0; i < _thr_max; i++)
{
_thr_list[i] = std::thread(&ThreadPool::thr_start, this);
_thr_list[i].detach();
_thr_cur++;
}
return true;
}
//添加任務
bool AddTask(Task& tt)
{
pthread_mutex_lock(&_mutex);
while(_queue.size() == _capacity)
{
pthread_cond_wait(&_cond_pro, &_mutex);
}
_queue.push(tt);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond_con);
return true;
}
//銷毀線程池,停止工作
bool PoolStop()
{
pthread_mutex_lock(&_mutex);
_quit_flag = true;
pthread_mutex_unlock(&_mutex);
while(_thr_cur > 0)
{
//std::cout << "cont:" << _thr_cur << std::endl;
pthread_cond_broadcast(&_cond_con);
usleep(1000);
}
//for(int i = 0; i < _thr_max; i++)
//{
// _thr_list[i].join();
//}
return true;
}
};
void test(int data)
{
srand(time(NULL));
int nsec = rand() % 5;
std::stringstream ss;
ss << "thread:" << pthread_self() << " processint data ";
ss << data << " and sleep " << nsec << " sec" << std::endl;
std:: cout << ss.str();
sleep(nsec);
return;
}
int main()
{
ThreadPool pool;
pool.PoolInit();
for(int i = 0; i < 10; i++)
{
Task tt(i, test);
pool.AddTask(tt);
}
pool.PoolStop();
}
[[email protected] thread]$ ./threadpool
thread:139941165012736 processint data 0 and sleep 4 sec
thread:139941156620032 processint data 1 and sleep 4 sec
thread:139941190190848 processint data 2 and sleep 4 sec
thread:139941181798144 processint data 3 and sleep 4 sec
thread:139941173405440 processint data 4 and sleep 4 sec
thread:139941190190848 processint data 6 and sleep 4 sec
thread:139941156620032 processint data 7 and sleep 4 sec
thread:139941181798144 processint data 8 and sleep 4 sec
thread:139941165012736 processint data 5 and sleep 4 sec
thread:139941173405440 processint data 9 and sleep 4 sec
thread exit 139941156620032
thread exit 139941190190848
thread exit 139941181798144
thread exit 139941165012736
thread exit 139941173405440
設計模式
單例模式
單例模式是一種常見設計模式,之前已經多次介紹,Cpp章節中也有實作。單例模式使用場景是在一個資源隻能被加載配置設定一次,一個類隻能執行個體化一個對象的情況。
餓漢模式
資源一次性加載配置設定完,對象在程式初始化階段執行個體化完畢,這種實作是線程安全的,程式運作起來比較流暢,但是啟動加載時間可能過長。
懶漢模式
資源使用時再加載配置設定,對象再使用的時候再去執行個體化,這種實作加載快,同一時間消耗資源少,但是運作中可能卡頓。這種實作是線程不安全的,是以我們要加鎖判斷類是否執行個體化過,如果沒有則執行個體化。