天天看點

線程的互斥與同步

線程的互斥與同步

    • 線程的互斥
      • 簡單的搶票程式
      • 互斥量
        • 初始化互斥量
        • 銷毀互斥量
        • 互斥量加鎖和解鎖
      • 互斥量實作原理
    • 可重入VS線程安全
      • 概念
        • 常見的線程不安全的情況
        • 常見的線程安全的情況
        • 常見不可重入的情況
        • 常見可重入的情況
        • 可重入與線程安全聯系
        • 可重入與線程安全差別
    • 死鎖
      • 死鎖四個必要條件
      • 避免死鎖
    • 程序同步
      • 條件變量
        • 條件變量初始化
        • 銷毀
        • 等待條件滿足
        • 喚醒等待
      • 資源等待隊列
    • 生産者消費者模型
      • 單生産者,單消費者,一個blockingqueue
    • POSIX信号量
      • 初始化信号量
      • 銷毀信号量
      • 等待信号量
      • 釋出信号量
      • 基于環形隊列的生産消費模型
      • 線程池
    • 線程安全的單例模式
      • 餓漢方式實作單例模式
      • 懶漢方式實作單例模式(線程安全版本)
    • STL,智能指針線程安全問題
      • STL中的容器是否是線程安全的
      • 智能指針是否是線程安全的?
    • 其他常見的各種鎖
    • 讀寫者問題
      • 初始化
      • 銷毀
      • 加鎖和解鎖

線程的互斥

相關概念:

臨界資源:多線程執行流共享的資源就叫做臨界資源

臨界區:每個線程内部,通路臨界資源的代碼,就叫做臨界區

互斥:任何時刻,互斥保證有且隻有一個執行流進入臨界區,通路臨界資源,通常對臨界資源起保護作用

原子性:不會被任何排程機制打斷的操作,該操作隻有兩态,要麼完成,要麼未完成

簡單的搶票程式

//未保護臨界區
    #include<stdio.h>
    #include<unistd.h>
    #include<pthread.h>
    #include<sys/stat.h>
    #include<sys/types.h>
    int tickets=1000;
    void* TicketGrabbing(void* arg)
    {
      const char* str=(char*) arg;
      while(1)
      {
        if(tickets>0)
        {
            usleep(100);
            printf("%s:get a ticket:%d\n",str,tickets--);
        }
        else 
        {
          break;
        }
      }                                                                                                                                         
      printf("%s quit!\n",str);
      pthread_exit((void*)0);
    }
    int main()
    {
      pthread_t t1,t2,t3,t4;
      pthread_create(&t1,NULL,TicketGrabbing,"thread 1"); 
      pthread_create(&t2,NULL,TicketGrabbing,"thread 2");        	    pthread_create(&t3,NULL,TicketGrabbing,"thread 3");              	pthread_create(&t4,NULL,TicketGrabbing,"thread 4");
      pthread_join(t1,NULL);
      pthread_join(t2,NULL);
      pthread_join(t3,NULL);
      pthread_join(t4,NULL);
      return 0;
    }              
           
線程的互斥與同步

可以看到,票數竟然被搶到了負數,那這是為什麼呢?

線程的互斥與同步

a++為什麼不是原子的:

線程的互斥與同步

這就是對臨界資源沒有進行保護而造成了問題

互斥量

線程的互斥與同步

為解決上面的問題,本質上就是需要一把鎖。Linux上提供的這把鎖叫互斥量

初始化互斥量

方法1,靜态配置設定:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

方法2,動态配置設定:

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

參數:

mutex:要初始化的互斥量

attr:初始屬性,NULL預設

銷毀互斥量

int pthread_mutex_destroy(pthread_mutex_t *mutex);

注意:

1.使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要銷毀

2.不要銷毀一個已經加鎖的互斥量

3.已經銷毀的互斥量,要確定後面不會有線程再嘗試加鎖

互斥量加鎖和解鎖

加鎖是有損于性能的

int pthread_mutex_lock(pthread_mutex_t *mutex);

int pthread_mutex_unlock(pthread_mutex_t *mutex);

傳回值:成功傳回0,失敗傳回錯誤号

