天天看點

C++并發實戰16: std::atomic原子操作

     C++中對共享資料的存取在并發條件下可能會引起data race的undifined行為,需要限制并發程式以某種特定的順序執行,有兩種方式:使用mutex保護共享資料,原子操作:針對原子類型操作要不一步完成,要麼不做,不可能出現操作一半被切換CPU,這樣防止由于多線程指令交叉執行帶來的可能錯誤。非原子操作下,某個線程可能看見的是一個其它線程操作未完成的資料。

1  關于bool的原子化

   1.1 std::atomic_flag是一個bool原子類型有兩個狀态:set(flag=true) 和 clear(flag=false),必須被ATOMIC_FLAG_INIT初始化此時flag為clear狀态,相當于靜态初始化。一旦atomic_flag初始化後隻有三個操作:test_and_set,clear,析構,均是原子化操作。atomic_flag::test_and_set檢查flag是否被設定,若被設定直接傳回true,若沒有設定則設定flag為true後再傳回false。atomic_clear()清楚flag标志即flag=false。不支援拷貝、指派等操作,這和所有atomic類型一樣,因為兩個原子類型之間操作不能保證原子化。atomic_flag的可操作性不強導緻其應用局限性,還不如atomic<bool>。

    使用atomic_flag作為簡單的自旋鎖例子:本線程可以對flag設定了就跳出循環,避免使用mutex導緻線程阻塞

#include <iostream>       // std::cout
#include <atomic>         // std::atomic_flag
#include <thread>         // std::thread
#include <vector>         // std::vector
#include <sstream>        // std::stringstream

std::atomic_flag lock_stream = ATOMIC_FLAG_INIT;//flag處于clear狀态,沒有被設定過
std::stringstream stream;

void append_number(int x) {
  while (lock_stream.test_and_set()) {}//檢查并設定是個原子操作,如以前沒有設定過則退出循環,
    //每個線程都等待前面一個線程将lock_stream狀态清楚後跳出循環
  stream << "thread #" << x << '\n'; 
  lock_stream.clear();}
int main (){ 
  std::vector<std::thread> threads; 
  for (int i=1; i<=10; ++i) 
     threads.push_back(std::thread(append_number,i)); 
  for (auto& th : threads) th.join(); std::cout << stream.str(); return 0;
}
           

        采用class封裝可以用于lock_guard或unique_lock,但是最好不要将此用于任何競态條件下,這是一個busy loop!

class spinlock_mutex
{
   std::atomic_flag flag;
 public:
   spinlock_mutex():
   flag(ATOMIC_FLAG_INIT){}
   void lock()
   {
     while(flag.test_and_set(std::memory_order_acquire));
   }
   void unlock()
   {
     flag.clear(std::memory_order_release);
   }
};
           

2 atomic<T>模闆類,生成一個T類型的原子對象,并提供了系列原子操作函數。其中T是trivially  copyable type滿足:要麼全部定義了拷貝/移動/指派函數,要麼全部沒定義;沒有虛成員;基類或其它任何非static成員都是trivally copyable。典型的内置類型bool、int等屬于trivally copyable。再如class triviall{public: int x};也是。T能夠被memcpy、memcmp函數使用,進而支援compare/exchange系列函數。有一條規則:不要在保護資料中通過使用者自定義類型T通過參數指針或引用使得共享資料超出保護的作用域。atomic<T>編譯器通常會使用一個内部鎖保護,而如果使用者自定義類型T通過參數指針或引用可能産生死鎖。總之限制T可以更利于原子指令。注意某些原子操作可能會失敗,比如atomic<float>、atomic<double>在compare_exchange_strong()時和expected相等但是内置的值表示形式不同于expected,還是傳回false,沒有原子算術操作針對浮點數;同理一些使用者自定義的類型T由于記憶體的不同表示形式導緻memcmp失敗,進而使得一些相等的值仍傳回false。

atomic<T>的成員函數:

