在設計CDistributeQueue類時,通常有兩種方案值得考慮:
1、 本地隊列預先建立好,當有線程通路時就可以直接根據線程編号去通路對應的本地隊列。
2、 不預先建立本地隊列,當線程第一次通路分布式隊列時,由于擷取不到線程編号,由此可以斷定本線程是第1次通路分布式隊列,此時才建立本地隊列。
方案1和方案2可以說各有優缺點。方案1中,在事先不知道有多少線程會通路分布式隊列的情況下,預先建立好本地隊列會造成程式初始化時間過長,并且可能有一些建立好的隊列得不到使用。
方案2中,采用線程通路分布式隊列時才建立本地隊列,初始化時比較簡單,并且不會造成多建立了本地隊列的情況。缺點是程式設計時,隊列的操作代碼會變複雜一些,效率會有所降低。
下面的代碼中,給出的是方案2的實作。
1) 類的原型定義和成員函數
//擷取線程Id回調函數定義
typedef int (*GetThreadIdFunc)(void *pArg);
template <class T, class LocalQueue, class SharedQueue>
class CDistributedQueue {
private:
LocalQueue ** m_ppLocalQueue; // 本地隊列數組
SharedQueue * m_pSharedQueue; // 共享隊列池或共享隊列
int m_nLocalQueueSize;
int m_nSharedQueueSize;
int m_nLocalQueueCount;
int m_nSharedQueueCount;
DWORD m_dwTlsIndex; //線程本地存儲索引
LONG volatile m_lThreadIdIndex; //線程編号最大值
GetThreadIdFunc m_GetThreadIdFunc; //擷取線程編号回調函數,如果由外面
//的線程池提供編号時,需要傳入回調函數
void * m_pThreadIdFuncArg; //擷取線程編号回調函數的參數
CFastLock m_LocalQueueResizeLock; //專為下面的ResizeLocalQueue函數使用
void ResizeLocalQueue(); //将m_ppLocalQueue數組的大小擴大一倍
public:
CDistributedQueue(){
m_GetThreadIdFunc = NULL;
m_pThreadIdFuncArg = NULL;
m_lThreadIdIndex = 0;
};
void Create( int nLocalQueueSize, int nLocalQueueCount,
int nSharedQueueSize, int nSharedQueueCount);
int nSharedQueueSize, int nSharedQueueCount,
GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg);
virtual ~CDistributedQueue();
LONG ThreadIdGet();
void EnQueue(T &Data);
int DeQueue(T &Data);
void PushToLocalQueue(T &Data);
void PushToLocalQueue(T &Data, int nIndex);
int PopFromLocalQueue(T &Data);
SharedQueue *GetSharedQueue() { return m_pSharedQueue; };
int PrivatizeSharedQueue(int nSharedQueueIndex);
};
說明一下:CDistributedQueue類中有三個模闆參數,第1個模闆參數T是表示資料類型;第2個模闆參數是表示本地隊列類的類型,為一 個不需要使用鎖的普通隊列,比如環形隊列等;第3個模闆參數是表示一個需要使用鎖的共享隊列類,可以是一個隊列池類,也可以是普通的使用鎖的共享隊列類。
1) 構造函數和析構函數代碼
/** 分布式隊列的建立函數
@param int nLocalQueueSize - 本地子隊列的大小
@param int nLocalQueueCount - 本地隊列的個數(數組的大小)
@param int nSharedQueueSize - 共享子隊列的大小
@param int nSharedQueueCount - 共享子隊列的個數
@return void - 無
*/
void CDistributedQueue<T, LocalQueue, SharedQueue>::Create(
int nLocalQueueSize, int nLocalQueueCount,
int nSharedQueueSize, int nSharedQueueCount)
{
m_nLocalQueueSize = nLocalQueueSize;
m_nSharedQueueSize = nSharedQueueSize;
if ( nLocalQueueCount != 0 )
{
m_nLocalQueueCount = nLocalQueueCount;
}
else
m_nLocalQueueCount = omp_get_num_procs();
if ( nSharedQueueCount != 0 )
m_nSharedQueueCount = nSharedQueueCount;
m_nSharedQueueCount = omp_get_num_procs();
m_ppLocalQueue = new LocalQueue *[m_nLocalQueueCount];
int i;
for ( i = 0; i < m_nLocalQueueCount; i++ )
m_ppLocalQueue[i] = NULL;
m_pSharedQueue = new SharedQueue(m_nSharedQueueCount, m_nSharedQueueSize);
m_dwTlsIndex = TlsAlloc();
m_lThreadIdIndex = 0;
}
@param GetThreadIdFunc GetThreadId - 擷取線程Id回調函數
@param void * pThreadIdFuncArg - GetThreadId回調函數的參數
int nLocalQueueSize, int nLocalQueueCount,
int nSharedQueueSize, int nSharedQueueCount,
GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg)
m_GetThreadIdFunc = GetThreadId;
m_pThreadIdFuncArg = pThreadIdFuncArg;
Create(nLocalQueueSize, nLocalQueueCount, nSharedQueueSize, nSharedQueueCount);
/** 分布式隊列的析構函數
@return - 無
CDistributedQueue<T, LocalQueue, SharedQueue>::~CDistributedQueue()
if ( m_ppLocalQueue[i] != NULL )
{
delete m_ppLocalQueue[i];
}
delete [] m_ppLocalQueue;
delete m_pSharedQueue;
TlsFree(m_dwTlsIndex);
2) 将本地隊列數組擴大一倍的内部成員函數代碼
這個函數主要是考慮有可能程式更新後,通路的線程數量可能大于本地隊列數組的大小的情況,此時采取将本地隊列數組擴大一倍的政策。
/** 分布式隊列的将本地隊列數組擴大一倍的内部成員函數
void CDistributedQueue<T, LocalQueue, SharedQueue>::ResizeLocalQueue()
//将本地隊列數組擴大一倍, 防止線程數量多于隊列數量,以保證程式安全
LocalQueue **ppQueue = new LocalQueue *[m_nLocalQueueCount * 2];
ppQueue[i] = m_ppLocalQueue[i];
for ( i = m_nLocalQueueCount; i < m_nLocalQueueCount * 2; i++ )
ppQueue[i] = NULL;
m_ppLocalQueue = ppQueue;
//使用原子操作避免m_nLocalQueueCount的資料競争問題
AtomicWrite((LONG volatile *)&m_nLocalQueueCount, m_nLocalQueueCount * 2);
3) 擷取線程Id成員函數代碼
擷取線程Id成員函數中,這個函數中完成本地隊列的建立和分派工作。先是判斷擷取的線程Id是否為0,如果為0則表明還沒有建立本地隊列,此時需要給線程進行編号,并建立一個新的本地隊列放到數組中下标等于線程編号的位置上。
/** 分布式隊列的擷取線程Id函數
如果m_GetThreadIdFunc回調函數不為空,則使用它擷取Id
否則根據分布式隊列内部的編号機制擷取線程Id
@return LONG - 傳回線程的編号
LONG CDistributedQueue<T, LocalQueue, SharedQueue>::ThreadIdGet()
LONG Id;
LocalQueue *pQueue = NULL;
if ( m_GetThreadIdFunc != NULL )
Id = (*m_GetThreadIdFunc)(m_pThreadIdFuncArg);
if ( Id >= m_nLocalQueueCount )
CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
if ( Id >= m_nLocalQueueCount )
{
ResizeLocalQueue();
}
if ( m_ppLocalQueue[Id] == NULL )
m_ppLocalQueue[Id] = new LocalQueue(m_nLocalQueueSize);
return Id;
Id = (LONG )TlsGetValue(m_dwTlsIndex);
if ( Id == 0 )
Id = AtomicIncrement(&m_lThreadIdIndex);
TlsSetValue(m_dwTlsIndex, (void *)Id);
pQueue = new LocalQueue(m_nLocalQueueSize);
--Id;
if ( Id >= m_nLocalQueueCount)
CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
ResizeLocalQueue();
if ( pQueue != NULL )
m_ppLocalQueue[Id] = pQueue;
return Id;
4) 進隊操作政策1的進隊操作代碼
/** 分布式隊列的進隊操作函數
這裡假定了本地隊列可以無限進隊
進隊政策按以下優先級進行:
1、本地隊列空時進入本地隊列,、共享隊列未滿時進入共享隊列
3、共享隊列滿時進入本地隊列
@param T &Data - 要進隊的資料
void CDistributedQueue<T, LocalQueue, SharedQueue>::EnQueue(T &Data)
int nId = ThreadIdGet();
if ( m_ppLocalQueue[nId]->IsEmpty() )
m_ppLocalQueue[nId]->EnQueue(Data);
else if ( m_pSharedQueue->Push(Data) != CAPI_SUCCESS )
int nId = ThreadIdGet();
m_ppLocalQueue[nId]->EnQueue(Data);
//這個分支不需要做任何事
return;
5) 本地隊列的操作代碼
/** 分布式隊列的本地隊列進隊函數
将資料進入到目前線程的本地隊列中
void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue(
T &Data)
m_ppLocalQueue[nId]->EnQueue(Data);
/** 分布式隊列的指定序号本地隊列進隊函數
這是一個為特殊需求而設計的函數
使用這個函數要特别小心,必須保證不會發生資料競争問題
@param int nIndex - 本地隊列的序号
T &Data, int nIndex)
if ( nIndex >= m_nLocalQueueCount * 2)
return;
if ( nIndex >= m_nLocalQueueCount )
if ( nIndex >= m_nLocalQueueCount )
if ( m_ppLocalQueue[nIndex] == NULL )
m_ppLocalQueue[nIndex] = new LocalQueue(m_nLocalQueueSize);
m_ppLocalQueue[nIndex]->EnQueue(Data);
/** 分布式隊列的本地隊列出隊函數
@param T &Data - 接收出隊的資料
@return int - 出隊成功傳回CAPI_SUCCESS, 失敗(隊列為空)傳回CAPI_FAILED.
int CDistributedQueue<T, LocalQueue, SharedQueue>::PopFromLocalQueue(
return m_ppLocalQueue[nId]->DeQueue(Data);
6) 出隊操作代碼
/** 分布式隊列的出隊函數
出隊操作政策為,先從本地隊列中出隊,如果失敗則從共享隊列中出隊
@return int - 成功傳回CAPI_SUCCESS, 失敗傳回CAPI_FAILED.
int CDistributedQueue<T, LocalQueue, SharedQueue>::DeQueue(T &Data)
int nRet;
nRet = m_ppLocalQueue[nId]->DeQueue(Data);
if ( nRet == CAPI_FAILED )
nRet = m_pSharedQueue->Pop(Data);
return nRet;
本文轉自Intel_ISN 51CTO部落格,原文連結:http://blog.51cto.com/intelisn/130452,如需轉載請自行聯系原作者