對于lock函數:其他線程已經鎖定互斥量,或者存在其他線程同時申請互斥量,但沒有競争到互斥量,那麼pthread_ lock調用會陷入阻塞(執行流被挂起),等待互斥量解鎖,這樣保證臨界區隻有一個線程通路,保護了臨界區

通過互斥量實作線程安全版本的搶票:

#include<stdio.h>
    #include<unistd.h>
    #include<pthread.h>
    #include<sys/stat.h>
    #include<sys/types.h>
    int tickets=1000;
    pthread_mutex_t mutex;
    void* TicketGrabbing(void* arg)
    {
      const char* str=(char*) arg; 
      while(1)
      {
        pthread_mutex_lock(&mutex);
        if(tickets>0)
        {
            usleep(100);
            printf("%s:get a ticket:%d\n",str,tickets--);
            pthread_mutex_unlock(&mutex);
        }                                                                                                                                       
        else
        {
          pthread_mutex_unlock(&mutex);
          break;
        }
      }
      printf("%s quit!\n",str);
		pthread_exit((void*)0);
    }
    int main()
    {
      pthread_mutex_init(&mutex,NULL);
      pthread_t t1,t2,t3,t4;
      pthread_create(&t1,NULL,TicketGrabbing,"thread 1"); 
      pthread_create(&t2,NULL,TicketGrabbing,"thread 2");        
      pthread_create(&t3,NULL,TicketGrabbing,"thread 3");
      pthread_create(&t4,NULL,TicketGrabbing,"thread 4");
      pthread_join(t1,NULL);
      pthread_join(t2,NULL);
      pthread_join(t3,NULL);
      pthread_join(t4,NULL);
      pthread_mutex_destroy(&mutex);
      return 0;
    }                                
           
線程的互斥與同步
線程的互斥與同步

互斥量實作原理

為了實作互斥鎖操作,大多數體系結構都提供了swap或exchange指令,該指令的作用是把寄存器和記憶體單元的資料相交換,由于隻有一條指令,保證了原子性

即使是多處理器平台,通路記憶體的 總線周期也有先後,一個處理器上的交換指令執行時另一個處理器的交換指令隻能等待總線周期

線程的互斥與同步

加鎖解鎖是如何完成的:

線程的互斥與同步

是以本質上是1隻有一份,隻有拿到那個1的線程才能return執行臨界區的代碼

可重入VS線程安全

概念

線程安全:多個線程并發同一段代碼時,不會出現不同的結果。常見對全局變量或者靜态變量進行操作,并且沒有鎖保護的情況下,會出現該問題

重入:同一個函數被不同的執行流調用,目前一個流程還沒有執行完,就有其他的執行流再次進入,我們稱之為重入。一個函數在重入的情況下,運作結果不會出現任何不同或者任何問題,則該函數被稱為可重入函數,否則,是不可重入函數

線程安全與重入的關系:

線程的互斥與同步

常見的線程不安全的情況

1.不保護共享變量的函數

2.函數狀态随着被調用,狀态發生變化的函數

3.傳回指向靜态變量指針的函數

4.調用線程不安全函數的函數

常見的線程安全的情況

1.每個線程對全局變量或者靜态變量隻有讀取的權限,而沒有寫入的權限,一般來說這些線程是安全的

2.類或者接口對于線程來說都是原子操作

3.多個線程之間的切換不會導緻該接口的執行結果存在二義性

常見不可重入的情況

1.調用了malloc/free函數,因為malloc函數是用全局連結清單來管理堆的

2.調用了标準I/O庫函數,标準I/O庫的很多實作都以不可重入的方式使用全局資料結構

3.可重入函數體内使用了靜态的資料結構

常見可重入的情況

1.不使用全局變量或靜态變量

2.不使用用malloc或者new開辟出的空間

3.不調用不可重入函數

4.不傳回靜态或全局資料,所有資料都有函數的調用者提供

5.使用本地資料,或者通過制作全局資料的本地拷貝來保護全局資料

可重入與線程安全聯系

1.函數是可重入的,那就是線程安全的

2.函數是不可重入的,那就不能由多個線程使用,有可能引發線程安全問題