template < class T > struct atomic {
    bool is_lock_free() const volatile;//判斷atomic<T>中的T對象是否為lock free的,若是傳回true。lock free(鎖無關)指多個線程并發通路T不會出現data race,任何線程在任何時刻都可以不受限制的通路T
    bool is_lock_free() const;
    atomic() = default;//預設構造函數,T未初始化,可能後面被atomic_init(atomic<T>* obj,T val )函數初始化
    constexpr atomic(T val);//T由val初始化
    atomic(const atomic &) = delete;//禁止拷貝
    atomic & operator=(const atomic &) = delete;//atomic對象間的互相指派被禁止,但是可以顯示轉換再指派,如atomic<int> a=static_cast<int>(b)這裡假設atomic<int> b
    atomic & operator=(const atomic &) volatile = delete;//atomic間不能指派
    T operator=(T val) volatile;//可以通過T類型對atomic指派,如:atomic<int> a;a=10;
    T operator=(T val);
    operator  T() const volatile;//讀取被封裝的T類型值,是個類型轉換操作,預設記憶體序是memory_order_seq需要其它記憶體序則調用load
    operator  T() const;//如:atomic<int> a,a==0或者cout<<a<<endl都使用了類型轉換函數
    //以下函數可以指定記憶體序memory_order
    T exchange(T val, memory_order = memory_order_seq_cst) volatile;//将T的值置為val,并傳回原來T的值
    T exchange(T val, memory_order = memory_order_seq_cst);
    void store(T val, memory_order = memory_order_seq_cst) volatile;//将T值設為val
    void store(T val, memory_order = memory_order_seq_cst);
    T load(memory_order = memory_order_seq_cst) const volatile;//通路T值
    T load(memory_order = memory_order_seq_cst) const;
    bool compare_exchange_weak(T& expected, T val, memory_order = memory_order_seq_cst) volatile;//該函數直接比較原子對象所封裝的值與參數 expected 的實體内容,是以某些情況下,對象的比較操作在使用 operator==() 判斷時相等,但 compare_exchange_weak 判斷時卻可能失敗,因為對象底層的實體内容中可能存在位對齊或其他邏輯表示相同但是實體表示不同的值(比如 true 和 2 或 3,它們在邏輯上都表示"真",但在實體上兩者的表示并不相同)。可以虛假的傳回false(和expected相同)。若本atomic的T值和expected相同則用val值替換本atomic的T值,傳回true;若不同則用本atomic的T值替換expected,傳回false。
    bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst);
    bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst) volatile;//
與compare_exchange_weak 不同, strong版本的 compare-and-exchange 操作不允許(spuriously 地)傳回 false,即原子對象所封裝的值與參數 expected 的實體内容相同,比較操作一定會為 true。不過在某些平台下,如果算法本身需要循環操作來做檢查, compare_exchange_weak 的性能會更好。是以對于某些不需要采用循環操作的算法而言, 通常采用compare_exchange_strong 更好
    bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst);
};
           

    cplusplus給出的例子之一:

// atomic::compare_exchange_weak example:
#include <iostream>       // std::cout
#include <atomic>         // std::atomic
#include <thread>         // std::thread
#include <vector>         // std::vector
// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head (nullptr);
void append (int val) {     // append an element to the list
  Node* newNode = new Node {val,list_head};
  // next is the same as: list_head = newNode, but in a thread-safe way:
  while (!list_head.compare_exchange_weak(newNode->next,newNode)) {}
  // (with newNode->next updated accordingly if some other thread just appended another node)
}
int main ()
{
  // spawn 10 threads to fill the linked list:
  std::vector<std::thread> threads;
  for (int i=0; i<10; ++i) threads.push_back(std::thread(append,i));
  for (auto& th : threads) th.join();
  // print contents:
  for (Node* it = list_head; it!=nullptr; it=it->next)
    std::cout << ' ' << it->value;
  std::cout << '\n';
  // cleanup:
  Node* it; while (it=list_head) {list_head=it->next; delete it;}
  return 0;
}
           

程式輸出:

9 8 7 6 5 4 3 2 1 0      

