天天看點

線程安全的環形緩沖區實作

來源:http://blog.csdn.net/lezhiyong

    應用背景:線程1将每次數量不一的音頻采樣點(PCM音頻資料)寫入環形緩沖區,線程2每次取固定數量采樣點送音頻編碼器,線程1線程2在平均時間内的讀寫資料量相等。(倒入桶中的水量有時大有時小,但每次取一瓢喝:)

   該環形緩沖區借鑒CoolPlayer音頻播放器中的環形緩沖區代碼實作,在讀寫操作函數中加了鎖,允許多線程同時操作。CPs_CircleBuffer基于記憶體段的讀寫,比用模闆實作的環形緩沖隊列适用的資料類型更廣些, CPs_CircleBuffer修改成C++中基于對象的實作,加上詳細注釋,m_csCircleBuffer鎖變量為自用的lock類型(将CRITICAL_SECTION封裝起來),調用lock()加鎖,調用unlock()解鎖。使用效果良好,分享出來。

CPs_CircleBuffer環形緩沖還不具備當待寫資料量超出空餘緩沖時自動配置設定記憶體的功能,這個将在後續進行優化。

CPs_CircleBuffer使用步驟:

1、建立對象  

CPs_CircleBuffer* m_pCircleBuffer;  

m_pCircleBuffer = new CPs_CircleBuffer(bufsize);  

2、寫  

if (m_pCircleBuffer->GetFreeSize() < CIC_READCHUNKSIZE)  

 {  

     Sleep(20);  

     continue;  

 }  

m_pCircleBuffer->Write(internetbuffer.lpvBuffer,internetbuffer.dwBufferLength);  

3、讀  

m_pCircleBuffer->Read(pDestBuffer,iBytesToRead, piBytesRead);  

4、其他調用  

if(m_pCircleBuffer->IsComplete())  

    break;          

iUsedSpace =m_pCircleBuffer->GetUsedSize();  

m_pCircleBuffer->SetComplete();  

CPs_CircleBuffer修改為類的定義:

class  CPs_CircleBuffer  

{  

public:  

       CPs_CircleBuffer(const unsigned int iBufferSize);  

       ~CPs_CircleBuffer();  

        // Public functions  

        void  Uninitialise();  

        void  Write(const void* pSourceBuffer, const unsigned int iNumBytes);  

        bool  Read(void* pDestBuffer, const size_t iBytesToRead, size_t* pbBytesRead);  

        void  Flush();  

        unsigned int GetUsedSize();  

        unsigned int GetFreeSize();  

        void  SetComplete();  

        bool  IsComplete();  

private:         

        unsigned char*  m_pBuffer;  

        unsigned int    m_iBufferSize;  

        unsigned int    m_iReadCursor;  

        unsigned int    m_iWriteCursor;  

        HANDLE          m_evtDataAvailable;  

        Vlock           m_csCircleBuffer;  

        bool            m_bComplete;        

};  

CPs_CircleBuffer修改為類的實作:

#define CIC_WAITTIMEOUT  3000  

CPs_CircleBuffer::CPs_CircleBuffer(const unsigned int iBufferSize)  

    m_iBufferSize = iBufferSize;  

    m_pBuffer = (unsigned char*)malloc(iBufferSize);  

    m_iReadCursor = 0;  

    m_iWriteCursor = 0;  

    m_bComplete = false;  

    m_evtDataAvailable = CreateEvent(NULL, FALSE, FALSE, NULL);  

}  

CPs_CircleBuffer::~CPs_CircleBuffer()  

    Uninitialise();  

// Public functions  

void CPs_CircleBuffer::Uninitialise()//沒有必要public這個接口函數,long120817  

    CloseHandle(m_evtDataAvailable);  

    free(m_pBuffer);  

//Write前一定要調用m_pCircleBuffer->GetFreeSize(),如果FreeSize不夠需要等待,long120817  

void  CPs_CircleBuffer::Write(const void* _pSourceBuffer, const unsigned int _iNumBytes)  

    unsigned int iBytesToWrite = _iNumBytes;  

    unsigned char* pSourceReadCursor = (unsigned char*)_pSourceBuffer;  

    //CP_ASSERT(iBytesToWrite <= GetFreeSize());//修改為沒有足夠空間就傳回,write前一定要加GetFreeSize判斷,否則進入到這裡相當于丢掉資料,         // long120817  

    if (iBytesToWrite > GetFreeSize())  

    {  

        return;  

    }  

    _ASSERT(m_bComplete == false);  

    m_csCircleBuffer.Lock();  

    if (m_iWriteCursor >= m_iReadCursor)  

        //              0                                            m_iBufferSize  

        //              |-----------------|===========|--------------|  

        //                                pR->        pW->   

        // 計算尾部可寫空間iChunkSize,long120817  

        unsigned int iChunkSize = m_iBufferSize - m_iWriteCursor;  

        if (iChunkSize > iBytesToWrite)  

        {  

            iChunkSize = iBytesToWrite;  

        }  

        // Copy the data  

        memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iChunkSize);  

        pSourceReadCursor += iChunkSize;  

        iBytesToWrite -= iChunkSize;  

        // 更新m_iWriteCursor  

        m_iWriteCursor += iChunkSize;  

        if (m_iWriteCursor >= m_iBufferSize)//如果m_iWriteCursor已經到達末尾  

            m_iWriteCursor -= m_iBufferSize;//傳回到起點0位置,long120817  

    //剩餘資料從Buffer起始位置開始寫  

    if (iBytesToWrite)  

        memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iBytesToWrite);  

        m_iWriteCursor += iBytesToWrite;  

        _ASSERT(m_iWriteCursor < m_iBufferSize);//這個斷言沒什麼意思,應該_ASSERT(m_iWriteCursor <= m_iReadCursor);long20120817  

    SetEvent(m_evtDataAvailable);//設定資料寫好信号量  

    m_csCircleBuffer.UnLock();  

bool  CPs_CircleBuffer::Read(void* pDestBuffer, const size_t _iBytesToRead, size_t* pbBytesRead)  

    size_t iBytesToRead = _iBytesToRead;  

    size_t iBytesRead = 0;  

    DWORD dwWaitResult;  

    bool bComplete = false;  

    while (iBytesToRead > 0 && bComplete == false)  

        dwWaitResult = WaitForSingleObject(m_evtDataAvailable, CIC_WAITTIMEOUT);//等待資料寫好,long120817  

        if (dwWaitResult == WAIT_TIMEOUT)  

            //TRACE_INFO2("Circle buffer - did not fill in time!");  

            *pbBytesRead = iBytesRead;  

            return FALSE;//等待逾時則傳回  

        m_csCircleBuffer.Lock();  

        if (m_iReadCursor > m_iWriteCursor)  

            //              0                                                    m_iBufferSize  

            //              |=================|-----|===========================|  

            //                                pW->  pR->   

            unsigned int iChunkSize = m_iBufferSize - m_iReadCursor;  

            if (iChunkSize > iBytesToRead)  

                iChunkSize = (unsigned int)iBytesToRead;  

            //讀取操作  

            memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);  

            iBytesRead += iChunkSize;  

            iBytesToRead -= iChunkSize;  

            m_iReadCursor += iChunkSize;  

            if (m_iReadCursor >= m_iBufferSize)//如果m_iReadCursor已經到達末尾  

                m_iReadCursor -= m_iBufferSize;//傳回到起點0位置,long120817  

        if (iBytesToRead && m_iReadCursor < m_iWriteCursor)  

            unsigned int iChunkSize = m_iWriteCursor - m_iReadCursor;  

        //如果有更多的資料要寫  

        if (m_iReadCursor == m_iWriteCursor)  

            if (m_bComplete)//跳出下一個while循環,該值通過SetComplete()設定,此邏輯什麼意思?long120817  

                bComplete = true;  

        else//還有資料可以讀,SetEvent,在下一個while循環開始可以不用再等待,long120817  

            SetEvent(m_evtDataAvailable);  

        m_csCircleBuffer.UnLock();  

    *pbBytesRead = iBytesRead;  

    return bComplete ? false : true;  

//  0                                                m_iBufferSize  

//  |------------------------------------------------|  

//  pR  

//  pW  

//讀寫指針歸零  

void  CPs_CircleBuffer::Flush()  

//擷取已經寫的記憶體  

unsigned int CPs_CircleBuffer::GetUsedSize()  

     return m_iBufferSize - GetFreeSize();  

unsigned int CPs_CircleBuffer::GetFreeSize()  

    unsigned int iNumBytesFree;  

    if (m_iWriteCursor < m_iReadCursor)  

        //              0                                                    m_iBufferSize  

        //              |=================|-----|===========================|  

        //                                pW->  pR->   

        iNumBytesFree = (m_iReadCursor - 1) - m_iWriteCursor;  

    else if (m_iWriteCursor == m_iReadCursor)  

        iNumBytesFree = m_iBufferSize;  

    else  

        //              |-----------------|=====|---------------------------|  

        //                                pR->   pW->   

        iNumBytesFree = (m_iReadCursor - 1) + (m_iBufferSize - m_iWriteCursor);  

    return iNumBytesFree;  

//該函數什麼時候調用?long120817  

void  CPs_CircleBuffer::SetComplete()  

    m_bComplete = true;  

    SetEvent(m_evtDataAvailable);  

附自動初始化和摧毀的鎖對象Vlock的實作:

#ifdef WIN32  

#include <windows.h>  

#define  V_MUTEX            CRITICAL_SECTION //利用臨界區實作的鎖變量  

#define  V_MUTEX_INIT(m)        InitializeCriticalSection(m)  

#define  V_MUTEX_LOCK(m)        EnterCriticalSection(m)  

#define  V_MUTEX_UNLOCK(m)      LeaveCriticalSection(m)  

#define  V_MUTEX_DESTORY(m)     DeleteCriticalSection(m)  

#else  

#define  V_MUTEX                pthread_mutex_t  

#define  V_MUTEX_INIT(m)        pthread_mutex_init(m,NULL)  

#define  V_MUTEX_LOCK(m)        pthread_mutex_Lock(m)  

#define  V_MUTEX_UNLOCK(m)      pthread_mutex_unLock(m)  

#define  V_MUTEX_DESTORY(m)     pthread_mutex_destroy(m)  

#endif  

class  Vlock  

    Vlock(void)  

        V_MUTEX_INIT(&m_Lock);  

    ~Vlock(void)  

        V_MUTEX_DESTORY(&m_Lock);  

    void Lock(){V_MUTEX_LOCK(&m_Lock);}  

    void UnLock(){V_MUTEX_UNLOCK(&m_Lock);}  

private:  

    V_MUTEX m_Lock;  

繼續閱讀