3.如果一個函數中有全局變量,那麼這個函數既不是線程安全也不是可重入的

可重入與線程安全差別

1.可重入函數是線程安全函數的一種

2.線程安全不一定是可重入的,而可重入函數則一定是線程安全的

3.如果将對臨界資源的通路加上鎖,則這個函數是線程安全的,但如果這個重入函數若鎖還未釋放則會産生死鎖,是以是不可重入的

死鎖

死鎖是指在一組程序中的各個程序均占有不會釋放的資源,但因互相申請被其他程序所站用不會釋放的資源而處于的一種永久等待狀态

死鎖四個必要條件

①互斥條件:一個資源每次隻能被一個執行流使用

②請求與保持條件:一個執行流因請求資源而阻塞時,對已獲得的資源保持不放

③不剝奪條件:一個執行流已獲得的資源,在末使用完之前,不能強行剝奪

④循環等待條件:若幹執行流之間形成一種頭尾相接的循環等待資源的關系

避免死鎖

1.破壞死鎖的四個必要條件

2.加鎖順序一緻

3.避免鎖未釋放的場景

4.資源一次性配置設定

程序同步

同步:在保證資料安全的前提下,讓線程能夠按照某種特定的順序通路臨界資源,進而有效避免饑餓問題,叫做同步

為什麼要有程序同步?

單純的互斥會讓競争力強的線程獲得鎖,競争力弱的一直等待,效率十分低

條件變量

程序同步通過條件變量來實作,類似于程序互斥的互斥量

條件變量:用來描述某種臨界資源是否就緒的一種資料化描述

條件變量通常要與互斥搭配使用

條件變量初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

參數:

cond:要初始化的條件變量

attr:NULL

銷毀

int pthread_cond_destroy(pthread_cond_t *cond)

等待條件滿足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

參數:

cond:要在這個條件變量上等待

mutex:互斥量

線程挂起等待時還拿着互斥鎖,别的線程無法進入臨界區,這時需要釋放鎖mutex

喚醒等待

int pthread_cond_broadcast(pthread_cond_t *cond);//喚醒所有

int pthread_cond_signal(pthread_cond_t *cond);//喚醒一個

示例:

#include<iostream>                                                                                                                            
  #include<pthread.h>
  #include<cstdio>
  using namespace std;
  pthread_mutex_t mutex;
  pthread_cond_t cond;
  void* run(void* arg)
  {
    pthread_detach(pthread_self());
    cout<<(char*)arg<<"run....."<<endl;
    while(true)
    {
      pthread_cond_wait(&cond,&mutex);//初始條件不滿足,等待條件滿足
      cout<<"thread"<<pthread_self()<<"runing.."<<endl; 
    }
  }
  int main()
  {
    pthread_mutex_init(&mutex,NULL);
    pthread_cond_init(&cond,NULL);
    pthread_t t1,t2,t3;
    pthread_create(&t1,NULL,run,(void*)"thread t1");
    pthread_create(&t2,NULL,run,(void*)"thread t2");
    pthread_create(&t3,NULL,run,(void*)"thread t3");
    while(true)
    {
    getchar();
    pthread_cond_signal(&cond);
    }
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    return 0;
  }               
           
線程的互斥與同步

可以看到使用同步後,列印結果出現了相當強的順序性

因為每個線程等待時都在資源的等待隊列中,喚醒隊頭後再等待插入到隊尾,這樣就出現每個線程順序列印

資源等待隊列

線程的互斥與同步

生産者消費者模型

本質上是一個線程安全的隊列,和兩種角色的線程(生産者和消費者)

生産者消費者模式就是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列裡取,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力。這個阻塞隊列就是用來給生産者和消費者解耦的

三種關系:

生産者與生産者(競争關系,互斥關系)

消費者和消費者(競争關系,互斥關系)

生産者和消費者(互斥關系(保證資料的正确性),同步關系(多線程協同))

兩種角色:

生産者和消費者

一個交易場所:

通常是記憶體中的緩沖區,自己通過某種方式實作的

生産者消費者模型優點:

1.解耦

2.支援并發

3.支援忙閑不均

為什麼要有生産者與消費者模型?

通過緩沖區,實作了生産者與消費者的解耦

