提到并行計算通常都會想到加鎖,事實卻并非如此,大多數并發是不需要加鎖的。比如在不同電腦上運作的代碼編輯器,兩者并發運作不需要加鎖。在一台電腦上同時運作的媒體播放放器和代碼編輯器,兩者并發運作不需要加鎖(當然系統調用和程序排程是要加鎖的)。在同一個程序中運作多個線程,如果各自處理獨立的事情也不需要加鎖(當然系統調用、程序排程和記憶體配置設定是要加鎖的)。在以上這些情況裡,各個并發實體之間沒有共享資料,是以雖然并發運作但不需要加鎖。
多線程并發運作時,雖然有共享資料,如果所有線程隻是讀取共享資料而不修改它,也是不用加鎖的,比如代碼段就是共享的“資料”,每個線程都會讀取,但是不用加鎖。排除所有這些情況,多線程之間有共享資料,有的線程要修改這些共享資料,有的線程要讀取這些共享資料,這才是程式員需要關注的情況,也是本節我們讨論的範圍。
在并發的環境裡,加鎖可以保護共享的資料,但是加鎖也會存在一些問題:
由于臨界區無法并發運作,進入臨界區就需要等待,加鎖帶來效率的降低。
在複雜的情況下,很容易造成死鎖,并發實體之間無止境的互相等待。
在中斷/信号處理函數中不能加鎖,給并發處理帶來困難。
優先級倒置造成實時系統不能正常工作。低級優先程序拿到高優先級程序需要的鎖,結果是高/低優先級的程序都無法運作,中等優先級的程序可能在狂跑。
由于并發與加鎖(互斥)的沖突關系,無鎖資料結構自然成為程式員關注的焦點,這也是本節要介紹的:
CPU提供的原子操作
大約在七八年前,我們用apache的xerces來解析XML檔案,奇怪的是多線程反而比單線程慢。他們找了很久也沒有找出原因,隻是證明使用多程序代替多線程會快一個數量級,在Windows上他們就使用了多程序的方式。後來移植到linux時候,我發現xerces每建立一個結點都會去更新一些全局的統計資訊,比如把結點的總數加一,它使用的pthread_mutex實作互斥。這就是問題所在:一個XML文檔有數以萬計的結點,以50個線程并發為例,每個線程解析一個XML文檔,總共要進行上百萬次的加鎖/解鎖,幾乎所有線程都在等待,你說能快得了嗎?
當時我知道Windows下有InterlockedIncrement之類的函數,它們利用CPU一些特殊指令,保證對整數的基本操作是原子的。查找了一些資源發現Linux下也有類似的函數,後來我把所有加鎖去掉,換成這些原子操作,速度比多程序運作還快了幾倍。下面我們看++和—的原子操作在IA架構上的實作:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
#define ATOMIC_SMP_LOCK "lock ; "
typedef struct { volatile int counter; } atomic_t;
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
static __inline__ void atomic_inc(atomic_t *v)
{
__asm__ __volatile__(
ATOMIC_SMP_LOCK "incl %0"
:"=m" (v->counter)
:"m" (v->counter));
}
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
static __inline__ void atomic_dec(atomic_t *v)
ATOMIC_SMP_LOCK "decl %0"
單入單出的循環隊列。單入單出的循環隊列是一種特殊情況,雖然特殊但是很實用,重要的是它不需要加鎖。這裡的單入是指隻有一個線程向隊列裡追加資料(push),單出隻是指隻有一個線程從隊列裡取資料(pop),循環隊列與普通隊列相比,不同之處在于它的最大資料儲存量是事先固定好的,不能動态增長。盡管有這些限制它的應用還是相當廣泛的。這我們介紹一下它的實作:
資料下定義如下:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
typedef struct _FifoRing
int r_cursor;
int w_cursor;
size_t length;
void* data[0];
}FifoRing;
r_cursor指向隊列頭,用于取資料(pop)。w_cursor指向隊列尾,用于追加資料(push)。length表示隊列的最大資料儲存量,data表示存放的資料,[0]在這裡表示變長的緩沖區(前面我們已經講過)。
建立函數
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
FifoRing* fifo_ring_create(size_t length)
FifoRing* thiz = NULL;
return_val_if_fail(length > 1, NULL);
thiz = (FifoRing*)malloc(sizeof(FifoRing) + length * sizeof(void*));
if(thiz != NULL)
{
thiz->r_cursor = 0;
thiz->w_cursor = 0;
thiz->length = length;
}
return thiz;
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
這裡我們要求隊列的長度大于1而不是大于0,為什麼呢?排除長度為1的隊列沒有什麼意義的原因外,更重要的原因是隊列頭與隊列尾重疊 (r_cursor= =w_cursor) 時,到底表示是滿隊列還是空隊列?這個要搞清楚才行,上次一個同僚犯了這個錯誤,讓我們查了很久。這裡我們認為隊列頭與隊列尾重疊時表示隊列為空,這與隊列初始狀态一緻,後面在寫的時候始終保留一個空位,避免隊列頭與隊列尾重疊,這樣可以消除歧義了。
追加資料(push)
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
Ret fifo_ring_push(FifoRing* thiz, void* data)
int w_cursor = 0;
Ret ret = RET_FAIL;
return_val_if_fail(thiz != NULL, RET_FAIL);
w_cursor = (thiz->w_cursor + 1) % thiz->length;
if(w_cursor != thiz->r_cursor)
thiz->data[thiz->w_cursor] = data;
thiz->w_cursor = w_cursor;
ret = RET_OK;
return ret;
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
隊列頭和隊列尾之間還有一個以上的空位時就追加資料,否則傳回失敗。
取資料(pop)
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
Ret fifo_ring_pop(FifoRing* thiz, void** data)
return_val_if_fail(thiz != NULL && data != NULL, RET_FAIL);
if(thiz->r_cursor != thiz->w_cursor)
*data = thiz->data[thiz->r_cursor];
thiz->r_cursor = (thiz->r_cursor + 1)%thiz->length;
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
隊列頭和隊列尾不重疊表示隊列不為空,取資料并移動隊列頭。
單寫多讀的無鎖資料結構
單寫表示隻有一個線程去修改共享資料結構,多讀表示有多個線程去讀取共享資料結構。前面介紹的讀寫鎖可以有效的解決這個問題,但更高效的辦法是使用無鎖資料結構。思路如下:
就像為了避免顯示閃爍而使用的雙緩沖一樣,我們使用兩份資料結構,一份資料結構用于讀取,所有線程都可以在不加鎖的情況下讀取這個資料結構。另外一份資料結構用于修改,由于隻有一個線程會修改它,是以也不用加鎖。
在修改之後,我們再交換讀/寫的兩個函數結構,把另外一份也修改過來,這樣兩個資料結構就一緻了。在交換時要保證沒有線程在讀取,是以我們還需要一個讀線程的引用計數。現在我們看看怎麼把前面寫的雙向連結清單改為單寫多讀的無鎖資料結構。
為了保證交換是原子的,我們需要一個新的原子操作CAS(compare and swap)。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
#define CAS(_a, _o, _n) \
({ __typeof__(_o) __o = _o; \
__asm__ __volatile__( \
"lock cmpxchg %3,%1" \
: "=a" (__o), "=m" (*(volatile unsigned int *)(_a)) \
: "0" (__o), "r" (_n) ); \
__o; \
})
資料結構
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
typedef struct _SwmrDList
atomic_t rd_index_and_ref;
DList* dlists[2];
}SwmrDList;
兩個連結清單,一個用于讀一個用于寫。rd_index_and_ref的最高位元組記錄用于讀取的雙向連結清單的索引,低24位用于記錄讀取線程的引用記數,最大支援16777216個線程同時讀取,應該是足夠了,是以後面不考慮它的溢出。
讀取操作
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
int swmr_dlist_find(SwmrDList* thiz, DListDataCompareFunc cmp, void* ctx)
int ret = 0;
return_val_if_fail(thiz != NULL && thiz->dlists != NULL, -1);
atomic_inc(&(thiz->rd_index_and_ref));
size_t rd_index = (thiz->rd_index_and_ref.counter>>24) & 0x1;
ret = dlist_find(thiz->dlists[rd_index], cmp, ctx);
atomic_dec(&(thiz->rd_index_and_ref));
修改操作
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIml2ZuUmbv50LcNncvRXYjlGZul0ZulmbpxGd190LcNXZnFWbJ9CXt92Yuc2bsJGcwNmL3d3dvw1LcpDc0RHaiojIsJye.gif)
Ret swmr_dlist_insert(SwmrDList* thiz, size_t index, void* data)
DList* wr_dlist = NULL;
return_val_if_fail(thiz != NULL && thiz->dlists != NULL, ret);
size_t wr_index = !((thiz->rd_index_and_ref.counter>>24) & 0x1);
if((ret = dlist_insert(thiz->dlists[wr_index], index, data)) == RET_OK)
int rd_index_old = thiz->rd_index_and_ref.counter & 0xFF000000;
int rd_index_new = wr_index << 24;
do
{
usleep(100);
}while(CAS(&(thiz->rd_index_and_ref), rd_index_old, rd_index_new));
wr_index = rd_index_old>>24;
ret = dlist_insert(thiz->dlists[wr_index], index, data);
先修改用于修改的雙向連結清單,修改完成之後等到沒有線程讀取時,交換讀/寫兩個連結清單,再修改另一個連結清單,此時兩個連結清單狀态保持一緻。
稍做改進,對修改的操作進行加鎖,就可以支援多讀多寫的資料結構,讀是無鎖的,寫是加鎖的。
真正的無鎖資料結構
Andrei Alexandrescu的《Lock-FreeDataStructures》估計是這方面最經典的論文了,對他的方法我開始感到驚奇後來感到失望,驚奇的是算法的巧妙,失望的是無鎖的限制和代價。作者最後說這種資料結構隻适用于WRRMBNTM(Write-Rarely-Read-Many -But-Not-Too-Many)的情況。而且每次修改都要拷貝整個資料結構(甚至多次),是以不要指望這種方法能帶來多少性能上的提高,唯一的好處是能避免加鎖帶來的部分副作用。有興趣的朋友可以看下這篇論文,這裡我就不重複了。