3 std::atomic針對整數和指針的特化:

不能像傳統那樣拷貝和指派,可以通過内置成員函數load(),store(),exchange()完成指派,支援複合指派運算,自增自減運算,還有特有的fetch系列函數

整型特化:

specializations additional member functions

char signed char unsigned char short unsigned short int unsigned int long unsigned long long long unsigned long long char16_t char32_t wchar_t

extended integral types (if any)

atomic::fetch_add 

atomic::fetch_sub 

atomic::fetch_and 

atomic::fetch_or 

atomic::fetch_xor 

atomic::operator++ 

atomic::operator-- 

operator (comp. assign.) 

指針特化:

specializations additional member functions

U*

(for any type U)

atomic::fetch_add 

atomic::fetch_sub 

atomic::operator++ 

atomic::operator-- 

operator (comp. assign.) 

函數說明:

T fetch_add (T val, memory_order sync = memory_order_seq_cst) noexcept;//整型
T fetch_add (ptrdiff_t val, memory_order sync = memory_order_seq_cst) noexcept;//指針
将原子對象的封裝值加 val,并傳回原子對象的舊值(适用于整形和指針類型的 std::atomic 特化版本),整個過程是原子的

T fetch_and (T val, memory_order sync = memory_order_seq_cst) noexcept;//将原子對象的封裝值按位與 val,并傳回原子對象的舊值(隻适用于整型的 std::atomic 特化版本),整個過程是原子的。

T fetch_or (T val, memory_order sync = memory_order_seq_cst) noexcept;//将原子對象的封裝值按位或 val,并傳回原子對象的舊值(隻适用于整型的 std::atomic 特化版本),整個過程是原子的。

 fetch_xor (T val, memory_order sync = memory_order_seq_cst) noexcept;//将原子對象的封裝值按位異或 val,并傳回原子對象的舊值(隻适用于整型的 std::atomic 特化版本),整個過程是原子的。

operator++
pre-increment (1)	
T operator++() volatile noexcept;
T operator++() noexcept;
post-increment (2)	
T operator++ (int) volatile noexcept;
T operator++ (int) noexcept;
自增運算符重載, 第一種形式 (1) 傳回自增後的值(即字首++),第二種形式(2) 傳回自增前的值(即字尾++),适用于整形和指針類型的 std::atomic 特化版本。
operator--
自減運算符重載, 第一種形式 (1) 傳回自減後的值(即字首--),第二種形式(2) 傳回自減前的值(即字尾--),适用于整形和指針類型的 std::atomic 特化版本。
atomic::operator (comp. assign.)
複合指派運算符重載,主要包含以下形式:
if T is integral (1)	
T operator+= (T val) volatile noexcept;
T operator+= (T val) noexcept;
T operator-= (T val) volatile noexcept;
T operator-= (T val) noexcept;
T operator&= (T val) volatile noexcept;
T operator&= (T val) noexcept;
T operator|= (T val) volatile noexcept;
T operator|= (T val) noexcept;
T operator^= (T val) volatile noexcept;
T operator^= (T val) noexcept;
if T is pointer (2)	
T operator+= (ptrdiff_t val) volatile noexcept;
T operator+= (ptrdiff_t val) noexcept;
T operator-= (ptrdiff_t val) volatile noexcept;
T operator-= (ptrdiff_t val) noexcept;
以上各個 operator 都會有對應的 fetch_* 操作,詳細見下表:

操作符	成員函數	支援類型
複合指派	等價于	整型	指針類型	其他類型
+	atomic::operator+=	atomic::fetch_add	是	是	否
-	atomic::operator-=	atomic::fetch_sub	是	是	否
&	atomic::operator&=	atomic::fetch_and	是	否	否
|	atomic::operator|=	atomic::fetch_or	是	否	否
^	atomic::operator^=	atomic::fetch_xor	是	否	否
           

4 C風格的atomic類型及其操作,有點繁雜這裡不贅述了,參看:點選打開連結和點選打開連結。前面成員函數字首atomic_形成原子函數,函數的第一個參數必須是原子類型,如:

atomic_store (volatile atomic<T>* obj, T val)
           

如果你需要顯式指定記憶體序,應該使用atomic_store_explicit。是以字首atomic_表示c風格的原子自由函數,字尾_explicit指定記憶體序。

5    atomic<bool>,atomic<bool>同樣不可以指派(這裡指兩個atomic<bool>間的指派)、拷貝,但是其可以直接初始化,如:atomic<bool> flag(false); flag=true。atomic_flag::test_and_set()被atomic::exchange()替代,更多操作見atomic<T>。

std::atomic<bool> b;
bool x=b.load(std::memory_order_acquire);
b.store(true);
x=b.exchange(false,std::memory_order_acq_rel);//更改為false并傳回原來的值
           

    compare_exchange_weak/strong函數是保證在比較和交換執行下原子化,但是此函數可能與expected值相等的情形下atomic的T值沒有替換為val,這時atomic值未變且傳回false, compare_exchange_weak可能失敗,特别是線程數多于CPU核心數時compare-exchange這個指令序列可能CPU不能保證原子化,是以經常在循環中:

bool expected=false;
extern atomic<bool> b; // set somewhere else
while(!b.compare_exchange_weak(expected,true) && !expected);
           

    compare_exchange_strong可以保證當atomic不等于expected時傳回false,不需要循環保護。     std::atomic_flag是lock free的,但是atomic<bool>不一定是lock free的,可以用atomic<T>::is_lock_free()判斷。      一個例子的代碼片段:讀者寫者

#include <vector>
#include <atomic>
#include <iostream>
std::vector<int> data;
std::atomic<bool> data_ready(false);
void reader_thread()
{
  while(!data_ready.load()) 
  {
    std::this_thread::sleep(std::milliseconds(1));
  }
  std::cout<<”The answer=”<<data[0]<<”\n”;// 1 
}
void writer_thread()
{
  data.push_back(42);//2  由于1和2處發生了data race,是以需要線程同步,可以使用mutex,此處使用atomic<bool>強制線程間有個順序關系 
  data_ready=true; 
}
           

6 std::atomic<T*> 指針原子化類型,和atomic<bool>一樣,其也是不可複制拷貝的,擁有is_lock_free,load,store,exchange,compare_exchange_weak/strong等成員函數,隻不過内在成員換為指針類型T*。還提供了指針算術操作,fetch_add()、fetch_sub()記憶體位址的加減(都是原子操作),+=和-=兩個複合指派操作符,++和--的自增運算符。例如:std::atomic<Foo*> x; x+=3; 表示指向第四個Foo*并傳回這個Foo*;x.fetch_add(3)表示x位址向後移動3個,但是傳回原來的Foo*。

class Foo{};
Foo some_array[5];
std::atomic<Foo*> p(some_array);
Foo* x=p.fetch_add(2); 
assert(x==some_array);
assert(p.load()==&some_array[2]);
x=(p-=1); 
assert(x==&some_array[1]);
assert(p.load()==&some_array[1]);
           

7 整數原子類型(如std::atomic<long long>)和其它atomic一樣,均有load,store,exchange,fetch_add,fetch_sub等成員函數,但是整數有自己的一些操作符: fetch_and(), fetch_or(),+=, -=, &=, |=,fetch_xor(),^=,++,--。和atomic<T*>一樣,fetch_系列函數傳回的是舊值,複合指派運算傳回的是新值。整數原子類型沒有乘法、除法、位移操作,因為整數原子類型通常用于計數或者位标記。

8 注意std::shared_ptr<T>是原子類型,是以使用shared_ptr是線程安全的。C風格的自由原子函數也适用于shared_ptr。

std::shared_ptr<my_data> p;
void process_global_data()
{
  std::shared_ptr<my_data> local=std::atomic_load(&p);
  process_data(local);
}
void update_global_data()
{
  std::shared_ptr<my_data> local(new my_data);
  std::atomic_store(&p,local);
}