單生産者,單消費者,一個blockingqueue

線程的互斥與同步
線程的互斥與同步

代碼實作:

blockingqueue.c

#pragma once
  #include<iostream>
  using namespace std;
  #include<pthread.h>
  #include<queue>
  #include<cstdlib>
  #include<ctime>
  #define NUM 10
  template<class T>
  class blockingqueue
  {                                                                                                                                             
    bool isFull()
    {
      return q.size()==cap;
    }
    bool isEmpty()
    {
      return q.size()==0;
    }
    public:
      blockingqueue(int _cap=NUM)
        :cap(_cap)
      {
        pthread_mutex_init(&lock,NULL);
        pthread_cond_init(&full,NULL);
        pthread_cond_init(&empty,NULL);
      }
      void Push(T& in)
      {
        pthread_mutex_lock(&lock);
        while(isFull())
        {
          //wait是函數可能失敗,造成僞喚醒,
          //是以通過while循環再次進行判定
          //隊列為空不能push等待消費者Pop
          pthread_cond_wait(&full,&lock);
        }
        q.push(in);
        pthread_mutex_unlock(&lock);
        if(q.size()>=cap/2)
        {
          cout<<"資料太多了,消費者來消費吧"<<endl;
          pthread_cond_signal(&empty);
        }
      }
      void Pop(T& out)
      {
        pthread_mutex_lock(&lock);
        while(isEmpty())
        {
          pthread_cond_wait(&empty,&lock);
        }
        out=q.front();
        q.pop();
        pthread_mutex_unlock(&lock);
        if(q.size()<=cap/2)
        {
          cout<<"資料已經快不夠了,生産者來生産吧"<<endl;
          pthread_cond_signal(&full);
  
        }
      }
      ~blockingqueue()
      {
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&full);
        pthread_cond_destroy(&empty);
      }
    public:
      queue<T> q;
    private:
      int cap;
      pthread_mutex_t lock;
      pthread_cond_t full;
      pthread_cond_t empty;
  };      
           

main函數

#include"blockingqueue.c"
  #include<unistd.h>
  void* Consumer(void* arg)
  {
    auto bq=(blockingqueue<int>*) arg;
    while(true)
    {
      int data=0;//輸出型參數
      bq->Pop(data);
      cout<<"consumer:"<<data<<"目前個數:"<<bq->blockingqueue::q.size()<<endl;
      sleep(3);                                                                                                                                 
    }
  }
  void* Producer(void* arg)
  {
    auto bq=(blockingqueue<int>*) arg;
    while(true)
    {
      int data=rand()%100+1;
      bq->Push(data);
      cout<<"Producer:"<<data<<"目前個數:"<<bq->blockingqueue::q.size()<<endl;
      sleep(1);
    }
  }
  int main()
  {
    srand((unsigned long)time(NULL));
	blockingqueue<int>* bq=new blockingqueue<int>();
    pthread_t c,p;
    pthread_create(&c,NULL,Consumer,bq);
    pthread_create(&p,NULL,Producer,bq);
    pthread_join(c,NULL);
    pthread_join(p,NULL);
    return 0;
  }               
           

POSIX信号量

信号量本身是一個計數器,描述臨界資源的計數器

擁有更細粒度的臨界資源的管理

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,達到無沖突的通路共享資源目的。 但POSIX可以用于線程間同步和互斥

線程的互斥與同步

P操作:

①信号量S減1;

②若S減1後仍大于或等于0,則程序繼續執行;

③若S減1後小于0,則該程序被阻塞後放入等待該信号量的等待隊列中

V操作:

①S加1;

②若相加後結果大于0,則程序繼續執行;

③若相加後結果小于或等于0,則從該信号的等待隊列中釋放一個等待程序,然後再傳回原程序繼續執行或轉程序排程

初始化信号量

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

參數:

pshared:0表示線程間共享,非零表示程序間共享

value:信号量初始值,大小表示可以同時通路臨界資源的線程數

銷毀信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,會将信号量的值減1

int sem_wait(sem_t *sem); //P()

釋出信号量

功能:釋出信号量,表示資源使用完畢,可以歸還資源了。将信号量值加1

