二、整數原子類型對象的初始化:
C11标準為了簡化對整數原子對象的初始化,引入了一種宏函數——ATOMIC_VAR_INIT,當我們聲明一個整數原子對象,然後立即對它初始化時,可以使用這個宏函數。該宏函數接收一個參數,用于指定該原子對象的初始值。當然,我們必須注意的是,初始值的類型要與原子對象的整數類型相相容。我們可以看以下例子。
#include <stdatomic.h>
#include <stdbool.h>
// 将sIntAtom原子對象初始化為10
static volatile atomic_int sIntAtom = ATOMIC_VAR_INIT(10);
// 将sBoolAtom原子對象初始化為true
static volatile atomic_bool sBoolAtom = ATOMIC_VAR_INIT(true);
// 将sCharAtom原子對象初始化為'c'
static volatile atomic_char sCharAtom = ATOMIC_VAR_INIT('c');
int main(int argc, const char * argv[])
{
// 将intAtom原子對象初始化為10
volatile atomic_int intAtom = ATOMIC_VAR_INIT(10);
// 将boolAtom原子對象初始化為true
volatile atomic_bool boolAtom = ATOMIC_VAR_INIT(true);
// 将charAtom原子對象初始化為'c'
volatile atomic_char charAtom = ATOMIC_VAR_INIT('c');
}
注意!在最新版本的C2X标準草案中已經提到,ATOMIC_VAR_INIT 這個宏将被棄用,是以我們隻需要直接對這些整數類型的原子對象進行指派即可。進而上述代碼可改寫為以下形式:
#include <stdatomic.h>
#include <stdbool.h>
// 将sIntAtom原子對象初始化為10
static volatile atomic_int sIntAtom = 10;
// 将sBoolAtom原子對象初始化為true
static volatile atomic_bool sBoolAtom = true;
// 将sCharAtom原子對象初始化為'c'
static volatile atomic_char sCharAtom = 'c';
int main(int argc, const char * argv[])
{
// 将intAtom原子對象初始化為10
volatile atomic_int intAtom = 10;
// 将boolAtom原子對象初始化為true
volatile atomic_bool boolAtom = true;
// 将charAtom原子對象初始化為'c'
volatile atomic_char charAtom = 'c';
}
另外還有一種初始化方法是調用 void atomic_init(volatile A *atom, C value); 函數。它有兩個參數,第一個參數指向一個整數原子類型的對象;第二個參數是為該原子對象指定的初始值。當我們先聲明了某個整數原子類型的對象,之後再為它初始化時,就可以調用這個函數,我們可以看以下代碼例子。
#include <stdatomic.h>
#include <stdbool.h>
int main(int argc, const char * argv[])
{
volatile atomic_int intAtom;
volatile atomic_bool boolAtom;
volatile atomic_char charAtom;
// 将intAtom原子對象初始化為10
atomic_init(&intAtom, 10);
// 将boolAtom原子對象初始化為false
atomic_init(&boolAtom, false);
// 将charAtom原子對象初始化為'c'
atomic_init(&charAtom, 'c');
}
這裡大家還需要注意的是,對原子對象的初始化操作并非是原子的!是以我們往往在做多線程操作之前先對原子對象做必要的初始化。此外,對原子對象的初始化應該使用上述所提到的初始化方式,而不是用下面即将描述的原子存儲與加載操作。下面即将描述的所有原子操作都應該基于已初始化完的原子對象!
三、整數原子類型對象的存儲與加載:
整數原子類型的存儲與加載均有兩種模式,一種是預設存儲器次序模式,還有一種則是顯式指定存儲器次序的模式。我們先列出整數原子類型加載操作。
C atomic_load(volatile A *object);
C atomic_load_explicit(volatile A *object, memory_order order);
對于預設存儲器次序的整數類型原子加載操作而言,它有一個參數,此參數指向一個整數原子類型的對象,然後傳回該原子對象的目前值。下面我們來看些例子:
#include <stdio.h>
#include <stdatomic.h>
#include <stdbool.h>
int main(int argc, const char * argv[])
{
volatile atomic_int intAtom = 10;
volatile atomic_bool boolAtom = true;
volatile atomic_char charAtom = 'a';
// 加載intAtom原子對象的值
int i = atomic_load(&intAtom);
// 加載boolAtom原子對象的值
bool b = atomic_load(&boolAtom);
// 加載charAtom原子對象的值
char c = atomic_load(&charAtom);
printf("i = %d, b = %d, c = %c\n", i, b, c);
}
整數原子類型的存儲操作也有兩種版本,一個是預設存儲器次序的,另一個是顯式指定存儲器次序的。
void atomic_store(volatile A *object, C desired);
void atomic_store_explicit(volatile A *object, C desired, memory_order order);
對于預設存儲器次序的版本,該操作函數具有兩個參數,第一個參數指向一個整數原子類型的對象,第二個參數用于指定所要存儲的值。下面來看些例子:
#include <stdio.h>
#include <stdatomic.h>
#include <stdbool.h>
int main(int argc, const char * argv[])
{
volatile atomic_int intAtom = 0;
volatile atomic_bool boolAtom = false;
volatile atomic_char charAtom = '\0';
// 用-100來存儲intAtom原子對象的值
atomic_store(&intAtom, -100);
// 用true來存儲boolAtom原子對象的值
atomic_store(&boolAtom, true);
// 用'c'來存儲charAtom的值
atomic_store(&charAtom, 'c');
// 加載intAtom原子對象的值
int i = atomic_load(&intAtom);
// 加載boolAtom原子對象的值
bool b = atomic_load(&boolAtom);
// 加載charAtom原子對象的值
char c = atomic_load(&charAtom);
printf("i = %d, b = %d, c = %c\n", i, b, c);
}
四、整數原子類型對象的交換操作:
C11标準中的整數原子類型的交換操作其實就對應了本文一開始所提到的SWAP原子操作。C11标準中給出了兩個交換操作版本,一個是預設存儲器次序的,另一個是顯式指定存儲器次序的。
C atomic_exchange(volatile A *object, C desired);
C atomic_exchange_explicit(volatile A *object, C desired, memory_order order);
對于預設存儲器次序的版本,交換操作函數提供了兩個參數,第一個參數指向某個整數原子類型的對象;第二個參數指定了想要存儲到該原子對象中的值。該函數傳回指定原子對象在執行此操作之前的值。下面我們給出一些例子。
#include <stdio.h>
#include <stdatomic.h>
int main(int argc, const char * argv[])
{
volatile atomic_int atom = 0;
// 使用atomic_exchange操作将1寫入到atom原子對象,
// 然後傳回atom原先的值——0
int value = atomic_exchange(&atom, 1);
printf("value = %d, atom = %d\n", value, atomic_load(&atom));
// 我們可以再來一遍
value = atomic_exchange(&atom, 2);
// 這裡輸出:value = 1, atom = 2
printf("value = %d, atom = %d\n", value, atomic_load(&atom));
}
我們可以自己嘗試一下,用 atomic_exchange 原子操作來實作 atomic_flag_test_and_set 操作的語義,若有不太明白的地方歡迎留言。
五、整數原子類型對象的比較與交換操作:
C11标準中的整數原子類型對象的比較與交換操作其實就對應了本文一開始所提到的CAS原子操作。C11标準中給出了四個原子比較與交換操作的版本,兩個是預設存儲器次序的,另外兩個是顯式指定存儲器次序的。
_Bool atomic_compare_exchange_strong(volatile A *object, C *expected, C desired);
_Bool atomic_compare_exchange_strong_explicit(volatile A *object, C *expected, C desired,
memory_order success, memory_order failure);
_Bool atomic_compare_exchange_weak(volatile A *object, C *expected, C desired);
_Bool atomic_compare_exchange_weak_explicit(volatile A *object, C *expected, C desired, memory_order success, memory_order failure);
這裡有strong版本與weak版本。它們的語義都差不多,均實作了之前提到的CAS語義邏輯。對于預設存儲器次序的操作而言,strong與weak版本都提供了三個參數,第一個參數指向某個整數原子類型對象;第二個參數指向要進行比較的對象,并且如果比較失敗,那麼該操作會将原子對象的目前值拷貝到該參數所指向的對象中;第三個參數指定存儲到原子對象中的值。
如果比較成功,那麼desire值會被存放到原子對象中,并且傳回 true;如果比較失敗,那麼目前原子對象的值會被拷貝到expected所指向的對象中,并且傳回 false。
strong版本與CAS的語義完全一緻,而weak版本則有些差別。weak版本可能在目前比較成功的情況下,也會被判定為失敗。C11标準之是以加入weak語義是為了能使更多的原子操作機制來實作CAS功能,比如通過LL-SC機制來實作CAS原子操作的話,weak版本會更好一些。
那麼我們應該如何去選擇呢?C11标準建議,如果我們采用像之前提到的,通過循環去測試CAS比較是否成功的話,那麼使用weak版本在某些平台上能獲得更好的性能;如果我們隻是單獨對某個原子對象做一次CAS操作,而目前不管這次操作是否成功的話,那麼用strong版本更好一些。下面我們來舉些例子。
#include <stdio.h>
#include <stdatomic.h>
#include <stdbool.h>
int main(int argc, const char * argv[])
{
volatile atomic_int atom = 0;
// 我們先從atom原子對象加載其值
int expected = atomic_load(&atom);
// 我們就對atom原子對象操作一次,是以這裡用strong版本。
// 如果比較成功,就将1存儲到atom原子對象中
bool equal = atomic_compare_exchange_strong(&atom, &expected, 1);
// 這裡輸出:Is equal? 1, atom value is: 1
printf("Is equal? %d, atom value is: %d\n", equal, atomic_load(&atom));
// 我們再次加載atom的值
expected = atomic_load(&atom);
// 我們對expected進行了修改
expected += 10;
// 由于這次比較,expected所存儲的值與atom的值不相等,
// 是以将atom的值重新存放到expected中,且傳回false。
equal = atomic_compare_exchange_strong(&atom, &expected, -1);
// Is equal? 0, expected value is: 1
printf("Is equal? %d, expected value is: %d\n", equal, expected);
}
通過這個例子,相信各位對atomic_compare_exchange操作已經有了感性認識了吧~
下面筆者将為大家來示範一下,如何通過比較與交換原子操作來實作針對一個浮點數的多線程遞增計算。
#include <stdio.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <pthread.h>
/// 定義一個将被多線程共享的整數原子對象,
/// 它後面将會被充當一個單精度浮點數
static volatile atomic_int sAtomicFLoatObject;
/// 對多線程共享的原子對象進行求遞增操作
/// @param nLoops 指定對共享原子對象操作幾次
static void AtomicValueInc(int nLoops)
{
// 這裡對共享原子對象操作nLoops次
for(int loop = 0; loop < nLoops; loop++)
{
// 先讀取sAtomicFLoatObject的目前值
int orgValue = atomic_load(&sAtomicFLoatObject);
float dstValue;
do
{
// 我們将orgValue所表示的單精度浮點數萃取出來,
// 保證不損失任何精度,然後在此基礎上遞增0.1
dstValue = *(float*)&orgValue + 0.1f;
}
// 由于我們這裡需要最終獲得正确的值,是以這裡用了weak版本,
// 在循環條件下對于某些硬體平台能獲得更好的性能
while(!atomic_compare_exchange_weak(&sAtomicFLoatObject, &orgValue, *(int*)&dstValue));
}
}
/// 線程處理函數
static void* ThreadProc(void *args)
{
// 在使用者線程中執行10000次
AtomicValueInc(10000);
return NULL;
}
int main(int argc, const char * argv[])
{
const float zero = 0.0f;
// 我們這裡為了展示所使用的一些“黑科技”,
// 而顯式地用單精度浮點數所表示的IEEE整數來為
// sAtomicFLoatObject進行初始化
atomic_init(&sAtomicFLoatObject, *(int*)&zero);
pthread_t threadID;
// 建立線程并排程執行
if(pthread_create(&threadID, NULL, ThreadProc, NULL) != 0)
{
puts("Failed to create a thread!");
return 0;
}
// 在主線程中執行10000次
AtomicValueInc(10000);
// 等待線程執行完畢
pthread_join(threadID, NULL);
// 輸出最終結果
const int result = atomic_load(&sAtomicFLoatObject);
printf("The final result is: %f\n", *(float*)&result);
// 由于計算精度關系,最終結果可能不會正好為2000.0f,
// 是以,我們可以在寫一個簡單的算法進行驗證結果的正确性!
float sum = 0.0f;
for(int i = 0; i < 20000; i++) {
sum += 0.1f;
}
// 由于算法相同,在沒經過任何優化的情況下,
// 兩者在IEEE二進制表達上應該是完全一緻的!
if(sum == *(float*)&result)
puts("Equal!");
}
同樣,這裡也用到了pthread庫,是以如果各位在Linux環境下編譯運作的話需要添加 -pthread 編譯選項,macOS、iOS等Apple系統環境則不需要,pthread是被預設連接配接的。此外,上述代碼以及後續代碼都要用到C11标準,是以各位所使用的編譯器如果稍舊的話(比如GCC 4.8,Clang 3.6),那麼必須顯式地加上 -std=gnu11 編譯選項。
而在Windows系統下MSVC沒有提供原子操作的庫,筆者這裡為Windows平台的開發者封裝了一個,可供使用:https://github.com/zenny-chen/simple-stdatomic-for-VS-Clang
六、整數原子類型對象的基本算術邏輯操作:
C11标準中提供了針對整數原子類型對象的基本算術邏輯操作,這又被稱為原子擷取與修改(atomic fetch and modify)操作。這裡各位需要注意的是,以下這些操作不适用于 atomic_bool 原子類型,而隻能應用于除此之外的其他整數原子類型。
原子擷取與修改操作有如下這些品種:加法(add),減法(sub),按位與(and),按位或(or),按位異或(xor)。每種原子擷取與修改操作都有兩版本,一個版本為 預設存儲器次序,另一個版本為顯式指定存儲器次序。其函數原型如下所示:
C atomic_fetch_<key>(volatile A *object, M operand);
C atomic_fetch_<key>_explicit(volatile A *object, M operand, memory_order order);
上述函數原型的辨別符中,<key>對應于具體操作名稱,對于原子加法,其<key>就是 add;對于原子按位與操作,其<key>就是 and。對于預設存儲器次序的版本,這些函數具有兩個參數,第一個參數指向某個整數原子對象;第二個參數為修改操作的操作數,比如對于加法操作就是“加數”,對于減法操作則是“減數”;而原子對象則分别作為“被加數”和“被減數”。
之是以稱這些原子操作為“原子擷取與修改”操作,是因為這些原子操作的步驟都是先擷取指定原子對象的目前值,然後在此基礎上做算術邏輯運算,最後将計算結果寫入到該原子對象中并傳回該原子對象做此操作之前的值。這一過程很明顯,就是先擷取後修改。下面我們來看一些代碼例子。
#include <stdio.h>
#include <stdatomic.h>
int main(int argc, const char * argv[])
{
volatile atomic_int atom = 10;
// 這裡對原子對象atom做原子加法操作,
// 将它與5相加,再将結果存入該原子對象
int value = atomic_fetch_add(&atom, 5);
// 輸出:value = 10, atom = 15
printf("value = %d, atom = %d\n", value, atomic_load(&atom));
// 這裡對原子對象atom做原子減法操作,
// 将它與8相減,再将結果存入該原子對象
value = atomic_fetch_sub(&atom, 8);
// 輸出:value = 15, atom = 7
printf("value = %d, atom = %d\n", value, atomic_load(&atom));
// 這裡對原子對象atom做原子按位異或操作,
// 将它與7做按位異或j運算,再将結果存入該原子對象
value = atomic_fetch_xor(&atom, 7);
// 輸出:value = 7, atom = 0
printf("value = %d, atom = %d\n", value, atomic_load(&atom));
}
原子擷取與修改操作能應用在很多場合,比如我們要利用多線程對某些資源進行計算,然後進行彙總時可能就會對其中一個共享資源做原子擷取與修改操作,這也屬于我們在作業系統中常用的“fork-join”的一種機制。
下面我們将舉一個比較實際的例子。假定我們有100個數組,每個數組有10000個元素,我們現在要對這1000個數組中的所有元素進行求和操作,我們怎麼算比較快呢?傳統的思路是先檢視我們目前的計算環境有多少CPU,每個CPU含有多少核心,然後進行平均劃分。但這裡有個問題是,計算機系統往往不會隻有我們目前一個前台程式在運作,可能會有其他一些背景任務,甚至有一些高優先級的任務需要處理等等,比如我們邊運作我們這個程式,可能又在聽音樂,開着浏覽器在網上沖浪等等……所有這些任務都需要占用CPU資源。
是以,一種可能更好的方法是仍然針對核心個數開線程(比如你的CPU有四個核心,就開四個線程),但是每個線程不是平均配置設定給它所要計算的元素個數,而是給一批,這樣每個線程完成一批資料處理之後再去取下一批進行計算。這樣即便某些線程受到其他任務排程而被阻塞,但也不至于使目前的任務被“卡住”,其他線程可以“接手”它後面所要計算的資源。
是以,對于下面這個demo,為了簡單起見,筆者仍然用兩個線程,每個線程一次疊代就處理其中一個數組的所有元素之和,然後接着取下一個可操作的數組。我們利用原子加法操作來操縱目前所要操作數組的索引。這種解決多線程并行任務的方法想必能給各位一定的啟發。
#include <stdio.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <pthread.h>
/// 我們定義了帶有100個元素的數組,
/// 每個數組元素是一個含有10000個int元素的數組
static int sArrays[100][10000];
/// 此整數原子對象用于訓示目前線程所要操作的數組索引
static volatile atomic_int sAtomicArrayIndex = 0;
/// 此整數原子對象用于存放最終的求和結果
static volatile atomic_int sAtomicArraySum = 0;
/// 對共享數組進行求和操作
/// 如果目前數組還沒計算完,傳回true;否則傳回false
static bool AtomicComputeArraySum(void)
{
// 擷取目前所要計算的數組個數
const int nLoops = (int)(sizeof(sArrays[0]) / sizeof(sArrays[0][0]));
// 擷取數組sArrays總共有多少元素
const int arrayLen = (int)(sizeof(sArrays) / sizeof(sArrays[0]));
// 利用原子加法來擷取目前所要操作數組的索引
const int currArrayIndex = atomic_fetch_add(&sAtomicArrayIndex, 1);
// 若目前索引已經達到了數組長度,則直接傳回false,說明數組已經全部計算完成
if(currArrayIndex >= arrayLen) {
return false;
}
// 對目前指派到的數組元素進行求和
int sum = 0;
for(int index = 0; index < nLoops; index++) {
sum += sArrays[currArrayIndex][index];
}
// 将結果進行累加
atomic_fetch_add(&sAtomicArraySum, sum);
return true;
}
/// 線程處理函數
static void* ThreadProc(void *args)
{
// 在使用者線程中計算
while(AtomicComputeArraySum());
return NULL;
}
int main(int argc, const char * argv[])
{
// 擷取數組每個元素的數組長度
const int nElems = (int)(sizeof(sArrays[0]) / sizeof(sArrays[0][0]));
// 擷取數組sArrays總共有多少元素
const int arrayLen = (int)(sizeof(sArrays) / sizeof(sArrays[0]));
// 我們先對共享的二維數組進行初始化,
// 為了友善驗證結果,将它所有數組的所有元素初始化為1
for(int i = 0; i < arrayLen; i++)
{
for(int j = 0; j < nElems; j++) {
sArrays[i][j] = 1;
}
}
pthread_t threadID;
// 建立線程并排程執行
if(pthread_create(&threadID, NULL, ThreadProc, NULL) != 0)
{
puts("Failed to create a thread!");
return 0;
}
// 在主線程中計算
while(AtomicComputeArraySum());
// 等待線程執行完畢
pthread_join(threadID, NULL);
// 輸出最終結果
const int result = atomic_load(&sAtomicArraySum);
printf("The final result is: %d\n", result);
}
C11标準中的所引入的存儲器次序機制
C11标準引入了一組存儲器次序枚舉類型:
typedef enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst
} memory_order;
用于指定存儲器通路按哪種次序進行,這裡不僅針對原子對象,而且也能包含正常的、非原子的存儲器通路。在一個多核多處理器環境下,當多個線程同時對幾個變量以寬松的存儲器次序(即使用memory_order_relaxed)進行讀寫時,其中一個線程所觀察到的這些變量值的變化與另一個寫這些變量的線程所見的次序可能是不同的。實際上,甚至在多個讀線程之間,這些變量值變化的所見次序也可能是不同的。此外,在單核單線程環境下,C11也允許使用這組存儲器次序,因為出于優化目的,編譯器可以重新編排互相獨立的讀寫操作順序。
我們如何來了解基于多核多線程的存儲器次序呢?對于不同線程所觀察到的若幹變量的讀寫次序而有所不同,是如何引發的呢?這得從多核處理器的存儲器結構層級說起。
現代多核處理器的存儲器層級一般分為多個層,最靠近CPU的為L1 Cache,容量最小但速度最快,并且它是僅針對一個處理器核心獨享的。
然後再向外一層是L2 Cache,在某些移動裝置上,它是被所有核心共享的,而在其他一些裝置以及主流的桌面處理器上它也是被單個核心獨享的,并且其容量比L1 Cache更大一些,速度也稍慢一些。
然後再向外一層是L3 Cache,當然有些中低端裝置可能沒有L3 Cache。L3 Cache是被所有核心共享的。其容量很大,目前來說基本都至少1MB了,不過速度比L2 Cache要慢。此外,如果它作為最後一層Cache的話(簡稱為LLC),那麼它也可能被核心GPU所共享。
最後就是我們上面提到的LLC,如果有L4 Cache的話就是L4 Cache,否則就是L3 Cache。上面提到了,LLC一般是給整個系統所共享。目前有些系統使用eDRAM作為LLC,其容量可以做得很大,當然速度也會更慢一些,一般來說SRAM的速度會更快一些,但它肯定比需要通過總線才能通路的外部DDR要來得快了~
然後最外部的就是外部存儲器了。
是以我們看到,我們用C語言寫一句看似很簡單的加載或存儲指派語句,而對于CPU來說可能要做非常多的工作,這其中就是既要保證一定的訪存效率,還得保證Cache緩存的資料一緻性等。是以,對于現代不少種類的處理器架構而言,都在其系統中引入了更弱的存儲器次序,使得整個訪存效率能得以提升。下面我們舉一個相對比較容易了解的一個例子來說明這種所謂的觀察到的存儲器次序不同的情景。
上圖中展示了一個具有三層Cache層級,并且有兩個處理器核心的系統。向下箭頭表示存儲(寫)操作;向上箭頭表示加載(讀)操作。并且左邊的操作時序早于右邊的操作。這裡有兩個被兩個核心線程所共享的變量x和y,同時假定右邊的核心線程先做 y = 0 的存儲操作,使得在它的Cache中都安排好了存放變量y的Cache條目(entry),并且假定此時沒有關于任何針對變量x的Cache條目。
首先,右邊核心線程先對y進行 y = 0 的存儲操作,等該操作全部完畢後再執行左邊核心線程的操作。
左邊核心線程先做 x = 1 的存儲操作,緊接着再做 y = 2 的存儲操作。完成之後,右邊核心線程立即對x和y進行讀取。
我們從圖中可觀察到,x和y的存儲操作依次經過了左邊核心的L1 Cache,L2 Cache,再是兩個核心共享的L3 Cache,最後到外部存儲器。此時,L3 Cache中已經有了x和y這兩個變量所對應的Cache 條目。是以在右邊核心讀的時候,x和y的寫次序基本是一緻的。
然後到了右邊核心的L2 Cache,由于之前沒有x相關的Cache條目,是以此時L2 Cache控制器會進行判斷是直接将它配置設定給一個空白的Cache條目還是将已有的Cache條目進行逐出,然後将變量x所處的Cache行添加到Cache條目中。這裡就會有一些延遲與存儲器重新安排的情況。此時,由于變量y已經處于Cache條目中,是以它有可能被直接寫回(write back),隻要之前針對x的Cache行的安排過程不将y所處的Cache行逐出。
這麼一來,右邊核心線程所觀察到的寫次序就會變為先寫y再寫x了。
當然,上述情況僅僅是存儲器次序的某一種,像x86、ARM處理器中均引入了非臨時(Non-Temporal)加載與存儲操作,這些操作不會通過Cache,而是直接針對外部存儲器控制器進行訪存。而它們的訪存次序就是典型的弱存儲器次序,因為即便在總線上都會有各種不同情況發生。這就好比,我們在做網絡通信的時候會碰到,先發送的請求反而後送達的情況。最簡單的例子,比如我們用微信或QQ在發消息,如果此時網絡信号不好,你會看到之前發送的幾條消息都在“轉圈圈”,等信号好的時候,往往是之前最後發送的那條消息率先送達給對方~筆者已經遇到過不少次這種情況了。
我們在上一章已經看到了,C11标準中所引入的大部分原子操作都有兩個版本,其中一個是具有預設存儲器次序的原子操作;還有一個則是顯式指定存儲器次序的原子操作。對于預設存儲器次序的原子操作而言,其存儲器次序為最嚴格的 memory_order_seq_cst,它表示目前的原子操作必須滿足存儲器順序一緻的(sequentially consistent)。一般來說,預設的存儲器次序,即 memory_order_seq_cst,對于某些場景下可能會過于嚴苛,進而會影響整體性能。而對于顯式指定的存儲器次序,無論是處理器系統還是編譯器都必須嚴格遵循所指定存儲器次序的限制條件,所實作的存儲器次序強度不能弱于所指定的存儲器次序類型。比如:
volatile atomic_int atom = 0;
// 這裡使用acquire次序加載atom原子對象
int value = atomic_load_explicit(&atom, memory_order_acquire);
上述代碼用了 memory_order_acquire 存儲器次序去加載atom原子對象。那麼無論是處理器系統還是編譯器實作,對atom原子對象加載所用的存儲器次序不能是比 memory_order_acquire 更弱的次序(比如 memory_order_relaxed);當然,比它更強沒有問題,比如使用 memory_order_seq_cst 完全順序一緻的存儲器次序。
C11标準中的栅欄操作
在正式描述上述列出的六種存儲器次序之前,我們這裡先插播一條關于栅欄操作的消息。有時候,我們可能對多線程所共享變量的不要求對它用原子操作,而僅僅想確定在某個點,對這些共享變量通路可見的次序一緻性。C11提供了一種栅欄操作可滿足此需求,其原型為:
void atomic_thread_fence(memory_order order);
我們看到,它就一個參數,用于指定目前操作的存儲器次序,并且沒有指明針對某一對象進行操作,而是在目前點對所有對存儲器次序具有依賴性的操作均起作用。
我們後面會談到存儲器次序依賴性(Dependency-ordered)以及依賴鍊(dependency chain)。
C11标準中的六種存儲器次序
下面我們就來詳細談談這六種存儲器次序。這裡先介紹C11标準對這些存儲器次序的大概定義,因為有些概念會互相穿插,是以把這些存儲器次序都列完再做更深入的描述。
1. memory_order_relaxed:對目前操作的其他讀寫不施加任何同步或排序上的限制。如果用此次序的目前操作為原子操作,那麼僅僅保證該操作的原子性。
2. memory_order_consume:帶有此存儲器次序的一次加載操作在受影響的存儲器位置執行了一次消費操作(consume operation):在目前線程中依賴于目前加載值的任何讀或寫都不能在此加載操作之前重新排序。在其他線程中,釋放同一原子變量的對具有資料依賴變量的寫在目前線程中是可見的。在大部分平台上,此存儲器次序隻是影響了編譯器優化。另外,消費操作引入了存儲器次序依賴性。
3. memory_order_acquire:帶有此存儲器次序的加載操作在受影響的存儲器位置執行*獲得操作*(*acquire operation*):在目前線程,沒有讀和寫在此加載之前可以被重新排序。在其他線程中,釋放同一原子變量的所有寫在目前線程中是可見的。
4. memory_order_release:帶有此存儲器次序的一次存儲操作執行釋放操作(release operation):在目前線程中,沒有讀和寫可以在此存儲之後被重新排序。在目前線程中對原子變量的所有寫對其他線程中獲得同一原子變量的操作是可見的。并且對原子變量攜帶依賴的寫在其他線程中消費同一原子變量的操作也變為可見的。
5. memory_order_acq_rel:帶有此存儲器次序的一次讀-修改-寫操作同時具備了一次獲得操作和一次釋放操作。在目前線程中,沒有存儲器讀和寫可以在此存儲之前或之後被重新排序。在其他線程中,釋放同一原子變量的所有寫在此修改前都是可見的(通過目前線程的此操作的acquire語義);并且此修改對其他線程中獲得同一原子變量的操作是可見的(通過目前線程的此操作的release語義)。
6. memory_order_seq_cst:帶有此存儲器次序的一次加載操作執行一個獲得操作,而一次存儲則執行一次釋放操作,并且一次讀-修改-寫操作同時執行一次獲得操作和一次釋放操作,外加一單個總和次序,在所有線程中均以相同次序觀察到對同一原子變量的所有修改。
在以上六種存儲器次序中,除了松弛(relax)存儲器次序,其他主要圍繞着獲得(acquire)語義和釋放(release)語義在講。我們不需要對這些概念感到恐慌,因為它們其實是非常自然的。從一般程式邏輯上講,當我們要加載一個多線程共享原子對象時,我們肯定要拿到目前最新的資料(或狀态),并且對于具有“獲得”語義操作的原子對象往往會以“鎖”的形式出現,我們可以回顧一下(上)篇文章開頭時介紹SWAP操作的那段僞代碼。
這也就意味着在做“獲得”語義的時候,我們肯定不想讓将作用于共享臨界資源的對象在此獲得操作之後産生副作用吧?否則的話,在臨界區中對該對象的使用可能仍然是無效的。而“釋放”語義往往用于伴随着存儲操作,我們使用“釋放”語義通常可用于釋放一個鎖,這就使得釋放操作後面的那些訪存操作不應該被提前到釋放操作之前,否則的話也相當于鎖失效。
為了幫助大家了解獲得語義和釋放語義,筆者這裡通過“基于鎖的”原子操作更形象地幫助大家了解。
上圖中,虛線箭頭表示目前線程的釋放操作對其他線程可見。()裡的單詞描述了目前操作所使用的存儲器次序。如果沒有(),則表示使用松弛的存儲器次序。
我們可以看到,這裡線程B先執行,線程A後執行,然後一開始是在目前上下文中針對某個數組做求和計算,然後把結果給sum。大家注意,這裡的sum是在目前線程中獨有的,而不是多線程共享的。是以整個操作不采用任何存儲器次序,換句話說,其存儲器次序是松弛的。
然後到下面,“獲得鎖”這個操作同時具有“獲得”語義和“釋放語義”。這裡使用獲得語義使得前面的對sum對象的指派操作不會被安排到“獲得鎖”操作的下面,也就是說,“獲得鎖”這個操作執行的時候,一定對sum的指派所産生的副作用可見。這麼一來,sum的值對于與之下面的多線程共享原子對象的求和操作確定是有效的。此外,這裡的“獲得”語義也使得目前的“獲得鎖”操作能“看見”其他線程對此鎖的“釋放”操作。而這裡使用“釋放”語義也是為了告訴其他線程,目前已經把鎖給鎖了。當然,如果此時上鎖失敗,那麼我們就不需要使用“釋放”語義。是以我們看到像C11标準中的 atomic_compare_exchange_weak_explicit 函數原型,對成功和失敗各設定了一個存儲器次序參數。
再下面對多線程共享原子對象的求和操作也同時用了“獲得”語義和“釋放”語義。這裡使用這兩個語義跟目前線程中的操作安排沒啥關系,畢竟它前後都有了“獲得”語義跟“釋放”語義的保護,已經不會被随便安排了,這裡主要是對外的可見性。畢竟這裡是對多線程共享原子對象的操作,是以這裡既要保證該原子對象在目前線程可見到外部線程對它的修改(是以用了獲得語義),而且在目前線程對它的修改也要讓其他線程可見(是以用了釋放語義)。
再下面是“釋放鎖”操作。這裡使用“釋放”語義非常自然,一方面在目前線程不讓它後續的訪存操作被重新安排到它前面去(否則的話,後面的列印結果未必是計算完整的。);另一方面,目前線程對鎖釋放後要對其他線程可見。
最後就是對共享原子對象值的擷取。這裡不需要添加任何存儲器次序,因為它前面的釋放操作已經確定了本次操作是在整個臨界區域結束之後才執行,更術語化地來說,它前面的釋放操作確定了之前對該多線程共享的原子操作的計算所産生的副作用對目前操作可見。