天天看點

Userspace RCU的使用與原理

本作品采用​​知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協定​​進行許可。

本作品 (李兆龍​ 博文, 由 李兆龍​ 創作),由 李兆龍 确認,轉載請注明版權。

所讨論的核心版本為2.6.26。

引言

我們先來看看維基對于RCU的描述:

  • In computer science, read-copy-update (RCU) is a synchronization mechanism that avoids the use of lock primitives while multiple threads concurrently read and update elements that are linked through pointers and that belong to shared data structures (e.g., linked lists, trees, hash tables).[1]
  • Whenever a thread is inserting or deleting elements of data structures in shared memory, all readers are guaranteed to see and traverse either the older or the new structure, therefore avoiding inconsistencies (e.g., dereferencing null pointers).[1]
  • It is used when performance of reads is crucial and is an example of space–time tradeoff, enabling fast operations at the cost of more space. This makes all readers proceed as if there were no synchronization involved, hence they will be fast, but also making updates more difficult.
  • 在計算機科學中,read-copy-update是一種同步機制,多個線程同時讀取和更新通過指針連結并屬于共享資料結構的元素(例如,連結清單,樹 ,哈希表)可避免使用鎖原語。
  • 每當線程在共享記憶體中插入或删除資料結構的元素時,都保證所有讀取都可以檢視和周遊舊結構或新結構,是以避免了不一緻(例如,取消引用空指針)。
  • 當讀取的性能至關重要時,可以使用它,它是時空權衡的一個示例,可以以更多空間為代價實作快速操作。 這使所有讀者都像沒有同步一樣繼續進行,是以它們将很快,但也會使更新更加麻煩。

RCU鎖無疑是提升小資料塊多讀操作效率極高的一種方案,因為可以做到任何資料讀操作完全無鎖(這難道不值得歡呼嗎!),當然寫操作在絕大多數情況下都需要上一把自旋鎖以保護多個寫操作之間的臨界區[1]。

Userspace RCU的使用與原理

讀寫操作之間的聯系是一件不太好做的事情,原因其實就是​

​Grace period(寬限期)​

​​,核心中的做法[1]非常優雅,不但簡單,直覺,而且對使用者代碼的侵入性也很低,其在​

​rcu_read_lock​

​​和​

​rcu_read_unlock​

​接口中分别關開中斷,這樣使得讀操作臨界區中不發生上下文切換,這樣對于寫操作來說,隻要每個核都經曆過一次上下文切換就可以退出寬限期了。

但是使用者态就想要無入侵代碼的做到這件事情就比較麻煩了,這裡比較知名的使用者态RCU庫就是[5](當然這會也就400不到的star), 這裡讀操作首先要加入全局連結清單,并設定為上線狀态(入侵較大),這樣在寫操作等待寬限期的時候就可以自旋知道是否可以退出寬限期,退出條件為所有的上線的讀線程資料為最新,在讀操作調用​

​rcu_quiescent_state​

​接口即可更新目前的最新資料版本,一般讀操作結束以後使用。

學習這個玩意的目的主要是為了完善我撸的那個HotRing庫[7],明眼人都能看出來其中最複雜的部分就是無鎖并發操作,這方面需要花點功夫,是以在幹活之前養精蓄銳一下,我想這還是很有必要的。

這篇文章主要是對​

​liburcu​

​​中的​

​test_urcu_qsbr.c​

​測試檔案做一個簡單的源碼分析,以此了解這個庫的基本用法與原理。

正文

首先這個測試的參數需要我們自己去配置,懶得看源碼的同學可以使用這個指令來跑測試​

​./test_urcu_qsbr 5 5 5 -v -c 1000000 -e 100000​

​,其中各個參數的意義如下:

  1. 前三個參數,讀線程數,寫線程數,測試運作時長
  2. -v 顯示調試資訊
  3. -c 每一個讀操作的間隔時長,用于測試寬限期
  4. -e 每一個寫操作的間隔時長
  5. 間隔時長機關為時鐘周期,其調用所傳參數次​

    ​__asm__ __volatile__ ("rep; nop" : : : "memory")​

    ​指令[8]