int sem_post(sem_t *sem);//V()

信号量版本實作線程安全的搶票程式

#include<iostream>                                                                                                                                                                                                                                                                                                      
  #include<string>
  #include<pthread.h>
  #include<unistd.h>
  #include<semaphore.h>
  using namespace std;
  class Sem
  {
    private:
      sem_t sem;
    public:
      Sem(int num)
      {
        sem_init(&sem,0,num);
      }
      void P()
      {
        sem_wait(&sem);
      }
      void V()
      {
        sem_post(&sem);
      }
      ~Sem()
      {
        sem_destroy(&sem);
      }
  };
  Sem sem(1);//隻能有一個人在搶
  int tickets=100;
  void* Gettickets(void* arg)
  {
    string s=(char*)arg;
    while(true)
    {
      sleep(1);
      sem.P();
      if(tickets>0)
      {
        usleep(10000);
        cout<<s<<"get tickets:"<<tickets--<<endl;
        sem.V();
      }
      else
      {
        sem.V();
        break;
      }
    }
    pthread_exit((void*)0);
  }
  int main()
  {
    pthread_t t1,t2,t3;
    pthread_create(&t1,NULL,Gettickets,(void*)"thread t1");
    pthread_create(&t2,NULL,Gettickets,(void*)"thread t2");
    pthread_create(&t3,NULL,Gettickets,(void*)"thread t3");
    pthread_join(t1,NULL);
    pthread_join(t2,NULL);
    pthread_join(t3,NULL);
    return 0;
  }

           
線程的互斥與同步

基于環形隊列的生産消費模型

線程的互斥與同步

通過信号量實作:

#include<iostream
  #include<unistd.h>
  #include<pthread.h>
  #include<stdlib.h>
  #include<semaphore.h>
  #include<vector>
  using namespace std;
  #define NUM 5
  template<class T>
  class ring
  {
    private:
      sem_t sem_blank;
      sem_t sem_data;
      vector<T> v;
      int _cap;
      int c_pos;
      int p_pos;
      void P(sem_t& t)
      {
        sem_wait(&t);
      }
      void V(sem_t& t)
      {
        sem_post(&t);
      }
    public:
      ring(int num=NUM)
        :_cap(num)
        ,c_pos(0)
        ,p_pos(0)
      {
        v.resize(_cap);
        sem_init(&sem_data,0,0);
        sem_init(&sem_blank,0,_cap);
  
      }
      void Push(T& in)
      {
        P(sem_blank);
        v[p_pos]=in;
        V(sem_data);
        p_pos++;
        p_pos%=_cap;
      }
      void Pop(T& out)
      {
        P(sem_data);
        out=v[c_pos];
        V(sem_blank);
        c_pos++;
        c_pos%=_cap;
      }
      ~ring()
      {
        sem_destroy(&sem_data);
        sem_destroy(&sem_blank);
      }
  };
  void* produce(void* arg)
  {
    ring<int>* rq=(ring<int>*) arg;
    while(true)
    {
      sleep(1);
      int a=rand()%100+1;
      rq->Push(a);
      cout<<"produce done:"<<a<<endl;
    }
  }
  void* consume(void* arg)
  {
    ring<int>* rq=(ring<int>*) arg;
    while(true)
    {
      sleep(2);
      int a=0;
      rq->Pop(a);
      cout<<"consume done:"<<a<<endl;
    }
  }
  
  int main()
  {
    pthread_t p,c;
    ring<int>* rp=new ring<int>();
    srand((unsigned long)time(NULL));
    pthread_create(&p,NULL,produce,rp);
    pthread_create(&c,NULL,consume,rp);
    pthread_join(p,NULL);
    pthread_join(c,NULL);
    return 0;
  }
           
線程的互斥與同步

該隊列不可能出現資料不一緻的情況

隻有兩種情況生産者和消費者指向同一空間,通路同一臨界資源

1.隊列為空時,消費者不能消費,隻有生産者生成,不存在競争

2.隊列為滿時,生産者不能生産,等待消費者消費,也不存在競争

線程池

