天天看點

并發程式設計 原子操作

i++和i–是線程安全的不?

1. 操作的原子性

實際上當我們操作i++的時候,其彙編指令為:

并發程式設計 原子操作

從上圖能看到一個i++對應的操作是:

(1)把變量i從記憶體(RAM)加載到寄存器;

(2)把寄存器的值加1;

(3)把寄存器的值寫回記憶體(RAM)。

那如果有多個線程去做i++操作的時候,也就可能導緻這樣一種情況:

并發程式設計 原子操作

上圖這種執行情況就導緻i變量的結果僅僅自增了一次,而不是兩次,導緻實際結果與預期結果不對。

總結下上面的問題,也就是說我們整個存儲的結構如下圖:

并發程式設計 原子操作

我們所有的變量首先是存儲在主存(RAM)上,CPU要去操作的時候首先會加載到寄存器,然後才操作,操作好了才寫會主存。

關于CPU和cache的更詳細介紹可以參考: 

對于gcc、g++編譯器來講,它們提供了一組API來做原子操作:

type sync_fetch_and_add (type *ptr, type value, ...) type sync_fetch_and_sub (type *ptr, type value, ...) type sync_fetch_and_or (type *ptr, type value, ...) type sync_fetch_and_and (type *ptr, type value, ...) type sync_fetch_and_xor (type *ptr, type value, ...) type sync_fetch_and_nand (type *ptr, type value, ...)
bool sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
// CAS
type sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

type sync_lock_test_and_set (type *ptr, type value, ...) 
void sync_lock_release (type *ptr, ...)      

詳細文檔見: https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html#Atomic-Builtins

對于c++11來講,我們也有一組atomic的接口:

并發程式設計 原子操作
并發程式設計 原子操作
并發程式設計 原子操作
并發程式設計 原子操作

Memory synchronization ordering

并發程式設計 原子操作

詳細文檔見: https://en.cppreference.com/w/cpp/atomic

但是這些原子操作都是怎麼實作的呢?

X86的架構

Intel X86指令集提供了指令字首lock用于鎖定前端串行總線FSB,保證了指令執行時不會收到其他處理器的幹擾。

比如:

static int lxx_atomic_add(int* ptr, int increment){ int old_value = *ptr;
    asm volatile("lock; xadd %0, %1 \n\t"
        : "=r"(old_value), "=m"(*ptr)
        : "0"(increment), "m"(*ptr)
        : "cc", "memory");

return *ptr;
}      

使用lock指令字首之後,處理期間對count記憶體的并發通路(Read/Write)被禁止,進而保證了指令的原子性。

如圖所示:

并發程式設計 原子操作

其原理在Intel開發手冊有如下說明:

Causes the processor’s LOCK# signal to be asserted during execution of the accompanying instruction (turns the instruction into an atomic instruction). In a multiprocessor environment, ​

​the LOCK# signal ensures that the processor has exclusive use of any shared memory while the signal is asserted.​

In most IA-32 and all Intel 64 processors, locking may occur without the LOCK# signal being asserted. See the “IA32 Architecture Compatibility” section below for more details. The LOCK prefix can be prepended only to the following instructions and only to those forms of the instructions where the destination operand is a memory operand: ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, CMPXCHG16B,DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, and XCHG. If the LOCK prefix is used with one of these instructions and the source operand is a memory operand, an undefined opcode exception (#UD) may be generated. An undefined opcode exception will also be generated if the LOCK prefix is used with any instruction not in the above list. The XCHG instruction always asserts the LOCK# signal regardless of the presence or absence of the LOCK prefix.

The LOCK prefix is typically used with the BTS instruction to perform a read-modify-write operation on a memory location in shared memory environment.

The integrity of the LOCK prefix is not affected by the alignment of the memory field. Memory locking is observed for arbitrarily misaligned fields.

This instruction’s operation is the same in non-64-bit modes and 64-bit mode.

PS : 注意上面标紅的文字。

在執行伴随指令期間使處理器的LOCK#信号有效(将指令變為原子指令)。在多處理器環境中,LOCK#信号確定處理器在信号有效時獨占使用任何共享存儲器。如果LOCK字首與這些指令之一一起使用,并且源操作數是記憶體操作數,則可能會生成未定義的操作碼異常(#UD)。 如果LOCK字首與任何不在上述清單中的指令一起使用,也會産生未定義的操作碼異常。 無論是否存在LOCK 字首,XCHG指令都始終聲明LOCK#信号。