前面的代碼沒有看的必要,其實下面的代碼也比較簡單,放在這裡主要是為了看看我們擁有哪些線程。

// 前兩個機關為pthread_t,後兩個機關為unsigned long long,當然都是指針
  tid_reader = calloc(nr_readers, sizeof(*tid_reader));  // 存儲所有讀線程的tid
  tid_writer = calloc(nr_writers, sizeof(*tid_writer));  // 存儲所有寫線程的tid
  count_reader = calloc(nr_readers, sizeof(*count_reader));  // 存儲運作線程函數的參數,傳入一個指針,是以是同步修改的
  count_writer = calloc(nr_writers, sizeof(*count_writer));

  next_aff = 0;

  for (i_thr = 0; i_thr < nr_readers; i_thr++) {
    err = pthread_create(&tid_reader[i_thr], NULL, thr_reader,
             &count_reader[i_thr]);
    if (err != 0)
      exit(1);
  }
  for (i_thr = 0; i_thr < nr_writers; i_thr++) {
    err = pthread_create(&tid_writer[i_thr], NULL, thr_writer,
             &count_writer[i_thr]);
    if (err != 0)
      exit(1);
  }

  cmm_smp_mb();  // mfence,全屏障。序列化程式指令流中在MFENCE指令之前發生的所有 store 和 load 操作

  test_go = 1;

  sleep(duration);  // 測試線程跑duration秒

  test_stop = 1;  // 測試線程循環結束

  for (i_thr = 0; i_thr < nr_readers; i_thr++) {
    err = pthread_join(tid_reader[i_thr], &tret);
    if (err != 0)
      exit(1);
    tot_reads += count_reader[i_thr];  // 總讀操作數
  }
  for (i_thr = 0; i_thr < nr_writers; i_thr++) {
    err = pthread_join(tid_writer[i_thr], &tret);
    if (err != 0)
      exit(1);
    tot_writes += count_writer[i_thr];  // 總寫操作數
  }      

我們來看看讀線程做的事情,這裡就逐漸有意思起來了:

void *thr_reader(void *_count)
{
  unsigned long long *count = _count;
  int *local_ptr;

  printf_verbose("thread_begin %s, tid %lu\n",
      "reader", urcu_get_thread_id());  // 執行時加上-v 就可以顯示出來,看做printf就可以

  set_affinity();      // 修改CPU親和性,預設關閉,使用use_affinity控制

  rcu_register_thread();  // 将自己挂接在全局連結清單registry上,執行讀操作必須調用這個

  assert(rcu_read_ongoing());
  rcu_thread_offline();
  assert(!rcu_read_ongoing());
  rcu_thread_online();      // 設定為上線狀态,即把每線程私有的urcu_qsbr_reader的ctr更新至與全局相同
  
  /*  // rcu_thread_online最後調用如下代碼
  static inline void _urcu_qsbr_thread_online(void){
    urcu_assert(URCU_TLS(urcu_qsbr_reader).registered);
    cmm_barrier();  // 優化屏障,禁止編譯期重排
    _CMM_STORE_SHARED(URCU_TLS(urcu_qsbr_reader).ctr, CMM_LOAD_SHARED(urcu_qsbr_gp.ctr));
    cmm_smp_mb(); // mfence
  }
  */

  while (!test_go)        // 看不出來有什麼用處
  {
  }
  cmm_smp_mb();          // mfence

  for (;;) {
    rcu_read_lock();      // 空操作,加了個寂寞,其實個人認為這裡是為了以後做準備,核心這裡的操作是有意義的
    assert(rcu_read_ongoing());
    local_ptr = rcu_dereference(test_rcu_pointer);  // 下面重點聊
    rcu_debug_yield_read();    // debug操作
    if (local_ptr)
      assert(*local_ptr == 8);
    if (caa_unlikely(rduration))// caa_unlikely與分支預測有關,其實就是rduration有效的話睡眠,可以給-c加兩個零試試
      loop_sleep(rduration);
    rcu_read_unlock();      // 空操作
    URCU_TLS(nr_reads)++;    // 讀操作次數加1
    /* QS each 1024 reads */  // 每1024次才調用同步,意味着如果rduration時間較長,寫操作會一直被阻塞
    if (caa_unlikely((URCU_TLS(nr_reads) & ((1 << 10) - 1)) == 0))
      rcu_quiescent_state();  // 每1024次調用rcu_quiescent_state,進入靜默期,更新自己的c_str,讓寫操作可以退出寬限期
    if (caa_unlikely(!test_duration_read()))  // 與test_stop有關,睡眠duration秒後在主線程中修改
      break;
  }

  rcu_unregister_thread();  // 從全局連結清單中去掉

  /* test extra thread registration */
  rcu_register_thread();
  rcu_unregister_thread();

  *count = URCU_TLS(nr_reads);
  printf_verbose("thread_end %s, tid %lu\n",
      "reader", urcu_get_thread_id());
  return ((void*)1);

}      