一種線程使用模式。線程過多會帶來排程開銷,進而影響緩存局部性和整體性能。而線程池維護着多個線程,等待着監督管理者配置設定可并發執行的任務。這避免了在處理短時間任務時建立與銷毀線程的代價。線程池不僅能夠保證核心的充分利用,還能防止過分排程。可用線程數量應該取決于可用的并發處理器、處理器核心、記憶體、網絡sockets等的數量。
  • 線程池的應用場景:
  1. 需要大量的線程來完成任務,且完成任務的時間比較短。 WEB伺服器完成網頁請求這樣的任務,使用線程池技術是非常合适的。因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點選次數。 但對于長時間的任務,比如一個Telnet連接配接請求,線程池的優點就不明顯了。因為Telnet會話時間比線程的建立時間大多了。 *
  2. 對性能要求苛刻的應用,比如要求伺服器迅速響應客戶請求。
  3. 接受突發性的大量請求,但不至于使伺服器是以産生大量線程的應用。突發性大量客戶請求,在沒有線程池情況下,将産生大量線程,雖然理論上大部分作業系統線程數目最大值不是問題,短時間内産生大量線程可能使記憶體到達極限,出現錯誤.
線程的互斥與同步

Task.hpp

#pragma once                                                                                               
  #include<iostream>
  using namespace std;
  #include<pthread.h>
  class Task
  {
    private:
      int _x;
      int _y;
      char _op;
    public:
      Task(){}
      Task(int x,int y,char op)
      :_x(x)
      ,_y(y)
       ,_op(op)
      {}
      void run()
      {
        int z=0;
		switch(_op)
        {
     case '+':
      z=_x+_y;
      break;
      case '-':
     z=_x-_y;
      break;
      case '*':
      z=_x*_y;
      case '/':
      if(_y==0)
      {
        cerr<<"div zero"<<endl;
      }
      else 
       z=_x/_y;
      break;
      case '%':
      if(_y==0)
      {
        cerr<<"mod zero"<<endl;
      }
      else 
        z=_x%_y;
      break;
      default:
      cerr<<"default"<<endl;
          }
      cout<<_x<<_op<<_y<<"="<<z<<endl;
      }
  };           
           

Threadpool.hpp

#include<iostream>
  using namespace std;
  #include<pthread.h>
  #include<queue>
  #define  NUM 5                                                                                                                                                                                                                                                                                                                                                                                                                                                                        
  template<class T>
  class ThreadPool
  {
    private:
      queue<T> task_queue;
      int thread_num;
      pthread_mutex_t lock;
      pthread_cond_t cond;
    public:
      ThreadPool(int num=NUM)
        :thread_num(num)
      {
        pthread_mutex_init(&lock,NULL);
        pthread_cond_init(&cond,NULL);
      }
      void LockQueue()
      {
        pthread_mutex_lock(&lock);
      }
      void UnlockQueue()
      {
        pthread_mutex_unlock(&lock);
      }
      void waitcond()
      {
        pthread_cond_wait(&cond,&lock);
      }
      void wakeup()
      {
        pthread_cond_signal(&cond);
      }
      bool isEmptyThreadPool()
      {
        return task_queue.size()==0;
      }
  
      static void* Routine(void* arg)
      {
          pthread_detach(pthread_self());
        ThreadPool* This=(ThreadPool*) arg;
        while(true)
        {
          This->LockQueue();
          if(This->isEmptyThreadPool())
          {
           This->waitcond();
          }
          T t;
         This->Pop(t);
          This->UnlockQueue();
          cout<<"thread:"<<pthread_self()<<"run:";
          t.run();
        }
      }
      void Push(T& in)
      {
        LockQueue();
        task_queue.push(in);
        UnlockQueue();
        wakeup();
      }
      void Pop(T& out)
      {
         out=task_queue.front();
         task_queue.pop();
      }
      void InitThreadPool()
      {
        pthread_t tid;
        for(int i=0;i<thread_num;i++)
        {
          pthread_create(&tid,NULL,Routine,this);
        }
      }
      ~ThreadPool()
      {
        pthread_cond_destroy(&cond);
       pthread_mutex_destroy(&lock);
      }
  };

           

對于Routine函數

線程的互斥與同步

main函數

