天天看点

多核分布式队列的实现:“偷”与“自私”的运用(4)

在设计CDistributeQueue类时,通常有两种方案值得考虑:

1、 本地队列预先创建好,当有线程访问时就可以直接根据线程编号去访问对应的本地队列。

2、 不预先创建本地队列,当线程第一次访问分布式队列时,由于获取不到线程编号,由此可以断定本线程是第1次访问分布式队列,此时才创建本地队列。

方案1和方案2可以说各有优缺点。方案1中,在事先不知道有多少线程会访问分布式队列的情况下,预先创建好本地队列会造成程序初始化时间过长,并且可能有一些创建好的队列得不到使用。

方案2中,采用线程访问分布式队列时才创建本地队列,初始化时比较简单,并且不会造成多创建了本地队列的情况。缺点是编程时,队列的操作代码会变复杂一些,效率会有所降低。

下面的代码中,给出的是方案2的实现。

1) 类的原型定义和成员函数

//获取线程Id回调函数定义

typedef int (*GetThreadIdFunc)(void *pArg);

template <class T, class LocalQueue, class SharedQueue>

class CDistributedQueue {

private:

    LocalQueue **   m_ppLocalQueue;    // 本地队列数组

    SharedQueue *   m_pSharedQueue;    // 共享队列池或共享队列

    int             m_nLocalQueueSize;

    int             m_nSharedQueueSize;

    int             m_nLocalQueueCount;

    int             m_nSharedQueueCount;

    DWORD       m_dwTlsIndex;        //线程本地存储索引

    LONG volatile   m_lThreadIdIndex;     //线程编号最大值

    GetThreadIdFunc m_GetThreadIdFunc;   //获取线程编号回调函数,如果由外面

                                                                  //的线程池提供编号时,需要传入回调函数

    void *          m_pThreadIdFuncArg;  //获取线程编号回调函数的参数

 CFastLock       m_LocalQueueResizeLock; //专为下面的ResizeLocalQueue函数使用

 void ResizeLocalQueue();             //将m_ppLocalQueue数组的大小扩大一倍

public:

    CDistributedQueue(){

        m_GetThreadIdFunc = NULL;

        m_pThreadIdFuncArg = NULL;

        m_lThreadIdIndex = 0;

    };

    void Create( int nLocalQueueSize, int nLocalQueueCount,

        int nSharedQueueSize, int nSharedQueueCount);

        int nSharedQueueSize, int nSharedQueueCount,

        GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg);

    virtual ~CDistributedQueue();

    LONG ThreadIdGet();

    void EnQueue(T &Data);

    int  DeQueue(T &Data);

    void PushToLocalQueue(T &Data);

    void PushToLocalQueue(T &Data, int nIndex);

    int PopFromLocalQueue(T &Data);

    SharedQueue *GetSharedQueue() { return m_pSharedQueue; };

    int PrivatizeSharedQueue(int nSharedQueueIndex);

};

说明一下:CDistributedQueue类中有三个模板参数,第1个模板参数T是表示数据类型;第2个模板参数是表示本地队列类的类型,为一 个不需要使用锁的普通队列,比如环形队列等;第3个模板参数是表示一个需要使用锁的共享队列类,可以是一个队列池类,也可以是普通的使用锁的共享队列类。

1) 构造函数和析构函数代码

/**   分布式队列的创建函数

       @param  int nLocalQueueSize - 本地子队列的大小 

       @param  int nLocalQueueCount - 本地队列的个数(数组的大小)   

       @param  int nSharedQueueSize - 共享子队列的大小      

       @param  int nSharedQueueCount - 共享子队列的个数    

       @return  void - 无

*/

void CDistributedQueue<T, LocalQueue, SharedQueue>::Create(

                   int nLocalQueueSize, int nLocalQueueCount,

                   int nSharedQueueSize, int nSharedQueueCount)

{

    m_nLocalQueueSize = nLocalQueueSize;

    m_nSharedQueueSize = nSharedQueueSize;

    if ( nLocalQueueCount != 0 )

    {

        m_nLocalQueueCount = nLocalQueueCount;

    }

    else

        m_nLocalQueueCount = omp_get_num_procs();

    if ( nSharedQueueCount != 0 )

        m_nSharedQueueCount = nSharedQueueCount;

        m_nSharedQueueCount = omp_get_num_procs();

    m_ppLocalQueue =  new LocalQueue *[m_nLocalQueueCount];

    int i;

    for ( i = 0; i < m_nLocalQueueCount; i++ )

        m_ppLocalQueue[i] = NULL;

    m_pSharedQueue = new SharedQueue(m_nSharedQueueCount, m_nSharedQueueSize);

    m_dwTlsIndex = TlsAlloc();

    m_lThreadIdIndex = 0;

}

       @param  GetThreadIdFunc GetThreadId - 获取线程Id回调函数  

       @param  void * pThreadIdFuncArg - GetThreadId回调函数的参数

    int nLocalQueueSize, int nLocalQueueCount,

    int nSharedQueueSize, int nSharedQueueCount,

    GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg)

    m_GetThreadIdFunc = GetThreadId;

    m_pThreadIdFuncArg = pThreadIdFuncArg;

    Create(nLocalQueueSize, nLocalQueueCount, nSharedQueueSize, nSharedQueueCount);

