天天看點

[modern c++] 無鎖的concurrency資料結構

前言:

前面讨論了使用 ​​“全局加鎖serilization資料結構” ​​​ 和 ​​“局部加鎖的concurrency資料結構”​​ ,這二者都離不開鎖的控制,隻要有鎖就會有時間損耗,最主要的是會引入鎖的副作用,比如 死鎖、活鎖 以及 優先級翻轉,那麼無鎖的 concurrency 資料結構就顯得 既高效又安全。

注:這裡說的無鎖不是說整個流程中一個鎖都沒有,而是不進行顯式加鎖,不人為顯式加鎖就不會引入鎖的副作用(所謂的lock-free可以了解為 free of lock disadvantages),我們在某種意義上認為它是無鎖的,因為雖然(隐式地)用了鎖,但是沒有引入鎖的副作用。比如 atomic 庫中除了 atomic_flag 外,其他類型的内部都是通過鎖來實作的,但是這裡我們使用任何atomic類都不認為是加鎖的,因為使用atomic不會引入鎖的副作用。另外需要說明,無鎖的concurrency 設計離不開 atomic類 和 memory order

簡單的無鎖棧:

簡化版:

template<typename T>
class lock_free_stack
{
private:
    struct node
    {
        T data;
        node* next;
        node(T const& data_) :
            data(data_)
        {}
    };
    std::atomic<node*> head;
public:
    void push(T const& data)
    {
        node* const new_node = new node(data);
        new_node->next = head.load();
        while (!head.compare_exchange_weak(new_node->next, new_node));
    }
    void pop(T& result)
    {
        node* old_head = head.load();
        while (!head.compare_exchange_weak(old_head, old_head->next));
        result = old_head->data;
    }
};      

上述代碼如何保證push的原子性?

棧的一個競争條件在于“每次壓棧時,棧頂元素替換是一定不能出現A-B-A的問題”,比如,線程A和B同時獲得了棧頂指針,那麼兩個線程在壓棧的時候誰先成功誰就會被替換掉,因為後壓棧的線程會向舊的棧頂壓棧,這會覆寫掉新的棧頂。那麼我們便可以通過compare_exchange_weak 的循環判斷來確定兩個線程最終會順序壓棧。因為即便A和B同時獲得了棧頂指針,當後壓棧的線程試圖壓棧時會發現 記憶體位址内的值 和 expected的不一樣,此時會把記憶體位址内的最新值(先壓棧的線程給的值)讀出來然後指派給expected在進行下一次判斷,如果還是不一樣,那麼再讀出來指派給expected,知道成功為止。由于是循環進行的,往往執行效率很高,一般情況下第二次都會成功。

上述代碼如何保證pop的原子性?

同push。

缺陷一:每個節點中的data不存在智能記憶體管理,如果T不是指針類型,那麼data将跟随節點一并釋放;如果是指針類型(大部分情況下都是指針類型),那麼需要手動釋放。

缺陷二:pop動作之後,舊的 head node 需要手動釋放。

節點内容自動釋放版:

template<typename T>
class lock_free_stack
{
private:
  struct node
  {
    std::shared_ptr<T> data;
    node* next;
    node(T const& data_) :
      data(std::make_shared<T>(data_))
    {}
  };
  std::atomic<node*> head;
public:
  void push(T const& data)
  {
    node* const new_node = new node(data);
    new_node->next = head.load();
    while (!head.compare_exchange_weak(new_node->next,new_node));
  }

  std::shared_ptr<T> pop()
  {
    node* old_head = head.load();
    while (old_head &&
      !head.compare_exchange_weak(old_head, old_head->next));
    return old_head ? old_head->data : std::shared_ptr<T>();
  }
};      

上述代碼保證了 node 中 data是可以被自動釋放的,但是無法保證 pop之後的 舊 head node 會被正常釋放,因為 std::atomic<node*> head 這裡是 node* ,且也沒辦法弄成智能指針,是以需要人為delete,但是我們又無法保證在delete的時候其他所有線程都不再使用此節點,是以這個缺陷從上一個版本中延續了下來: “ 缺陷二:pop動作之後,舊的 head node 需要手動釋放。”

節點内容和節點都自動釋放:

template<typename T>
class lock_free_stack
{
private:
  struct node
  {
    std::shared_ptr<T> data;
    std::shared_ptr<node> next;
    node(T const& data_) :
      data(std::make_shared<T>(data_))
    {}
  };
  std::shared_ptr<node> head;
public:
  void push(T const& data)
  {
    std::shared_ptr<node> const new_node = std::make_shared<node>(data);
    new_node->next = head.load();
    while (!std::atomic_compare_exchange_weak(&head,
      &new_node->next, new_node));
  }
  std::shared_ptr<T> pop()
  {
    std::shared_ptr<node> old_head = std::atomic_load(&head);
    while (old_head && !std::atomic_compare_exchange_weak(&head,
      &old_head, old_head->next));
    return old_head ? old_head->data : std::shared_ptr<T>();
  }
};      

這版不需要再關心廢棄head節點的釋放問題,且通過 atomic_load實作對 node結構體的原子讀(讀結構體是從記憶體位址依次取值,是以中途可能會被改變,如果被改變,則讀到的資料一半是正确的一半是錯誤的)。

這個版本在大部分情況下是能正常工作的,唯一可能出問題的點是 shared_ptr 的計數可能不準,這個也不是我們改關心的。

繼續閱讀