LOCK字首通常與BTS指令一起使用,以在共享存儲器環境中的存儲器位置上執行讀取 – 修改 – 寫入操作。

LOCK字首的完整性不受存儲器字段對齊的影響。 記憶體鎖定是針對任意不對齊的字段。

好了,到此,我們了解X86上如何支援原子操作了,我們看看核心的實作: 如檔案:arch/x86/include/asm/atomic.h

/**
* arch_atomic_add - add integer to atomic variable
* @i: integer value to add
* @v: pointer of type atomic_t
*
* Atomically adds @i to @v.
*/
static __always_inline void arch_atomic_add(int i, atomic_t *v)
{
 asm volatile(LOCK_PREFIX "addl %1,%0"
 : "+m" (v->counter)
 : "ir" (i) : "memory");
}      

LOCK_PREFIX中的實作:

#ifdef CONFIG_SMP
#define LOCK_PREFIX_HERE \
        ".pushsection .smp_locks,\"a\"\n"    \
        ".balign 4\n"                \
        ".long 671f - .\n" /* offset */        \
        ".popsection\n"                \
        "671:"
#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "
#else /* ! CONFIG_SMP */
#define LOCK_PREFIX_HERE ""
#define LOCK_PREFIX ""
#endif      

也就是說在SMP的系統中,LOCK_PREFIX是lock,而非SMP系統中是空, 另外CAS的代碼實作也如下:

static __always_inline int atomic_cmpxchg(atomic_t *v, int old, int new)
{
    return cmpxchg(&v->counter, old, new);
}
#define cmpxchg(ptr, old, new)                      \
    __cmpxchg(ptr, old, new, sizeof(*(ptr)))
#define __cmpxchg(ptr, old, new, size)                  \
    __raw_cmpxchg((ptr), (old), (new), (size), LOCK_PREFIX)
#define __raw_cmpxchg(ptr, old, new, size, lock)            \
({                                  \
    __typeof__(*(ptr)) __ret;                   \
    __typeof__(*(ptr)) __old = (old);               \
    __typeof__(*(ptr)) __new = (new);               \
    switch (size) {                         \
    case __X86_CASE_B:                      \
    {                               \
        volatile u8 *__ptr = (volatile u8 *)(ptr);      \
        asm volatile(lock "cmpxchgb %2,%1"          \
             : "=a" (__ret), "+m" (*__ptr)      \
             : "q" (__new), "0" (__old)         \
             : "memory");               \
        break;                          \
    }                               \
        case __X86_CASE_W:                      \
    {                               \
        volatile u16 *__ptr = (volatile u16 *)(ptr);        \
        asm volatile(lock "cmpxchgw %2,%1"          \
             : "=a" (__ret), "+m" (*__ptr)      \
             : "r" (__new), "0" (__old)         \
             : "memory");               \
        break;                          \
    }                               \
    case __X86_CASE_L:                      \
    {                               \
        volatile u32 *__ptr = (volatile u32 *)(ptr);        \
        asm volatile(lock "cmpxchgl %2,%1"          \
             : "=a" (__ret), "+m" (*__ptr)      \
             : "r" (__new), "0" (__old)         \
             : "memory");               \
        break;                          \
    }                               \
    case __X86_CASE_Q:                      \
    {                               \
        volatile u64 *__ptr = (volatile u64 *)(ptr);        \
        asm volatile(lock "cmpxchgq %2,%1"          \
             : "=a" (__ret), "+m" (*__ptr)      \
             : "r" (__new), "0" (__old)         \
             : "memory");               \
        break;                          \
    }                               \
    default:                            \
        __cmpxchg_wrong_size();                 \
    }                               \
    __ret;                              \
})      

對于X86的系統我們有LOCK信号去關閉CPU和記憶體間并發通路,做到獨占通路, 那麼也阻止了其它CPU與記憶體間的通路,這是一種低效的處理方式。

mips或者其它類型的CPU體系

當然對于有些CPU體系結構,比如mips是通過關CPU中斷實作的: arch/mips/include/asm/atomic.h