/**   分布式队列的析构函数

       @return  - 无     

CDistributedQueue<T, LocalQueue, SharedQueue>::~CDistributedQueue()

        if ( m_ppLocalQueue[i] != NULL )

        {

            delete m_ppLocalQueue[i];

        }

    delete [] m_ppLocalQueue;

    delete m_pSharedQueue;

    TlsFree(m_dwTlsIndex);

2) 将本地队列数组扩大一倍的内部成员函数代码

这个函数主要是考虑有可能程序升级后,访问的线程数量可能大于本地队列数组的大小的情况,此时采取将本地队列数组扩大一倍的策略。

/**   分布式队列的将本地队列数组扩大一倍的内部成员函数

void CDistributedQueue<T, LocalQueue, SharedQueue>::ResizeLocalQueue()

    //将本地队列数组扩大一倍, 防止线程数量多于队列数量,以保证程序安全

    LocalQueue **ppQueue = new LocalQueue *[m_nLocalQueueCount * 2];

        ppQueue[i] = m_ppLocalQueue[i];

    for ( i = m_nLocalQueueCount; i < m_nLocalQueueCount * 2; i++ )

        ppQueue[i] = NULL;

    m_ppLocalQueue = ppQueue;

    //使用原子操作避免m_nLocalQueueCount的数据竞争问题

    AtomicWrite((LONG volatile *)&m_nLocalQueueCount, m_nLocalQueueCount * 2);

3) 获取线程Id成员函数代码

获取线程Id成员函数中,这个函数中完成本地队列的创建和分派工作。先是判断获取的线程Id是否为0,如果为0则表明还没有创建本地队列,此时需要给线程进行编号,并创建一个新的本地队列放到数组中下标等于线程编号的位置上。

/**   分布式队列的获取线程Id函数

       如果m_GetThreadIdFunc回调函数不为空,则使用它获取Id

       否则根据分布式队列内部的编号机制获取线程Id

       @return  LONG - 返回线程的编号   

LONG CDistributedQueue<T, LocalQueue, SharedQueue>::ThreadIdGet()

    LONG Id;

    LocalQueue *pQueue = NULL;

    if ( m_GetThreadIdFunc != NULL )

        Id = (*m_GetThreadIdFunc)(m_pThreadIdFuncArg);

        if ( Id >= m_nLocalQueueCount )

            CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);

            if ( Id >= m_nLocalQueueCount )

            {

                ResizeLocalQueue();

            }

        if ( m_ppLocalQueue[Id] == NULL )

            m_ppLocalQueue[Id] = new LocalQueue(m_nLocalQueueSize);

        return Id;

        Id = (LONG )TlsGetValue(m_dwTlsIndex);

        if ( Id == 0 )

            Id = AtomicIncrement(&m_lThreadIdIndex);

            TlsSetValue(m_dwTlsIndex, (void *)Id);

            pQueue = new LocalQueue(m_nLocalQueueSize);

        --Id;

    if ( Id >= m_nLocalQueueCount)

        CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);

            ResizeLocalQueue();

    if ( pQueue != NULL )

        m_ppLocalQueue[Id] = pQueue;

return Id;

4) 进队操作策略1的进队操作代码

/**   分布式队列的进队操作函数

       这里假定了本地队列可以无限进队

       进队策略按以下优先级进行:

       1、本地队列空时进入本地队列,、共享队列未满时进入共享队列

       3、共享队列满时进入本地队列

       @param  T &Data - 要进队的数据    

void CDistributedQueue<T, LocalQueue, SharedQueue>::EnQueue(T &Data)

    int nId = ThreadIdGet();

    if ( m_ppLocalQueue[nId]->IsEmpty() )

        m_ppLocalQueue[nId]->EnQueue(Data);

    else if ( m_pSharedQueue->Push(Data) != CAPI_SUCCESS )

        int nId = ThreadIdGet();

        m_ppLocalQueue[nId]->EnQueue(Data);

        //这个分支不需要做任何事

    return;

5) 本地队列的操作代码

/**   分布式队列的本地队列进队函数

       将数据进入到当前线程的本地队列中

void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue(

T &Data)

    m_ppLocalQueue[nId]->EnQueue(Data);

/**   分布式队列的指定序号本地队列进队函数

    这是一个为特殊需求而设计的函数

    使用这个函数要特别小心,必须保证不会发生数据竞争问题

       @param  int nIndex - 本地队列的序号     

T &Data, int nIndex)

    if ( nIndex >= m_nLocalQueueCount * 2)

        return;

    if ( nIndex >= m_nLocalQueueCount )

        if ( nIndex >= m_nLocalQueueCount )

    if ( m_ppLocalQueue[nIndex] == NULL )

        m_ppLocalQueue[nIndex] = new LocalQueue(m_nLocalQueueSize);

    m_ppLocalQueue[nIndex]->EnQueue(Data);

/**   分布式队列的本地队列出队函数

       @param  T &Data - 接收出队的数据

       @return  int - 出队成功返回CAPI_SUCCESS, 失败(队列为空)返回CAPI_FAILED.      

int CDistributedQueue<T, LocalQueue, SharedQueue>::PopFromLocalQueue(

    return m_ppLocalQueue[nId]->DeQueue(Data);

6) 出队操作代码

/**   分布式队列的出队函数

    出队操作策略为,先从本地队列中出队,如果失败则从共享队列中出队

       @return  int - 成功返回CAPI_SUCCESS, 失败返回CAPI_FAILED.    

int CDistributedQueue<T, LocalQueue, SharedQueue>::DeQueue(T &Data)

    int nRet;

    nRet = m_ppLocalQueue[nId]->DeQueue(Data);

    if ( nRet == CAPI_FAILED )

        nRet = m_pSharedQueue->Pop(Data);

    return nRet;  

本文转自Intel_ISN 51CTO博客,原文链接:http://blog.51cto.com/intelisn/130452,如需转载请自行联系原作者