多個線程共享相同的記憶體時,需要確定每個線程看到一緻的資料視圖。
1.互斥量
可以通過使用pthread的互斥接口保護資料,確定同一時間隻有一個線程通路資料,互斥量(mutex)從本質上說是一把 鎖,在通路共享資源前對互斥量進行枷鎖,在通路完成後釋放互斥量上的鎖。對互斥量進行加鎖後,任何其他試圖再次對互斥 量進行加鎖的線程将被阻塞知道目前線程釋放該互斥鎖。如果釋放互斥鎖時有多個線程阻塞,所有在該互斥鎖上阻塞線程都會 變成可運作狀态,第一個變為運作狀态的線程可以對互斥量進行加鎖,其他線程将會看到互斥鎖依然被鎖住,隻有回去再次等 待它重新變成可用。在這種方式下,每次隻有一個線程可以向前執行。 互斥變量用pthread_mutex_t資料類型來表示,在使用互斥變量以前,必須首先對它進行初始化,可以把它置為常量 PTHREAD_MUTEX_INITIALIZER(隻對靜态配置設定的互斥量),也可以通過調用pthread_mutex_init函數進行初始化。如果動态地 配置設定互斥量(例如通過調用malloc),那麼在釋放記憶體前需要調用pthread_mutex_destroy。 [cpp] view plain copy
- #include<pthread.h>
- int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
- int pthread_mutex_destroy(pthread_mutex_t * mutex);
- //成功傳回0,否則傳回錯誤編号
使用預設的屬性初始化互斥量,隻要将attr設定為NULL。
對互斥量進行加鎖,需要調用pthread_mutex_lock,如果互斥量已經上鎖,調用線程将阻塞直到互斥量被解鎖。對互斥量 解鎖,需要調用pthread_mutex_unlock。 [cpp] view plain copy
- #include <pthread.h>
- int pthread_mutex_lock(pthread_mutex_t *mutex);
- int pthread_mutex_trylock(pthread_mutex)_t *mutex);
- int pthread_mutex_unlock(pthread_mutex_t *mutex);
- //如果成功傳回0,否則傳回錯誤編号。
如果線程不希望被阻塞,它可以使用pthread_mutex_trylock嘗試對互斥量進行加鎖。如果調用pthread_mutex_trylock時 互斥量處于未鎖住狀态,那麼pthread_mutex_trylock将鎖住互斥量,不會出現并傳回0,否則pthread_mutex_trylock就會失敗, 不能鎖住互斥量,而傳回EBUSY。
實踐: 在沒有使用互斥鎖時:
[cpp] view plain copy
- #include <stdio.h>
- #include <pthread.h>
- #include <malloc.h>
- #include <string.h>
- void* th_func(void* arg){
- int i;
- for(i=0; i<5; i++){
- printf("1\n");
- sleep(1);
- printf("2\n");
- }
- }
- int main(void){
- int ret;
- pthread_t tid1,tid2;
- ret = pthread_create(&tid1, NULL, th_func, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- return -1;
- }
- ret = pthread_detach(tid1);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- return -1;
- }
- ret = pthread_create(&tid2, NULL, th_func, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- return -1;
- }
- ret = pthread_detach(tid2);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- return -1;
- }
- sleep(15);
- return 0;
- }
程式輸出為: 1
1
2
1
2
1
2
1
2
1
2
1
2
1
2
1
2
1
2
2
使用了互斥鎖之後: [cpp] view plain copy
- #include <stdio.h>
- #include <pthread.h>
- #include <malloc.h>
- #include <string.h>
- pthread_mutex_t *mutex;
- void* th_func(void* arg){
- int i;
- for(i=0; i<5; i++){
- pthread_mutex_lock(mutex);
- printf("1\n");
- sleep(1);
- printf("2\n");
- pthread_mutex_unlock(mutex);
- }
- }
- int main(void){
- int ret,result = 0;
- pthread_t tid1,tid2;
- mutex = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
- if(mutex == NULL){
- perror("malloc");
- result = -1;
- goto FINALLY;
- }
- pthread_mutex_init(mutex,NULL);
- ret = pthread_create(&tid1, NULL, th_func, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_detach(tid1);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_create(&tid2, NULL, th_func, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_detach(tid2);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- sleep(15);
- FINALLY:
- if(mutex != NULL){
- pthread_mutex_destroy(mutex);
- free(mutex);
- }
- return result;
- }
程式輸出為: 1
2
1
2
1
2
1
2
1
2
1
2
1
2
1
2
1
2
1
2
2.避免死鎖
如果線程試圖對同一個互斥量加鎖兩次,那麼它自身就會陷入死鎖狀态,使用互斥量時,還有其他更不明顯的方式也能産生死鎖, 例如,程式中使用多個互斥量,如果允許一個線程一直占有一個互斥量,并且試圖鎖住第二個互斥量時處于阻塞狀态,但是擁有第二 個互斥量的線程也在試圖鎖住第一個互斥量,這時就會發生死鎖。因為兩個線程都在互相請求另一個線程擁有的資源,是以這兩個線程 都無法向前運作,于是就産生死鎖。 可以通過控制互斥量加鎖的順序來避免死鎖的發生。例如,假設需要對兩個互斥量A和B同時加鎖,如果所有線程總是在對互斥量B 加鎖之前鎖住A,那麼使用這兩個互斥量不會産生死鎖,類似地,如果所有的此案成總是在鎖住互斥量A之前鎖住B,那麼也不會發生死 鎖。
3.讀寫鎖
讀寫鎖與互斥量類似,不過讀寫鎖允許更高的并行性。互斥量要麼是鎖住狀态要麼不加鎖狀态,而且一次隻有一個線程可以對其 加鎖。讀寫鎖可以由三種狀态:讀模式下加鎖狀态,寫模式下加鎖狀态,不加鎖狀态。一次隻有一個線程可以占有寫模式的讀寫鎖, 但是多個線程可以同時占有讀模式的讀寫鎖。 當讀寫鎖是寫加鎖狀态時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞。當讀寫鎖在讀加鎖狀态時,所有試圖 以讀模式對它進行加鎖的線程都可以得到通路權,但是如果線程希望以寫模式對此鎖進行加鎖,他必須阻塞直到所有的線程釋放讀鎖。 當讀寫鎖處理讀模式鎖住狀态時,如果有另外的線程試圖以寫模式加鎖,讀寫鎖通常會阻塞随後的讀模式鎖請求,這樣可以避免讀模式 鎖長期占用,而等待的些模式請求一直得不到滿足。 讀寫鎖非常适用于對資料結構讀的次數遠大于寫的情況。 通過調用pthread_rwlock_init程序初始化,如果希望讀寫鎖有預設的屬性,可以傳一個空指針給attr。 在釋放讀寫鎖占用的記憶體之前,需要調用pthread_rwlock_destroy做清理工作。 [cpp] view plain copy
- #include <pthread.h>
- int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
- int pthread_rwlock_destory(pthread_rwlock_t *rwlock);
- //若成功傳回0,否則傳回錯誤編号。
在讀模式下鎖住讀寫鎖,需要調用pthread_rwlock_rdlock,要在寫模式下鎖定讀寫鎖,需要調用pthread_rwlock_wrlock,不管以 何種方式鎖住讀寫鎖,都可以調用pthread_rwlock_unlock進行解鎖。 [cpp] view plain copy
- #include<pthread.h>
- int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
- int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
- int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
- //成功則傳回0,否則傳回錯誤編号。
Single UNIX Specification同樣定義了有條件的讀寫鎖原語版本。 [cpp] view plain copy
- #include <pthread.h>
- int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
- int pthread_rwlock_tryrwlock(pthread_rwlock_t *rwlock);
- //成功傳回0,否則傳回錯誤編号。
實踐: [cpp] view plain copy
- #include <stdio.h>
- #include <pthread.h>
- #include <malloc.h>
- #include <string.h>
- pthread_rwlock_t *rwlock;
- int gi=0;
- void* th_func1(void* arg){
- int i;
- for(i=0; i<5; i++){
- pthread_rwlock_rdlock(rwlock);
- printf("start1:%d\n",gi);
- sleep(1);
- printf("end1:%d\n",gi);
- pthread_rwlock_unlock(rwlock);
- sleep(1);
- }
- }
- void* th_func2(void* arg){
- int i;
- for(i=0; i<5; i++){
- pthread_rwlock_rdlock(rwlock);
- printf("start2:%d\n",gi);
- sleep(1);
- printf("end2:%d\n",gi);
- pthread_rwlock_unlock(rwlock);
- sleep(1);
- }
- }
- void* th_func3(void* arg){
- int i;
- for(i=0; i<5; i++){
- pthread_rwlock_wrlock(rwlock);
- gi++;
- sleep(1);
- pthread_rwlock_unlock(rwlock);
- sleep(1);
- }
- }
- int main(void){
- int ret,result = 0;
- pthread_t tid1,tid2,tid3;
- rwlock = (pthread_rwlock_t*)malloc(sizeof(pthread_rwlock_t));
- if(rwlock == NULL){
- perror("malloc");
- goto FINALLY;
- }
- ret = pthread_create(&tid1, NULL, th_func1, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_detach(tid1);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_create(&tid2, NULL, th_func2, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_detach(tid2);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_create(&tid3, NULL, th_func3, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_detach(tid3);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- sleep(15);
- FINALLY:
- if(rwlock != NULL){
- pthread_rwlock_destroy(rwlock);
- free(rwlock);
- }
- return result;
- }
結果: start2:1
start1:1
end2:1
end1:1
start2:2
start1:2
end2:2
end1:2
start2:3
start1:3
end2:3
end1:3
start2:4
start1:4
end2:4
end1:4
start2:5
start1:5
end2:5
end1:5
一次循環中start和end中的值都是一樣的。
如果我們修改下程式: [cpp] view plain copy
- #include <stdio.h>
- #include <pthread.h>
- #include <malloc.h>
- #include <string.h>
- int gi=0;
- void* th_func1(void* arg){
- int i;
- for(i=0; i<5; i++){
- printf("start1:%d\n",gi);
- sleep(1);
- printf("end1:%d\n",gi);
- sleep(1);
- }
- }
- void* th_func2(void* arg){
- int i;
- for(i=0; i<5; i++){
- printf("start2:%d\n",gi);
- sleep(1);
- printf("end2:%d\n",gi);
- sleep(1);
- }
- }
- void* th_func3(void* arg){
- int i;
- for(i=0; i<5; i++){
- gi++;
- sleep(1);
- }
- }
- int main(void){
- int ret,result = 0;
- pthread_t tid1,tid2,tid3;
- ret = pthread_create(&tid1, NULL, th_func1, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_detach(tid1);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_create(&tid2, NULL, th_func2, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_detach(tid2);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_create(&tid3, NULL, th_func3, NULL);
- if(ret != 0){
- printf("pthread_create:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- ret = pthread_detach(tid3);
- if(ret != 0){
- printf("pthread_detach:%s\n",strerror(ret));
- result = -1;
- goto FINALLY;
- }
- sleep(15);
- FINALLY:
- return result;
- }
運作結果: start2:1
start1:1
end2:2
end1:2
start2:3
start1:3
end2:4
end1:4
start2:5
start1:5
end2:5
end1:5
start2:5
start1:5
end2:5
end1:5
start2:5
start1:5
end2:5
end1:5
一次循環中start和end中的值可能會不一樣。
4.條件變量
條件變量是線程可用的另一種同步機制。條件變量給多個線程提供了一個回合的場所。條件變量與互斥量一起使用時,允許線程 以無競争的方式等待特定的條件發生。 條件本身是由互斥量保護的,線程在改變條件狀态前必須首先鎖住互斥量,其他線程在獲得互斥量之前不會覺察到這種改變,因為 必須鎖住互斥量後才能計算條件。 條件變量使用之前必須先進行初始化,pthread_cond_t資料結構代表的條件變量可以用兩種方式進行初始化,可以把常量 PTHREAD_COND_INITIALIZER賦給靜态配置設定的條件變量,但是如果條件變量是動态配置設定的,可以使用pthread_cond_init函數初始化。 在釋放底層的記憶體空間之前,可以使用pthread_cond_destroy函數對條件變量進行去除初始化。 [cpp] view plain copy
- #include<pthread.h>
- int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);
- int pthread_cond_destroy(pthread_cond_t *cond);
- //成功則傳回0,否則傳回錯誤編号。
使用pthread_cond_wait等待條件變成真。 [cpp] view plain copy
- #include<pthread.h>
- int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
- //成功傳回0,否則傳回錯誤編号。
傳遞給pthread_cond_wait的互斥量對條件進行保護,調用者把鎖住的互斥量傳給函數。函數把調用線程放到等待條件的線程清單上, 然後對互斥量解鎖,這兩個操作時原子操作,這樣就關閉了條件檢查和線程進入休眠狀态等待條件改變這兩個操作之間的時間通道, 這樣線程就不會錯過條件的任何變化,pthread_cond_wait傳回時,互斥量再次被鎖住。
[cpp] view plain copy
- #include<pthread.h>
- int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex,
- const struct timespec *restrict timeout);
- //成功傳回0,否則傳回錯誤編号。
pthread_cond_timedwait函數的工作方式與pthread_cond_wait函數相似,隻是多了一個timeout,timeout值指定了等待的時間。它 通過timespec結構指定。 [cpp] view plain copy
- struct timespec{
- time_t tv_sec; //秒數
- long tv_nsec; //納秒
- }
使用這個結構體時,需要指定等待多長時間,時間值是一個絕對值而不是相對值,例如如果要等待3分鐘,就需要把目前時間加上3分鐘 再轉換到timespec結構。 如果時間到了但是條件還是沒有出現,pthread_cond_timedwait将重新擷取互斥量然後傳回錯誤ETIMEDOUT。 從pthread_cond_wait或者pthread_cond_timedwait調用成功傳回時,線程需要重新計算條件,因為其他的線程可能已經在運作并改變 了條件。
有兩個函數可以用于通知線程條件已經滿足。pthread_cond_signal函數将喚醒等待該線程的某個線程,而pthread_cond_broadcast函數 将喚醒等待該條件的所有線程。 [cpp] view plain copy
- #include <pthread.h>
- int pthread_cond_siganl(pthread_cond_t *cond);
- int pthread_cond_broadcast(pthread_cond_t *cond);
- //成功則傳回0,否則傳回錯誤編号。
實踐: [cpp] view plain copy
- #include <stdio.h>
- #include <pthread.h>
- #include <string.h>
- pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
- pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
- int gi=0;
- void* substract(void* arg){
- int i;
- for(i=0; i<5; i++){
- pthread_mutex_lock(&qlock);
- while(gi == 0)
- pthread_cond_wait(&qready, &qlock);
- printf("before substract gi:%d.\n",gi);
- gi--;
- printf("after substract gi:%d.\n",gi);
- pthread_mutex_unlock(&qlock);
- }
- }
- void* add(void* arg){
- int i;
- for(i=0; i<5; i++){
- pthread_mutex_lock(&qlock);
- printf("before add gi:%d.\n",gi);
- gi++;
- printf("after add gi:%d.\n",gi);
- pthread_mutex_unlock(&qlock);
- pthread_cond_signal(&qready);
- }
- }
- int main(void){
- int ret;
- pthread_t tid1,tid2;
- ret = pthread_create(&tid1, NULL, substract, NULL);
- if(ret != 0){
- printf("pthread_create:%s",strerror(ret));
- return -1;
- }
- <span style="white-space:pre"> </span>sleep(1);
- ret = pthread_create(&tid2, NULL, add, NULL);
- if(ret != 0){
- printf("pthread_create:%s",strerror(ret));
- return -1;
- }
- ret = pthread_join(tid1,NULL);
- if(ret != 0){
- printf("pthread_join:%s",strerror(ret));
- return -1;
- }
- ret = pthread_join(tid2,NULL);
- if(ret != 0){
- printf("pthread_join:%s",strerror(ret));
- return -1;
- }
- return 0;
- }
運作結果: before add gi:0.
after add gi:1.
before substract gi:1.
after substract gi:0.
before add gi:0.
after add gi:1.
before substract gi:1.
after substract gi:0.
before add gi:0.
after add gi:1.
before substract gi:1.
after substract gi:0.
before add gi:0.
after add gi:1.
before substract gi:1.
after substract gi:0.
before add gi:0.
after add gi:1.
before substract gi:1.
after substract gi:0.
自旋鎖
自旋鎖與互斥量類似,但它不是通過休眠使程序阻塞,而是在擷取鎖之前一直處于忙等(自旋)狀态。自旋鎖可用于以下情況:鎖被持有的時間短,而且線程并不希望在重新排程上花費太多的成本。
自旋鎖通常作為底層原語用于實作其他類型的鎖。根據它們所基于的系統體系結構,可以通過使用測試并設定指令有效地實作。當然這裡說的有效也還是會導緻CPU資源的浪費:當線程自旋等待鎖變為可用時,CPU不能做其他的事情。這也是自旋鎖隻能夠被位置一小段時間的原因。
自旋鎖用在非搶占式核心中時是非常有用的:除了提供互斥機制以外,它們會阻塞中斷,這樣中斷處理程式就不會讓系統陷入死鎖狀态,因為它需要擷取已被加鎖的自旋鎖(把中斷想成另一種搶占)。在這種類型的核心中,中斷處理程式不能休眠,因為它們能用的同步原語隻能是自旋鎖。
屏障
屏障(barrier)是使用者協調多個線程并行工作的同步機制。屏障允許每個線程等待,直到所有的合作線程都到達某一點,然後從該點繼續執行。我們已經看到一種屏障,pthread_join函數是一種屏障,允許一個線程等待,直到另一個線程退出。
但是屏障對象的概念更廣,它們允許任意數量的線程等待,直到所有的線程處理完工作,而線程不需要退出。所有線程到達屏障後可以直接工作。
互斥鎖和自旋鎖的差別:
自旋鎖是一種非阻塞鎖,也就是說,如果某線程需要擷取自旋鎖,但該鎖已經被其他線程占用時,該線程不會被挂起,而是在不斷的消耗CPU的時間,不停的試圖擷取自旋鎖。
互斥量是阻塞鎖,當某線程無法擷取互斥量時,該線程會被直接挂起,該線程不再消耗CPU時間,當其他線程釋放互斥量後,作業系統會激活那個被挂起的線程,讓其投入運作。
兩種鎖适用于不同場景:
如果是多核處理器,如果預計線程等待鎖的時間很短,短到比線程兩次上下文切換時間要少的情況下,使用自旋鎖是劃算的。
如果是多核處理器,如果預計線程等待鎖的時間較長,至少比兩次線程上下文切換的時間要長,建議使用互斥量。
如果是單核處理器,一般建議不要使用自旋鎖。因為,在同一時間隻有一個線程是處在運作狀态,那如果運作線程發現無法擷取鎖,隻能等待解鎖,但因為自 身不挂起,是以那個擷取到鎖的線程沒有辦法進入運作狀态,隻能等到運作線程把作業系統分給它的時間片用完,才能有機會被排程。這種情況下使用自旋鎖的代價 很高。
如果加鎖的代碼經常被調用,但競争情況很少發生時,應該優先考慮使用自旋鎖,自旋鎖的開銷比較小,互斥量的開銷較大。