#define ATOMIC64_OP(op, c_op,
asm_op)                          \
static __inline__ void atomic64_##op(long i, atomic64_t * v)              \
{                                          \
    if (kernel_uses_llsc)
{                              \
        long temp;                              \
                                          \
        loongson_llsc_mb();                          \
        __asm__
__volatile__(                          \
        "    .set    push                    \n"   \
        "    .set    "MIPS_ISA_LEVEL"            \n"   \
        "1:    lld    %0, %1        # atomic64_" #op
"    \n"   \
        "    " #asm_op " %0, %2                \n"   \
        "    scd    %0,
%1                    \n"   \
        "\t" __scbeqz "    %0,
1b                    \n"   \
        "    .set    pop                    \n"   \
        : "=&r" (temp), "+" GCC_OFF_SMALL_ASM() (v->counter)          \
        : "Ir" (i));                              \
    } else {                                  \
        unsigned long flags;                          \
                                          \
        raw_local_irq_save(flags);                      \ 
        v->counter c_op i;                          \
        raw_local_irq_restore(flags);                      \
    }                                      \
}      

raw_local_irq_save(flags)

關閉中斷raw_local_irq_restore(flags) 打開中斷

ARM體系

ARMv6架構引入了獨占通路記憶體為止的概念,提供了更靈活的原子記憶體更新。ARMv6體系結構以Load-Exclusive和Store-Exclusive同步原語LDREX和STREX的形式引入了Load Link和Store Conditional指令。 從ARMv6T2開始,這些指令在ARM和Thumb指令集中可用。 獨立加載和專有存儲提供了靈活和可擴充的同步,取代了棄用的SWP和SWPB指令。

後來使用的是LDREX和STREX指令。 ## armv7之後繼續看代碼,在armv7之後就用了ldrex和strex

#define ATOMIC_OP(op, c_op, asm_op)                    \
static inline void atomic_##op(int i, atomic_t *v)            \
{                                    \
    unsigned long tmp;                        \
    int result;                            \
                                    \
    prefetchw(&v->counter);                        \
    __asm__ __volatile__("@ atomic_" #op "\n"            \
"1:    ldrex    %0, [%3]\n"                        \
"    " #asm_op "    %0, %0, %4\n"                    \
"    strex    %1, %0, [%3]\n"                        \
"    teq    %1, #0\n"                        \
"    bne    1b"                            \
    : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter)        \
    : "r" (&v->counter), "Ir" (i)                    \
    : "cc");                            \
}                                    \      

訪存指令LDREX/STREX和普通的LDR/STR訪存指令不一樣,它是“獨占”訪存指令。這對指令訪存過程由一個稱作“exclusive monitor”的部件來監視是否可以進行獨占通路。

先看看這對獨占訪存指令:

(1)LDREX R1 ,[R0] 指令是以獨占的方式從R0所指的位址中取一個字存放到R0 中;

(2)STREX R2,R1,[R0] 指令是以獨占的方式用R1來更新記憶體R0,如果獨占通路條件允許,則更新成功并傳回0到R2,否則失敗傳回1到R2。

上面的代碼我們還可以用下圖再解釋一次:

并發程式設計 原子操作

他們的原理就不再展開,詳細情況可以參考:

ARMv8系列的CPU體系,使用了LDXR和STXR指令來對記憶體進行獨占通路:

#define ATOMIC64_OP(op, asm_op) \
__LL_SC_INLINE void \
__LL_SC_PREFIX(arch_atomic64_##op(long i, atomic64_t *v)) \
{ \
 long result; \
 unsigned long tmp; \
 \
 asm volatile("// atomic64_" #op "\n" \
" prfm pstl1strm, %2\n" \
"1: ldxr %0, %2\n" \
" " #asm_op " %0, %0, %3\n" \
" stxr %w1, %0, %2\n" \
" cbnz %w1, 1b" \
 : "=&r" (result), "=&r" (tmp), "+Q" (v->counter) \
 : "Ir" (i)); \
} \
__LL_SC_EXPORT(arch_atomic64_##op);      

更詳細的資訊可以參考:

在此,特别推薦大家可以去看看zmq對原子操作的實作,實際上,我們在

工程開發的時候,可以依照它進行改進:

https://github.com/zeromq/libzmq/blob/master/src/atomic_counter.hpp

https://github.com/zeromq/libzmq/blob/master/src/atomic_ptr.hpp

比如自增原子操作的封裝:

并發程式設計 原子操作

3. 自旋鎖

posix提供一組自旋鎖(spin lock)的API:

int pthread_spin_destroy(pthread_spinlock_t *);
int pthread_spin_init(pthread_spinlock_t *, int pshared);
int pthread_spin_lock(pthread_spinlock_t *);
int pthread_spin_trylock(pthread_spinlock_t *);
int pthread_spin_unlock(pthread_spinlock_t *);      

pshared的取值:

  • PTHREAD_PROCESS_SHARED:該自旋鎖可以在多個程序中的線程之間共

    享。

  • PTHREAD_PROCESS_PRIVATE:僅初始化本自旋鎖的線程所在的程序内的線程才能夠使用該自旋鎖。

我們先來看一個示例:

#include <stdio.h>
#include <pthread.h>
#include <time.h>
#define MAX_THREAD_NUM  8
int counter = 0;
pthread_spinlock_t spinlock;
typedef void* (*thread_func_t)(void* argv);
static int lxx_atomic_add(int* ptr, int increment){
    int old_value = *ptr;
    __asm__ volatile("lock; xadd %0, %1 \n\t"
                     : "=r"(old_value), "=m"(*ptr)
                     : "0"(increment), "m"(*ptr)
                     : "cc", "memory");
    return *ptr;
}
void* atomic_thread_main(void* argv){
    for (int i =0; i < 100000000; i++){
        lxx_atomic_add(&counter, 1);
//          counter++;
    }
    return NULL;
}
void* spin_thread_main(void* argv){
    for(int i = 0; i < 100000000; i++){
        pthread_spin_lock(&spinlock);
        counter++;
        pthread_spin_unlock(&spinlock);
    }
    return NULL;
}
int test_lock(thread_func_t func, char** argv){
    clock_t start = clock();
    pthread_t tid[MAX_THREAD_NUM] = {0};
    for (int i = 0; i < MAX_THREAD_NUM; i++){
        int ret = pthread_create(&tid[i], NULL, func, argv);
        if (0 != ret){
            printf("create thread failed\n");
        }
    }
    for (int i = 0; i < MAX_THREAD_NUM; i++){
        pthread_join(tid[i], NULL);
    }
    clock_t end = clock();
    printf("spend clock : %ld,   ", end - start);
    return 0;
    }
int main(int argc, char** argv){
    test_lock(atomic_thread_main, NULL);
    printf("counter = %d\n", counter);
    counter = 0;
    pthread_spin_init(&spinlock, PTHREAD_PROCESS_PRIVATE);
    test_lock(spin_thread_main, NULL);
    printf("counter = %d\n", counter);
    return 0;
}      

好,到現在,我們解釋一下自旋的意思,我們應該還記得mutex如果拿取不到

鎖,則會進入休眠,放在鎖“等待”的隊列上(詳細可見

http://47.106.79.26:4001/2018/11/28/kernel_mutex/),這樣操作會涉及到

程序上下文的切換,效率不高,那麼自旋鎖則不是一種休眠等待的方式,而是

一種忙等待的過程,什麼意思呢?就是自旋鎖的pthread_spin_lock裡有一個死

循環,這個死循環一直檢查鎖的狀态,如果是lock狀态,則繼續執行死循環,

否則上鎖,結束死循環。

那麼這個spin lock在核心層面是怎麼實作?當然不同的CPU體系也有不同的實

現,下文我們會以ARM和其它的單核(Uni-Process)系統來介紹。

ARM體系

在ARMv6版本上,V3.2的核心之前(大約3.2版本,沒有太仔細去考究了),它

的實作相對簡單,詳見

https://github.com/torvalds/linux/blob/v3.2/arch/arm/include/asm/spinlock_types.h

typedef struct {
    volatile unsigned int lock;
} arch_spinlock_t;      

我們看到這個arch_spinlock_t其實就是一個無符号的整形,如果為1表示鎖住

狀态,為0表示沒有鎖住的狀态。如下圖的代碼:

https://github.com/torvalds/linux/blob/v3.2/arch/arm/include/asm/spinlock.h

并發程式設計 原子操作

這種實作也是比較簡單的,性能也不錯,但是存在不公平的問題: 不公平。也

就是所有的thread都是在無序的争搶spin lock,誰先搶到誰先得,不管thread

等了很久還是剛剛開始spin。在沖突比較少的情況下,不公平不會展現的特别

明顯,然而,随着硬體的發展,多核處理器的數目越來越多,多核之間的沖突

越來越劇烈,無序競争的spinlock帶來的performance issue終于浮現出來,根

據Nick Piggin的描述:

On an 8 core (2 socket) Opteron, spinlock unfairness is extremely noticable, with a
userspace test having a difference of up to 2x runtime per thread, and some threads are
starved or "unfairly" granted the lock up to 1 000 000 (!) times.      

多麼的不公平,有些可憐的thread需要饑餓的等待1000000次。本質上無序競争

從機率論的角度看應該是均勻分布的,不過由于硬體特性導緻這麼嚴重的不公

平,我們來看一看硬體block:

并發程式設計 原子操作

lock本質上是儲存在main memory中的,由于cache的存在,當然不需要

每次都有通路main memory。在多核架構下,每個CPU都有自己的L1 cache,保

存了lock的資料。假設CPU0擷取了spin lock,那麼執行完臨界區,在釋放鎖的

時候會調用smp_mb invalide其他忙等待的CPU的L1 cache,這樣後果就是釋放

spin lock的那個cpu可以更快的通路L1cache,操作lock資料,進而大大增加的

下一次擷取該spin lock的機會。

為了解決這麼的問題,在最近的一些核心版本裡(比如我看的是5.4版本

的),優化了這種設計,較長的描述如下:

https://github.com/torvalds/linux/blob/v5.4/arch/arm/include/asm/spinlock_types.h

typedef struct {
    union {
        u32 slock;
        struct __raw_tickets {
#ifdef __ARMEB__
        u16 next;
 u16 owner;
#else
 u16 owner;
 u16 next;
#endif
 } tickets;
 };
} arch_spinlock_t;      

本來一個整數的lock,被整得這麼複雜了,要想了解整個資料結構,我們需要先了解ticket-based spin lock的概念,如有有人來長沙了,我建議大家去吃一次楊裕興的面條,有點兒小貴,但是因為味道不錯,是以每次去的時候可能都會排隊等待,門口的美女會給你一張ticket,上面寫着18号,同時會告訴你,目前狀态是10号已經入席,11号在等待。

回到arch_spinlock_t,這裡的owner就是目前已經入席的那個号碼,next記錄的是下一個要分發的号碼。下面的描述使用普通的計算機語言和在九毛九就餐(假設楊裕興面館隻有一張餐桌)的例子來進行描述,估計可以讓同學們更有興趣閱讀下去。最開始的時候,slock(同上面的lock一樣的含義,0表示unlock狀态,1表示lock狀态)被指派為0,也就是說owner和next都是0,owner和next相等,表示unlocked。當第一個個thread調用spin_lock來申請lock(第一個人就餐)的時候,owner和next相等,表示unlocked,這時候該thread持有該spin lock(可以擁有九毛九的唯一的那個餐桌),并且執行

next++,也就是将next設定為1(再來人就配置設定1這個号碼讓他等待就餐)。也許該thread執行很快(吃飯吃的快),沒有其他thread來競争就調用spin_unlock了(無人等待就餐,生意慘淡啊),這時候執行owner++,也就是将owner設定為1(表示目前持有1這個号碼牌的人可以就餐)。姗姗來遲的1号獲得了直接就餐的機會,next++之後等于2。1号這個家夥吃飯巨慢,這是不文明現象(thread不能持有spin lock太久),但是存在。又來一個人就餐,配置設定目前next值的号碼2,當然也會執行next++,以便下一個人或者3的号碼牌。持續來人就會配置設定3、4、5、6這些号碼牌,next值不斷的增加,但是owner巋然不動,直到欠扁的1号吃飯完畢(調用spin_unlock),釋放飯桌這個唯一資源,owner++之後等于2,表示持有2那個号碼牌的人可以進入就餐了。

好了,我們先來看一個實作:

并發程式設計 原子操作

對于SMP的系統(多核處理器)是上文兩種比較典型的實作,但是如果是UP系統(單處理器系統)則不是如此實作的,具體來講有兩種實作方式,一種是關閉CPU搶占,另一種是先關閉中斷再關閉CPU搶占。

并發程式設計 原子操作

在2.6.8版本中,檢視這些spin lock的聲明和定義的時候,我們可以下檔案為參考:

https://github.com/torvalds/linux/blob/v2.6.18/include/linux/spinlock.h

并發程式設計 原子操作

好了,到此,我們總結下,原子操作和自旋鎖在不同的CPU體系上會有不同的實作,在整個核心發張過程中,也有變化,那麼什麼情況下适合用原子操作,什麼情況下适合用自旋鎖呢?

原子操作适合不叫簡短的操作(單個數,以它為變種可以操作多個數),自旋鎖因為一直是忙等待,是以适合那些耗時比較小的臨界區。

核心在同步中涉及了barrier,有關barrier可以參考:

4. mutex

在此就不較長的描述了,可以參考我之前的部落格: