天天看點

多核分布式隊列的實作:“偷”與“自私”的運用(4)

在設計CDistributeQueue類時,通常有兩種方案值得考慮:

1、 本地隊列預先建立好,當有線程通路時就可以直接根據線程編号去通路對應的本地隊列。

2、 不預先建立本地隊列,當線程第一次通路分布式隊列時,由于擷取不到線程編号,由此可以斷定本線程是第1次通路分布式隊列,此時才建立本地隊列。

方案1和方案2可以說各有優缺點。方案1中,在事先不知道有多少線程會通路分布式隊列的情況下,預先建立好本地隊列會造成程式初始化時間過長,并且可能有一些建立好的隊列得不到使用。

方案2中,采用線程通路分布式隊列時才建立本地隊列,初始化時比較簡單,并且不會造成多建立了本地隊列的情況。缺點是程式設計時,隊列的操作代碼會變複雜一些,效率會有所降低。

下面的代碼中,給出的是方案2的實作。

1) 類的原型定義和成員函數

//擷取線程Id回調函數定義

typedef int (*GetThreadIdFunc)(void *pArg);

template <class T, class LocalQueue, class SharedQueue>

class CDistributedQueue {

private:

    LocalQueue **   m_ppLocalQueue;    // 本地隊列數組

    SharedQueue *   m_pSharedQueue;    // 共享隊列池或共享隊列

    int             m_nLocalQueueSize;

    int             m_nSharedQueueSize;

    int             m_nLocalQueueCount;

    int             m_nSharedQueueCount;

    DWORD       m_dwTlsIndex;        //線程本地存儲索引

    LONG volatile   m_lThreadIdIndex;     //線程編号最大值

    GetThreadIdFunc m_GetThreadIdFunc;   //擷取線程編号回調函數,如果由外面

                                                                  //的線程池提供編号時,需要傳入回調函數

    void *          m_pThreadIdFuncArg;  //擷取線程編号回調函數的參數

 CFastLock       m_LocalQueueResizeLock; //專為下面的ResizeLocalQueue函數使用

 void ResizeLocalQueue();             //将m_ppLocalQueue數組的大小擴大一倍

public:

    CDistributedQueue(){

        m_GetThreadIdFunc = NULL;

        m_pThreadIdFuncArg = NULL;

        m_lThreadIdIndex = 0;

    };

    void Create( int nLocalQueueSize, int nLocalQueueCount,

        int nSharedQueueSize, int nSharedQueueCount);

        int nSharedQueueSize, int nSharedQueueCount,

        GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg);

    virtual ~CDistributedQueue();

    LONG ThreadIdGet();

    void EnQueue(T &Data);

    int  DeQueue(T &Data);

    void PushToLocalQueue(T &Data);

    void PushToLocalQueue(T &Data, int nIndex);

    int PopFromLocalQueue(T &Data);

    SharedQueue *GetSharedQueue() { return m_pSharedQueue; };

    int PrivatizeSharedQueue(int nSharedQueueIndex);

};

說明一下:CDistributedQueue類中有三個模闆參數,第1個模闆參數T是表示資料類型;第2個模闆參數是表示本地隊列類的類型,為一 個不需要使用鎖的普通隊列,比如環形隊列等;第3個模闆參數是表示一個需要使用鎖的共享隊列類,可以是一個隊列池類,也可以是普通的使用鎖的共享隊列類。

1) 構造函數和析構函數代碼

/**   分布式隊列的建立函數

       @param  int nLocalQueueSize - 本地子隊列的大小 

       @param  int nLocalQueueCount - 本地隊列的個數(數組的大小)   

       @param  int nSharedQueueSize - 共享子隊列的大小      

       @param  int nSharedQueueCount - 共享子隊列的個數    

       @return  void - 無

*/

void CDistributedQueue<T, LocalQueue, SharedQueue>::Create(

                   int nLocalQueueSize, int nLocalQueueCount,

                   int nSharedQueueSize, int nSharedQueueCount)

{

    m_nLocalQueueSize = nLocalQueueSize;

    m_nSharedQueueSize = nSharedQueueSize;

    if ( nLocalQueueCount != 0 )

    {

        m_nLocalQueueCount = nLocalQueueCount;

    }

    else

        m_nLocalQueueCount = omp_get_num_procs();

    if ( nSharedQueueCount != 0 )

        m_nSharedQueueCount = nSharedQueueCount;

        m_nSharedQueueCount = omp_get_num_procs();

    m_ppLocalQueue =  new LocalQueue *[m_nLocalQueueCount];

    int i;

    for ( i = 0; i < m_nLocalQueueCount; i++ )

        m_ppLocalQueue[i] = NULL;

    m_pSharedQueue = new SharedQueue(m_nSharedQueueCount, m_nSharedQueueSize);

    m_dwTlsIndex = TlsAlloc();

    m_lThreadIdIndex = 0;

}

       @param  GetThreadIdFunc GetThreadId - 擷取線程Id回調函數  

       @param  void * pThreadIdFuncArg - GetThreadId回調函數的參數

    int nLocalQueueSize, int nLocalQueueCount,

    int nSharedQueueSize, int nSharedQueueCount,

    GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg)

    m_GetThreadIdFunc = GetThreadId;

    m_pThreadIdFuncArg = pThreadIdFuncArg;

    Create(nLocalQueueSize, nLocalQueueCount, nSharedQueueSize, nSharedQueueCount);

