天天看點

多核分布式隊列的實作:“偷”與“自私”的運用(2)

3. 本地化隊列的實作思路

要給線程指定一個本地化隊列,通常的做法是先将建立好的隊列放入一個數組中,然後給線程編号,從0開始進行編号,編号為0的線程對應于數組下标為0位置上存放的隊列,編号為1的線程對應于數組下标為1位置上存放的隊列,...。

每個線程要擷取自己的本地化隊列時,隻需要先擷取線程編号,然後就可以通過線程編号去通路對應的隊列,由于每個線程的編号都不相同,是以每個線程通路的隊列都不相同,即每個隊列隻有一個線程通路它,這樣就可以實作每個線程的本地化隊列。

那麼如何給線程編号從0開始編号呢,作業系統并沒有直接提供這種功能。即使作業系統提供了線程從0開始編号的功能也沒有用,因為并不一定所有的線程 都會通路分布式隊列。例如有8個線程,其中編号為0,3,5,7的線程會通路分布式隊列,那麼在建立分布式隊列時,就需要建立8個本地隊列,否則線程編号 将無法和存放隊列的數組下标對應起來。

看到這裡,目标已經很明确了,那就是要給所有通路分布式隊列的線程從0開始依次編号。比如有N個線程要通路分布式隊列,那麼需要給這N個線程依次編号為0,1,...N-1。下面就來讨論如何給線程編号的問題。

4. 給線程編号的方法

在作業系統中,通常提供了線程本地存儲的API,通過API可以給每個線程設定一個資料(可以是指針,也可以是一個整數),同時也可以通過API來 取出目前線程設定的那個資料。比如給一個線程A設定一個整數0,那麼線程A執行的任何地方都可以調用相應的API擷取到整數0,這樣就可以在程式的任何地 擷取到線程A的編号為0。

在Windows系列作業系統中,提供了Tls_Alloc(),Tls_SetValue(),Tls_GetValue(),Tls_Free()這幾個函數來實作線程本地存儲操作。

pthread中,可以通過pthread_key_create(), pthread_setspecific(), pthread_getspecific()等函數來實作線程本地存儲操作,其中pthread_create_key()和Tls_Alloc()功能 相同,隻是參數有所不同,Tls_SetValue()和pthread_setspecific()功能等價,Tls_GetValue()和 pthread_getspecific()功能等價。

下面示範一下TlsAlloc(),Tls_SetValue(),Tls_GetValue(),Tls_Free()這幾個函數的基本用法。

DWORD g_dwTlsIndex;

LONG volatile g_dwThreadId = 0;

int GetId()

{

//擷取目前執行線程的由TlsSetValue()設定的值

int nId = (int)TlsGetValue(g_dwTlsIndex);

return (nId-1);

}

void ThreadFunc(void *args)

    LONG  Id = AtomicIncrement (&g_dwThreadId); //對g_dwThreadId進行原子加1操作

    TlsSetValue(g_dwTlsIndex, (void *)Id);  //給目前執行的線程設定一個值

    printf("ThreadFunc2: Thread Id = %ld\n", GetId());

int main(int argc, char* argv[])

    g_dwTlsIndex = TlsAlloc();  //配置設定一個線程本地存儲索引,需要在建立線程前執行

    _beginthread(ThreadFunc, 0, NULL);

Sleep(100); //延時等待上面兩個線程執行完

TlsFree(g_dwTlsIndex);

return 0;

需要說明一下,在ThreadFunc()函數中,使用了一個AtomicIncrement()函數,這個函數相當于Windows作業系統中的InterlockedIncrement()函數。在Widnows系統中,可以使用以下宏定義來實作AtomicIncrement()函數:

#define AtomicIncrement(x)  InterlockedIncrement(x)

上面程式在運作後,會列印出以下結果:

ThreadFunc: Thread Id = 0

ThreadFunc: Thread Id = 1

從上面代碼和執行結果可以看出,雖然GetValue()在ThreadFunc()函數中執行,但是兩個線程執行GetValue()得到的值是 不同的,一個線程得到的是0,另外一個線程得到的是1。這主要是因為兩個線程調用TlsSetValue()設定的值并不相同,一個為1,另一個為2。

需要注意的是,TlsGetValue()的傳回值為0表示失敗,是以使用TlsSetValue()函數時,應該從1開始設定,然後在GetId()函數中,傳回的是TlsGetValue()的傳回值減1。

采用上面的方法,就可以設計出分布式隊列中的線程Id自動編号和擷取功能了。下面是詳細的實作代碼:

class CDistributedQueue {

private:

       DWORD m_dwTlsIndex;

       LONG volatile m_lThreadIdIndex;

public:

       CDistributedQueue();

       virtual ~CDistributedQueue();

       LONG ThreadIdGet();

       //可以添加其他成員函數在下面

};

CDistributedQueue::CDistributedQueue()

       m_dwTlsIndex = TlsAlloc();

       m_lThreadIdIndex = 0;

CDistributedQueue::~CDistributedQueue()

       TlsFree(m_dwTlsIndex);

LONG CDistributedQueue::ThreadIdGet()

       LONG Id = (LONG )TlsGetValue(m_dwTlsIndex);

if ( Id == 0 )

    Id = AtomicIncrement(&m_lThreadIdIndex);

    TlsSetValue(Id);

return (Id - 1);

上面的代碼中,設定或擷取線程編号都在ThreadIdGet()一個成員函數内完成,先判斷擷取的Id是否為0,如果為0,表明線程還沒有被設定 Id,是以将m_lThreadIdIndex原子加1,然後再設定給對應的線程。每調用一次TlsSetValue()函數,其設定的Id值依次加1, 這樣就可以得到一個1,2,3,...序列。每個線程調用了TlsSetValue()函數後,下一個調用TlsGetValue()函數時,獲得的值一 定大于0,是以每個線程最多隻能執行TlsSetValue()函數一次。

采用上面的方法來擷取線程編号,必須保證建立的本地隊列數量大于等于通路隊列的線程數量,否則隊列數量不足,将會造成沒有足夠的本地隊列供線程使用,程式中可能會造成越界等不可預測的異常。常用的解決辦法是将本地隊列的數量擴大一倍。

上面這種線程編号方法,非常友善,任何通路分布式隊列的線程都可以被自動編号,調用分布式隊列的線程不需要為編号操心。

有了給線程自動編号的方法後,就可以實作分布式隊列的各個具體操作如進隊、出隊等。當然在實作具體的操作代碼前,有必要了解一下分布式隊列中是如何進行進隊和出隊操作的。

本文轉自Intel_ISN 51CTO部落格,原文連結:http://blog.51cto.com/intelisn/130445,如需轉載請自行聯系原作者

繼續閱讀