天天看點

眉目傳情之并發無鎖環形隊列的實作

眉目傳情之并發無鎖環形隊列的實作

  • Author:Echo Chen(陳斌)
  • Email:[email protected]
  • Blog:Blog.csdn.net/chen19870707
  • Date:October 10th, 2014

           前面在《眉目傳情之匠心獨運的kfifo》一文中詳細解析了 linux  核心并發無鎖環形隊列kfifo的原理和實作,kfifo鬼斧神工,博大精深,讓人歎為觀止,但遺憾的是kfifo為核心提供服務,并未開放出來。劍不試則利鈍暗,弓不試則勁撓誣,鷹不試則巧拙惑,馬不試則良驽疑,光說不練是不能學到精髓的,下面就動手實作自己的并發無鎖隊列UnlockQueue(單生産者單消費者)。

    一、UnlockQueue聲明

    1: #ifndef _UNLOCK_QUEUE_H      
    2: #define _UNLOCK_QUEUE_H      
    3:        
    4: class UnlockQueue      
    5: {      
    6: public:      
    7:     UnlockQueue(int nSize);      
    8:     virtual ~UnlockQueue();      
    9:        
    10:     bool Initialize();      
    11:        
    12:     unsigned int Put(const unsigned char *pBuffer, unsigned int nLen);      
    13:     unsigned int Get(unsigned char *pBuffer, unsigned int nLen);      
    14:        
    15:     inline void Clean() { m_nIn = m_nOut = 0; }      
    16:     inline unsigned int GetDataLen() const { return  m_nIn - m_nOut; }      
    17:        
    18: private:      
    19:     inline bool is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); };      
    20:     inline unsigned long roundup_power_of_two(unsigned long val);      
    21:        
    22: private:      
    23:     unsigned char *m_pBuffer;    /* the buffer holding the data */      
    24:     unsigned int   m_nSize;        /* the size of the allocated buffer */      
    25:     unsigned int   m_nIn;        /* data is added at offset (in % size) */      
    26:     unsigned int   m_nOut;        /* data is extracted from off. (out % size) */      
    27: };      
    28:        
    29: #endif      
    UnlockQueue與kfifo 結構相同相同,也是由一下變量組成:
    UnlockQueue kfifo 作用
    m_pBuffer buffer 用于存放資料的緩存
    m_nSize size 緩沖區空間的大小,圓整為2的次幂
    m_nIn in 指向buffer中隊頭
    m_nOut out 指向buffer中的隊尾
    UnlockQueue的設計是用在單生産者單消費者情況下,是以不需要鎖 lock 如果使用不能保證任何時間最多隻有一個讀線程和寫線程,必須使用該lock實施同步。

    二、UnlockQueue構造函數和初始化

    1: UnlockQueue::UnlockQueue(int nSize)      
    2: :m_pBuffer(NULL)      
    3: ,m_nSize(nSize)      
    4: ,m_nIn(0)      
    5: ,m_nOut(0)      
    6: {      
    7:     //round up to the next power of 2      
    8:     if (!is_power_of_2(nSize))      
    9:     {      
    10:         m_nSize = roundup_power_of_two(nSize);      
    11:     }      
    12: }      
    13:        
    14: UnlockQueue::~UnlockQueue()      
    15: {      
    16:     if(NULL != m_pBuffer)      
    17:     {      
    18:         delete[] m_pBuffer;      
    19:         m_pBuffer = NULL;      
    20:     }      
    21: }      
    22:        
    23: bool UnlockQueue::Initialize()      
    24: {      
    25:     m_pBuffer = new unsigned char[m_nSize];      
    26:     if (!m_pBuffer)      
    27:     {      
    28:         return false;      
    29:     }      
    30:        
    31:     m_nIn = m_nOut = 0;      
    32:        
    33:     return true;      
    34: }      
    35:        
    36: unsigned long UnlockQueue::roundup_power_of_two(unsigned long val)      
    37: {      
    38:     if((val & (val-1)) == 0)      
    39:         return val;      
    40:        
    41:     unsigned long maxulong = (unsigned long)((unsigned long)~0);      
    42:     unsigned long andv = ~(maxulong&(maxulong>>1));      
    43:     while((andv & val) == 0)      
    44:         andv = andv>>1;      
    45:        
    46:     return andv<<1;      
    47: }      

    1.在構造函數中,對傳入的size進行2的次幂圓整,圓整的好處是可以将m_nIn % m_nSize 可以轉化為 m_nIn  & (m_nSize – 1),取模運算”的效率并沒有 “位運算” 的效率高。

    2.在構造函數中,未給buffer配置設定記憶體,而在Initialize中配置設定,這樣做的原因是:我們知道在new UnlockQueue的時候有兩步操作,第一步配置設定記憶體,第二步調用構造函數,如果将buffer的配置設定放在構造函數中,那麼就可能 buffer 就可能配置設定失敗,而後面用到buffer,還需要判空。

    三、UnlockQueue入隊和出隊操作

    1: unsigned int UnlockQueue::Put(const unsigned char *buffer, unsigned int len)      
    2: {      
    3:     unsigned int l;      
    4:        
    5:     len = std::min(len, m_nSize - m_nIn + m_nOut);      
    6:        
    7:     /*      
    8:      * Ensure that we sample the m_nOut index -before- we      
    9:      * start putting bytes into the UnlockQueue.      
    10:      */      
    11:     __sync_synchronize();      
    12:        
    13:     /* first put the data starting from fifo->in to buffer end */      
    14:     l = std::min(len, m_nSize - (m_nIn  & (m_nSize - 1)));      
    15:     memcpy(m_pBuffer + (m_nIn & (m_nSize - 1)), buffer, l);      
    16:        
    17:     /* then put the rest (if any) at the beginning of the buffer */      
    18:     memcpy(m_pBuffer, buffer + l, len - l);      
    19:        
    20:     /*      
    21:      * Ensure that we add the bytes to the kfifo -before-      
    22:      * we update the fifo->in index.      
    23:      */      
    24:     __sync_synchronize();      
    25:        
    26:     m_nIn += len;      
    27:        
    28:     return len;      
    29: }      
    30:        
    31: unsigned int UnlockQueue::Get(unsigned char *buffer, unsigned int len)      
    32: {      
    33:     unsigned int l;      
    34:        
    35:     len = std::min(len, m_nIn - m_nOut);      
    36:        
    37:     /*      
    38:      * Ensure that we sample the fifo->in index -before- we      
    39:      * start removing bytes from the kfifo.      
    40:      */      
    41:     __sync_synchronize();      
    42:        
    43:     /* first get the data from fifo->out until the end of the buffer */      
    44:     l = std::min(len, m_nSize - (m_nOut & (m_nSize - 1)));      
    45:     memcpy(buffer, m_pBuffer + (m_nOut & (m_nSize - 1)), l);      
    46:        
    47:     /* then get the rest (if any) from the beginning of the buffer */      
    48:     memcpy(buffer + l, m_pBuffer, len - l);      
    49:        
    50:     /*      
    51:      * Ensure that we remove the bytes from the kfifo -before-      
    52:      * we update the fifo->out index.      
    53:      */      
    54:     __sync_synchronize();      
    55:        
    56:     m_nOut += len;      
    57:        
    58:     return len;      
    59: }      
        入隊和出隊操作與kfifo相同,用到的技巧也完全相同,有不了解的童鞋可以參考前面一篇文章《眉目傳情之匠心獨運的kfifo》。這裡需要指出的是__sync_synchronize()函數,由于linux并未開房出記憶體屏障函數,而在gcc4.2以上版本提供This builtin issues a full memory barrier,有興趣同學可以參考Built-in functions for atomic memory access。

    四、測試程式

    如圖所示,我們設計了兩個線程,一個生産者随機生成學生資訊放入隊列,一個消費者從隊列中取出學生資訊并列印,可以看到整個代碼是無鎖的。
    眉目傳情之并發無鎖環形隊列的實作
    1: #include "UnlockQueue.h"      
    2: #include <iostream>      
    3: #include <algorithm>      
    4: #include <pthread.h>      
    5: #include <time.h>      
    6: #include <stdio.h>      
    7: #include <errno.h>      
    8: #include <string.h>      
    9:        
    10: struct student_info      
    11: {      
    12:    long stu_id;      
    13:    unsigned int age;      
    14:    unsigned int score;      
    15: };      
    16:        
    17: void print_student_info(const student_info *stu_info)      
    18: {      
    19:     if(NULL == stu_info)      
    20:         return;      
    21:        
    22:     printf("id:%ld\t",stu_info->stu_id);      
    23:     printf("age:%u\t",stu_info->age);      
    24:     printf("score:%u\n",stu_info->score);      
    25: }      
    26:        
    27: student_info * get_student_info(time_t timer)      
    28: {      
    29:      student_info *stu_info = (student_info *)malloc(sizeof(student_info));      
    30:      if (!stu_info)      
    31:      {      
    32:         fprintf(stderr, "Failed to malloc memory.\n");      
    33:         return NULL;      
    34:      }      
    35:      srand(timer);      
    36:      stu_info->stu_id = 10000 + rand() % 9999;      
    37:      stu_info->age = rand() % 30;      
    38:      stu_info->score = rand() % 101;      
    39:      //print_student_info(stu_info);      
    40:      return stu_info;      
    41: }      
    42:        
    43: void * consumer_proc(void *arg)      
    44: {      
    45:      UnlockQueue* queue = (UnlockQueue *)arg;      
    46:      student_info stu_info;      
    47:      while(1)      
    48:      {      
    49:          sleep(1);      
    50:          unsigned int len = queue->Get((unsigned char *)&stu_info, sizeof(student_info));      
    51:          if(len > 0)      
    52:          {      
    53:              printf("------------------------------------------\n");      
    54:              printf("UnlockQueue length: %u\n", queue->GetDataLen());      
    55:              printf("Get a student\n");      
    56:              print_student_info(&stu_info);      
    57:              printf("------------------------------------------\n");      
    58:          }      
    59:      }      
    60:      return (void *)queue;      
    61: }      
    62:        
    63: void * producer_proc(void *arg)      
    64:  {      
    65:       time_t cur_time;      
    66:       UnlockQueue *queue = (UnlockQueue*)arg;      
    67:       while(1)      
    68:       {      
    69:           time(&cur_time);      
    70:           srand(cur_time);      
    71:           int seed = rand() % 11111;      
    72:           printf("******************************************\n");      
    73:           student_info *stu_info = get_student_info(cur_time + seed);      
    74:           printf("put a student info to queue.\n");      
    75:           queue->Put( (unsigned char *)stu_info, sizeof(student_info));      
    76:           free(stu_info);      
    77:           printf("UnlockQueue length: %u\n", queue->GetDataLen());      
    78:           printf("******************************************\n");      
    79:           sleep(1);      
    80:       }      
    81:      return (void *)queue;      
    82:   }      
    83:        
    84:        
    85: int main()      
    86: {      
    87:     UnlockQueue unlockQueue(1024);      
    88:     if(!unlockQueue.Initialize())      
    89:     {      
    90:         return -1;      
    91:     }      
    92:        
    93:     pthread_t consumer_tid, producer_tid;      
    94:        
    95:     printf("multi thread test.......\n");      
    96:        
    97:     if(0 != pthread_create(&producer_tid, NULL, producer_proc, (void*)&unlockQueue))      
    98:     {      
    99:          fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",      
    100:                  errno, strerror(errno));      
    101:          return -1;      
    102:     }      
    103:        
    104:     if(0 != pthread_create(&consumer_tid, NULL, consumer_proc, (void*)&unlockQueue))      
    105:     {      
    106:            fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",      
    107:                    errno, strerror(errno));      
    108:            return -1;      
    109:     }      
    110:        
    111:     pthread_join(producer_tid, NULL);      
    112:     pthread_join(consumer_tid, NULL);      
    113:        
    114:     return 0;      
    115:  }      
    運作結果:
    眉目傳情之并發無鎖環形隊列的實作

    -

    Echo Chen:Blog.csdn.net/chen19870707

    -