/**   分布式隊列的析構函數

       @return  - 無     

CDistributedQueue<T, LocalQueue, SharedQueue>::~CDistributedQueue()

        if ( m_ppLocalQueue[i] != NULL )

        {

            delete m_ppLocalQueue[i];

        }

    delete [] m_ppLocalQueue;

    delete m_pSharedQueue;

    TlsFree(m_dwTlsIndex);

2) 将本地隊列數組擴大一倍的内部成員函數代碼

這個函數主要是考慮有可能程式更新後,通路的線程數量可能大于本地隊列數組的大小的情況,此時采取将本地隊列數組擴大一倍的政策。

/**   分布式隊列的将本地隊列數組擴大一倍的内部成員函數

void CDistributedQueue<T, LocalQueue, SharedQueue>::ResizeLocalQueue()

    //将本地隊列數組擴大一倍, 防止線程數量多于隊列數量,以保證程式安全

    LocalQueue **ppQueue = new LocalQueue *[m_nLocalQueueCount * 2];

        ppQueue[i] = m_ppLocalQueue[i];

    for ( i = m_nLocalQueueCount; i < m_nLocalQueueCount * 2; i++ )

        ppQueue[i] = NULL;

    m_ppLocalQueue = ppQueue;

    //使用原子操作避免m_nLocalQueueCount的資料競争問題

    AtomicWrite((LONG volatile *)&m_nLocalQueueCount, m_nLocalQueueCount * 2);

3) 擷取線程Id成員函數代碼

擷取線程Id成員函數中,這個函數中完成本地隊列的建立和分派工作。先是判斷擷取的線程Id是否為0,如果為0則表明還沒有建立本地隊列,此時需要給線程進行編号,并建立一個新的本地隊列放到數組中下标等于線程編号的位置上。

/**   分布式隊列的擷取線程Id函數

       如果m_GetThreadIdFunc回調函數不為空,則使用它擷取Id

       否則根據分布式隊列内部的編号機制擷取線程Id

       @return  LONG - 傳回線程的編号   

LONG CDistributedQueue<T, LocalQueue, SharedQueue>::ThreadIdGet()

    LONG Id;

    LocalQueue *pQueue = NULL;

    if ( m_GetThreadIdFunc != NULL )

        Id = (*m_GetThreadIdFunc)(m_pThreadIdFuncArg);

        if ( Id >= m_nLocalQueueCount )

            CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);

            if ( Id >= m_nLocalQueueCount )

            {

                ResizeLocalQueue();

            }

        if ( m_ppLocalQueue[Id] == NULL )

            m_ppLocalQueue[Id] = new LocalQueue(m_nLocalQueueSize);

        return Id;

        Id = (LONG )TlsGetValue(m_dwTlsIndex);

        if ( Id == 0 )

            Id = AtomicIncrement(&m_lThreadIdIndex);

            TlsSetValue(m_dwTlsIndex, (void *)Id);

            pQueue = new LocalQueue(m_nLocalQueueSize);

        --Id;

    if ( Id >= m_nLocalQueueCount)

        CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);

            ResizeLocalQueue();

    if ( pQueue != NULL )

        m_ppLocalQueue[Id] = pQueue;

return Id;

4) 進隊操作政策1的進隊操作代碼

/**   分布式隊列的進隊操作函數

       這裡假定了本地隊列可以無限進隊

       進隊政策按以下優先級進行:

       1、本地隊列空時進入本地隊列,、共享隊列未滿時進入共享隊列

       3、共享隊列滿時進入本地隊列

       @param  T &Data - 要進隊的資料    

void CDistributedQueue<T, LocalQueue, SharedQueue>::EnQueue(T &Data)

    int nId = ThreadIdGet();

    if ( m_ppLocalQueue[nId]->IsEmpty() )

        m_ppLocalQueue[nId]->EnQueue(Data);

    else if ( m_pSharedQueue->Push(Data) != CAPI_SUCCESS )

        int nId = ThreadIdGet();

        m_ppLocalQueue[nId]->EnQueue(Data);

        //這個分支不需要做任何事

    return;

5) 本地隊列的操作代碼

/**   分布式隊列的本地隊列進隊函數

       将資料進入到目前線程的本地隊列中

void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue(

T &Data)

    m_ppLocalQueue[nId]->EnQueue(Data);

/**   分布式隊列的指定序号本地隊列進隊函數

    這是一個為特殊需求而設計的函數

    使用這個函數要特别小心,必須保證不會發生資料競争問題

       @param  int nIndex - 本地隊列的序号     

T &Data, int nIndex)

    if ( nIndex >= m_nLocalQueueCount * 2)

        return;

    if ( nIndex >= m_nLocalQueueCount )

        if ( nIndex >= m_nLocalQueueCount )

    if ( m_ppLocalQueue[nIndex] == NULL )

        m_ppLocalQueue[nIndex] = new LocalQueue(m_nLocalQueueSize);

    m_ppLocalQueue[nIndex]->EnQueue(Data);

/**   分布式隊列的本地隊列出隊函數

       @param  T &Data - 接收出隊的資料

       @return  int - 出隊成功傳回CAPI_SUCCESS, 失敗(隊列為空)傳回CAPI_FAILED.      

int CDistributedQueue<T, LocalQueue, SharedQueue>::PopFromLocalQueue(

    return m_ppLocalQueue[nId]->DeQueue(Data);

6) 出隊操作代碼

/**   分布式隊列的出隊函數

    出隊操作政策為,先從本地隊列中出隊,如果失敗則從共享隊列中出隊

       @return  int - 成功傳回CAPI_SUCCESS, 失敗傳回CAPI_FAILED.    

int CDistributedQueue<T, LocalQueue, SharedQueue>::DeQueue(T &Data)

    int nRet;

    nRet = m_ppLocalQueue[nId]->DeQueue(Data);

    if ( nRet == CAPI_FAILED )

        nRet = m_pSharedQueue->Pop(Data);

    return nRet;  

本文轉自Intel_ISN 51CTO部落格,原文連結:http://blog.51cto.com/intelisn/130452,如需轉載請自行聯系原作者