天天看點

原子操作實作無鎖隊列

關于CAS等原子操作

在開始說無鎖隊列之前,我們需要知道一個很重要的技術就是CAS操作——Compare & Set,或是 Compare & Swap,現在幾乎所有的CPU指令都支援CAS的原子操作,X86下對應的是 CMPXCHG 彙編指令。有了這個原子操作,我們就可以用其來實作各種無鎖(lock free)的資料結構。

這個操作用C語言來描述就是下面這個樣子:(代碼來自Wikipedia的Compare And Swap詞條)意思就是說,看一看記憶體reg裡的值是不是oldval,如果是的話,則對其指派newval。

int compare_and_swap (int reg, int oldval, int newval)

{

int old_reg_val = *reg;

if (old_reg_val == oldval)

*reg = newval;

return old_reg_val;

}

這個操作可以變種為傳回bool值的形式(傳回 bool值的好處在于,可以調用者知道有沒有更新成功):

bool compare_and_swap (int *accum, int *dest, int newval)

{

if ( *accum == *dest ) {

*dest = newval;

return true;

}

return false;

}

與CAS相似的還有下面的原子操作:(這些東西大家自己看Wikipedia吧)

Fetch And Add,一般用來對變量做 +1 的原子操作

Test-and-set,寫值到某個記憶體位置并傳回其舊值。彙編指令BST

Test and Test-and-set,用來低低Test-and-Set的資源争奪情況

注:在實際的C/C++程式中,CAS的各種實作版本如下:

1)GCC的CAS

GCC4.1+版本中支援CAS的原子操作(完整的原子操作可參看 GCC Atomic Builtins)

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, …)

type __sync_val_compare_and_swap (type *ptr, type oldval type newval, …)

2)Windows的CAS

在Windows下,你可以使用下面的Windows API來完成CAS:(完整的Windows原子操作可參看MSDN的InterLocked Functions)

InterlockedCompareExchange ( __inout LONG volatile *Target,

__in LONG Exchange,

__in LONG Comperand);

3) C++11中的CAS

C++11中的STL中的atomic類的函數可以讓你跨平台。(完整的C++11的原子操作可參看 Atomic Operation Library)

template< class T >

bool atomic_compare_exchange_weak( std::atomic* obj,

T* expected, T desired );

template< class T >

bool atomic_compare_exchange_weak( volatile std::atomic* obj,

T* expected, T desired );

無鎖隊列的連結清單實作

下面的東西主要來自John D. Valois 1994年10月在拉斯維加斯的并行和分布系統系統國際大會上的一篇論文——《Implementing Lock-Free Queues》。

我們先來看一下進隊列用CAS實作的方式:

EnQueue(x) //進隊列

{

//準備新加入的結點資料

q = new record();

q->value = x;

q->next = NULL;

do {
    p = tail; //取連結清單尾指針的快照
} while( CAS(p->next, NULL, q) != TRUE); //如果沒有把結點鍊在尾指針上,再試

CAS(tail, p, q); //置尾結點
           

}

我們可以看到,程式中的那個 do- while 的 Re-Try-Loop。就是說,很有可能我在準備在隊列尾加入結點時,别的線程已經加成功了,于是tail指針就變了,于是我的CAS傳回了false,于是程式再試,直到試成功為止。這個很像我們的搶電話熱線的不停重播的情況。

你會看到,為什麼我們的“置尾結點”的操作(第12行)不判斷是否成功,因為:

如果有一個線程T1,它的while中的CAS如果成功的話,那麼其它所有的 随後線程的CAS都會失敗,然後就會再循環,

此時,如果T1 線程還沒有更新tail指針,其它的線程繼續失敗,因為tail->next不是NULL了。

直到T1線程更新完tail指針,于是其它的線程中的某個線程就可以得到新的tail指針,繼續往下走了。

這裡有一個潛在的問題——如果T1線程在用CAS更新tail指針的之前,線程停掉或是挂掉了,那麼其它線程就進入死循環了。下面是改良版的EnQueue()

EnQueue(x) //進隊列改良版

{

q = new record();

q->value = x;

q->next = NULL;

p = tail;
oldp = p
do {
    while (p->next != NULL)
        p = p->next;
} while( CAS(p.next, NULL, q) != TRUE); //如果沒有把結點鍊在尾上,再試

CAS(tail, oldp, q); //置尾結點
           

}

我們讓每個線程,自己fetch 指針 p 到連結清單尾。但是這樣的fetch會很影響性能。而通實際情況看下來,99.9%的情況不會有線程停轉的情況,是以,更好的做法是,你可以接合上述的這兩個版本,如果retry的次數超了一個值的話(比如說3次),那麼,就自己fetch指針。

好了,我們解決了EnQueue,我們再來看看DeQueue的代碼:(很簡單,我就不解釋了)

DeQueue() //出隊列