​rcu_dereference​

​調用過程如下:

#define// 在我的機器是個空操作,因為我的機器X86,弱模型,要是arm就該生成代碼了
        (_________p1);        \      // 
        })

#define// 在我的機器是優化屏障。arm的話就該是LFENCE了。
    _CMM_LOAD_SHARED(p);  \
  })

#define

#define      

因為指針指派(八位元組)本身是原子操作,是以指令級别操作是原子的[9]。為了cache一緻性,插上了記憶體屏障。可以讓其它的讀者/寫者可以看到保護指針的最新值。

我們再來看看寫線程的操作:

void *thr_writer(void *_count)
{
  unsigned long long *count = _count;
  int *new, *old;

  printf_verbose("thread_begin %s, tid %lu\n",
      "writer", urcu_get_thread_id());

  set_affinity();

  while (!test_go)
  {
  }
  cmm_smp_mb();

  // 和上面如出一轍
  
  for (;;) {
    new = malloc(sizeof(int));
    assert(new);
    *new = 8;
    old = rcu_xchg_pointer(&test_rcu_pointer, new);  // 一會我們細看,其實就是一個原子交換并傳回舊值
    if (caa_unlikely(wduration))
      loop_sleep(wduration);
    printf("before\n");
    synchronize_rcu();                // 等待寬限期
    printf("after\n");
    if (old)
      *old = 0;
    free(old);
    URCU_TLS(nr_writes)++;
    if (caa_unlikely(!test_duration_write()))
      break;
    if (caa_unlikely(wdelay))
      loop_sleep(wdelay);
  }

  printf_verbose("thread_end %s, tid %lu\n",
      "writer", urcu_get_thread_id());
  *count = URCU_TLS(nr_writes);
  return ((void*)2);
}      

我們先來看看​

​synchronize_rcu​

​的簡化代碼,隻看簡化的原因是原代碼太複雜了,不想看了。

void synchronize_rcu(void)
{
    CDS_LIST_HEAD(qsreaders);
    DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING);
    ......
    urcu_wait_add(&gp_waiters, &wait)  /* 将writer自身置于wait狀态 */
    ......
    wait_for_readers(®istry, &cur_snap_readers, &qsreaders); /* writer阻塞在這裡 */

    cmm_barrier();
    .....
}

// 這其實就是一個自旋等待的一個過程
static void wait_for_readers(struct cds_list_head *input_readers,
                 struct cds_list_head *cur_snap_readers,
                 struct cds_list_head *qsreaders)
{
    unsigned int wait_loops = 0;
    struct rcu_reader *index, *tmp;