#include"Task.hpp"                                                                                         
  #include"Threadpool.hpp"
  #include<unistd.h>
  int main()
  {
    ThreadPool<Task>* tp=new ThreadPool<Task>();
    tp->InitThreadPool();
    srand((unsigned int)time(NULL));
    const char* op="+-*/%";
    while(true)
    {
      sleep(1);
      int x=rand()%100+1;
      int y=rand()%100+1;
      Task t(x,y,op[x%5]);
      tp->Push(t);
    }
  
    return 0;
  }
           
線程的互斥與同步

線程安全的單例模式

某些類, 隻應該具有一個對象(執行個體), 就稱之為單例

單例模式通常分為餓漢模式和懶漢模式

吃完飯, 立刻洗碗, 這種就是餓漢方式. 因為下一頓吃的時候可以立刻拿着碗就能吃飯.

吃完飯, 先把碗放下, 然後下一頓飯用到這個碗了再洗碗, 就是懶漢方式.

懶漢方式最核心的思想是 “延時加載”,“需要時再加載” 進而能夠優化伺服器的啟動速度以及有效利用記憶體

餓漢方式實作單例模式

template <typename T> 
class Singleton 
{  
	static T data; 
	public:  
	static T* GetInstance() 
	{  
		return &data;  
	} 
}; 
           

懶漢方式實作單例模式(線程安全版本)

template <typename T> 
class Singleton {  
	volatile static T* inst; // 需要設定 volatile 關鍵字, 否則可能被編譯器優化.  
	static std::mutex lock; 
	public:  
	static T* GetInstance() 
	{  
		if (inst == NULL) 
		{ // 雙重判定空指針, 降低鎖沖突的機率, 提高性能.  
			lock.lock(); // 使用互斥鎖, 保證多線程情況下也隻調用一次 new.  
			if (inst == NULL) 
			{  
				inst = new T();  
			}  
		lock.unlock(); 
		 }  
		 return inst;  
		 } 
	}; 
           

注意事項:

  1. 加鎖解鎖的位置
  2. 雙重 if 判定, 避免不必要的鎖競争
  3. volatile關鍵字防止過度優化

STL,智能指針線程安全問題

STL中的容器是否是線程安全的

不是

原因是, STL 的設計初衷是将性能挖掘到極緻, 而一旦涉及到加鎖保證線程安全, 會對性能造成巨大的影響.

而且對于不同的容器, 加鎖方式的不同, 性能可能也不同(例如hash表的鎖表和鎖桶).

是以 STL 預設不是線程安全. 如果需要在多線程環境下使用, 往往需要調用者自行保證線程安全.

智能指針是否是線程安全的?

對于 unique_ptr:

由于隻是在目前代碼塊範圍内生效, 是以不涉及線程安全問題

對于 shared_ptr,:

多個對象需要共用一個引用計數變量, 是以會存線上程安全問題. 但是标準庫實作的時候考慮到了這個問題, 基于原子操作(CAS)的方式保證 shared_ptr 能夠高效, 原子的操作引用計數.

其他常見的各種鎖

悲觀鎖:在每次取資料時,總是擔心資料會被其他線程修改,是以會在取資料前先加鎖(讀鎖,寫鎖,行鎖等),當其他線程想要通路資料時,被阻塞挂起。

樂觀鎖:每次取資料時候,總是樂觀的認為資料不會被其他線程修改,是以不上鎖。但是在更新資料前,會判斷其他資料在更新前有沒有對資料進行修改。主要采用兩種方式:版本号機制和CAS操作。

CAS操作:當需要更新資料時,判斷目前記憶體值和之前取得的值是否相等。如果相等則用新值更新。若不等則失敗,失敗則重試,一般是一個自旋的過程,即不斷重試。

自旋鎖:

線程的互斥與同步

讀寫者問題

線程的互斥與同步

初始化

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr);

銷毀

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

加鎖和解鎖

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

多線程下:

如果讀寫寫者同時到來,如果有人還在讀,則不讓讀者進入臨界區,讓寫者進入并寫完後再讀,稱之為寫者優先

反之,為讀者優先

示例:

線程的互斥與同步
線程的互斥與同步

繼續閱讀