{

do{

p = head;

if (p->next == NULL){

return ERR_EMPTY_QUEUE;

}

while( CAS(head, p, p->next) != TRUE );

return p->next->value;

}

我們可以看到,DeQueue的代碼操作的是 head->next,而不是head本身。這樣考慮是因為一個邊界條件,我們需要一個dummy的頭指針來解決連結清單中如果隻有一個元素,head和tail都指向同一個結點的問題,這樣EnQueue和DeQueue要互相排斥了。

注:上圖的tail正處于更新之前的裝态。

CAS的ABA問題

所謂ABA(見維基百科的ABA詞條),問題基本是這個樣子:

程序P1在共享變量中讀到值為A

P1被搶占了,程序P2執行

P2把共享變量裡的值從A改成了B,再改回到A,此時被P1搶占。

P1回來看到共享變量裡的值沒有被改變,于是繼續執行。

雖然P1以為變量值沒有改變,繼續執行了,但是這個會引發一些潛在的問題。ABA問題最容易發生在lock free 的算法中的,CAS首當其沖,因為CAS判斷的是指針的位址。如果這個位址被重用了呢,問題就很大了。(位址被重用是很經常發生的,一個記憶體配置設定後釋放了,再配置設定,很有可能還是原來的位址)

比如上述的DeQueue()函數,因為我們要讓head和tail分開,是以我們引入了一個dummy指針給head,當我們做CAS的之前,如果head的那塊記憶體被回收并被重用了,而重用的記憶體又被EnQueue()進來了,這會有很大的問題。(記憶體管理中重用記憶體基本上是一種很常見的行為)

這個例子你可能沒有看懂,維基百科上給了一個活生生的例子——

你拿着一個裝滿錢的手提箱在飛機場,此時過來了一個火辣性感的美女,然後她很暖昧地挑逗着你,并趁你不注意的時候,把用一個一模一樣的手提箱和你那裝滿錢的箱子調了個包,然後就離開了,你看到你的手提箱還在那,于是就提着手提箱去趕飛機去了。

這就是ABA的問題。

解決ABA的問題

維基百科上給了一個解——使用double-CAS(雙保險的CAS),例如,在32位系統上,我們要檢查64位的内容

1)一次用CAS檢查雙倍長度的值,前半部是指針,後半部分是一個計數器。

2)隻有這兩個都一樣,才算通過檢查,要吧賦新的值。并把計數器累加1。

這樣一來,ABA發生時,雖然值一樣,但是計數器就不一樣(但是在32位的系統上,這個計數器會溢出回來又從1開始的,這還是會有ABA的問題)

當然,我們這個隊列的問題就是不想讓那個記憶體重用,這樣明确的業務問題比較好解決,論文《Implementing Lock-Free Queues》給出一這麼一個方法——使用結點記憶體引用計數refcnt!

SafeRead(q)

{

loop:

p = q->next;

if (p == NULL){

return p;

}

Fetch&Add(p->refcnt, 1);

    if (p == q->next){
        return p;
    }else{
        Release(p);
    }
goto loop;
           

}

其中的 Fetch&Add和Release分是是加引用計數和減引用計數,都是原子操作,這樣就可以阻止記憶體被回收了。

用數組實作無鎖隊列

本實作來自論文《Implementing Lock-Free Queues》

使用數組來實作隊列是很常見的方法,因為沒有記憶體的分部和釋放,一切都會變得簡單,實作的思路如下:

1)數組隊列應該是一個ring buffer形式的數組(環形數組)

2)數組的元素應該有三個可能的值:HEAD,TAIL,EMPTY(當然,還有實際的資料)

3)數組一開始全部初始化成EMPTY,有兩個相鄰的元素要初始化成HEAD和TAIL,這代表空隊列。

4)EnQueue操作。假設資料x要入隊列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),則說明隊列滿了。

5)DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x傳回。同樣需要注意,如果x是TAIL,則說明隊列為空。

算法的一個關鍵是——如何定位HEAD或TAIL?

1)我們可以聲明兩個計數器,一個用來計數EnQueue的次數,一個用來計數DeQueue的次數。

2)這兩個電腦使用使用Fetch&ADD來進行原子累加,在EnQueue或DeQueue完成的時候累加就好了。

3)累加後求個模什麼的就可以知道TAIL和HEAD的位置了。

如下圖所示:

小結

以上基本上就是所有的無鎖隊列的技術細節,這些技術都可以用在其它的無鎖資料結構上。

1)無鎖隊列主要是通過CAS、FAA這些原子操作,和Retry-Loop實作。

2)對于Retry-Loop,我個人感覺其實和鎖什麼什麼兩樣。隻是這種“鎖”的粒度變小了,主要是“鎖”HEAD和TAIL這兩個關鍵資源。而不是整個資料結構。

還有一些和Lock Free的文章你可以去看看:

Code Project 上的雄文 《Yet another implementation of a lock-free circular array queue》

Herb Sutter的《Writing Lock-Free Code: A Corrected Queue》– 用C++11的std::atomic模闆。

IBM developerWorks的《設計不使用互斥鎖的并發資料結構》

繼續閱讀