    /*
     * Wait for each thread URCU_TLS(rcu_reader).ctr to either
     * indicate quiescence (offline), or for them to observe the
     * current rcu_gp.ctr value.
     */
    for (;;) {          /* 直到所有reader.ctr已經到最新才跳出循環 */
        uatomic_set(&rcu_gp.futex, -1);
        cds_list_for_each_entry(index, input_readers, node) {
        _CMM_STORE_SHARED(index->waiting, 1);
        
        /* 周遊所有輸入的reader */
        cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
            switch (rcu_reader_state(&index->ctr)) {
            case RCU_READER_ACTIVE_CURRENT:   /* reader.ctr已經最新 */
            case RCU_READER_INACTIVE:      /* reader處于offline狀态 */
                cds_list_move(&index->node, qsreaders); /* 從周遊清單中移除 */
                break;
            case RCU_READER_ACTIVE_OLD:     /* reader.ctr不是最新 */
                break;
            }
        }

        if (cds_list_empty(input_readers)) {
            uatomic_set(&rcu_gp.futex, 0);   /* 清單空了,表示所有reader已更新 跳出循環 */
            break;
        }
    }
}      

我們再來看看​

​rcu_xchg_pointer​

​,其設計調用如下代碼:

#define

#define

#define

// uatomic_xchg實際調用_uatomic_xchg
#define

static inline __attribute__((always_inline))
unsigned long __uatomic_exchange(void *addr, unsigned long val, int len)
{
  /* Note: the "xchg" instruction does not need a "lock" prefix. */
  switch (len) {
  case 1:
  {  // CMPXCHG
    unsigned char result;
    __asm__ __volatile__(
    "xchgb %0, %1"
      : "=q"(result), "+m"(*__hp(addr))
      : "0" ((unsigned char)val)
      : "memory");
    return result;
  }
  case 2:
  {
    unsigned short result;
    __asm__ __volatile__(
    "xchgw %0, %1"
      : "=r"(result), "+m"(*__hp(addr))
      : "0" ((unsigned short)val)
      : "memory");
    return result;
  }
  case 4:
  {
    unsigned int result;
    __asm__ __volatile__(
    "xchgl %0, %1"
      : "=r"(result), "+m"(*__hp(addr))
      : "0" ((unsigned int)val)
      : "memory");
    return result;
  }
#if
  case 8:  // 将第二個參數x放入寄存器中與第一個指針參數所指的内容交換,傳回所指内容原先的值。
  {
    unsigned long result;
    __asm__ __volatile__(  // 交換指令
    "xchgq %0, %1"
      : "=r"(result), "+m"(*__hp(addr))
      : "0" ((unsigned long)val)
      : "memory");
    return result;
  }
#endif
  }
  /*
   * generate an illegal instruction. Cannot catch this with
   * linker tricks when optimizations are disabled.
   */
  __asm__ __volatile__("ud2");
  return 0;
}      

這裡其實理論來講應該還要有一個記憶體屏障的,為了避免寫操作未完成時這個原子交換就已經執行,我認為這樣的原因可以在intel手冊中的一些文字解釋,就是記憶體屏障,帶#Lock字首的指令,I/O操作都會把資料載入記憶體,也就是重新整理​

​Invaildate queue​

​中的資料,而交換類指令本身假設一定帶#Lock指令:

Userspace RCU的使用與原理

而且鎖定指令保證原子是不收記憶體邊界是否對齊的影響的(在weakly ordered memory 的機器上可能出現例外,arm就是弱模型):

Userspace RCU的使用與原理

總結

好了這個簡單的樣例到這就結束了,liburcu中有五種類型的RCU鎖,這裡隻是分析了一下​

​qsbr​

​的使用和大概實作方式,前面也提到學習這個東西其實是為了我那個HotRing的輪子,與我而言暫時來說已經夠了,更多的探索大家有興趣可以自己玩玩,有關其使用就放到下一篇部落格再讨論了。

  1. Linux RCU機制詳解 (透徹)
  2. Userspace RCU原理
  3. RCU機制
  4. ​​Read-copy-update wiki​​
  5. ​​userspace-rcu liburcu​​
  6. HotRing: A Hotspot-Aware In-Memory Key-Value Store
  7. ​​Super-long HotRing-s​​
  8. pause 指令與 rep;nop
  9. 淺談記憶體屏障,C++記憶體序與記憶體模型