天天看點

Apollo 3.5 Cyber base代碼學習

Apollo 3.5 Cyber base代碼學習

    • `cyber/base/macros.h`
    • `cyber/base/atomic_fifo.h`
    • `cyber/base/for_each.h`
    • `cyber/base/wait_strategy`
    • `cyber/base/atomic_hash_map.h`
    • `cyber/base/rw_lock_guard`
    • `cyber/base/atomic_rw_lock`
    • `cyber/base/bounded_queue`
    • `cyber/base/concurrent_object_pool`
    • `cyber/base/object_pool`
    • `cyber/base/reentrant_rw_lock`
    • `cyber/base/signal`
    • `cyber/base/thread_pool`
    • `cyber/base/thread_safe_queue`
    • `cyber/base/unbounded_queue`

// Study:

是我的筆記

base中主要是一些thread safe的容器跟鎖等基本工具, 值得一看

cyber/base/macros.h

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_MACROS_H_
#define CYBER_BASE_MACROS_H_

#include <cstdlib>
#include <new>

// Study: __builtin_expect is a function to let the compiler know the chance of the condition to be true
// , the compiler can do optimization in assembly level using this information
#if __GNUC__ >= 3
#define likely(x) (__builtin_expect((x), 1))
#define unlikely(x) (__builtin_expect((x), 0))
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif

// Study: The size of cache line, cache line is the a block that will be fetch from memory to cache
#define CACHELINE_SIZE 64

// Study: Meta-programming, used SFINTE. When the T type have the `func` function, it will be true,
//        Otherwise it is false (since size of int is not 1)
//        basically used to determine the exist of the func for T in compile time
//        Can be mixed with the stl traits
#define DEFINE_TYPE_TRAIT(name, func)                            \
  template <typename T>                                          \
  class name {                                                   \
   private:                                                      \
    template <typename Class>                                    \
    static char Test(decltype(&Class::func)*);                   \
    template <typename>                                          \
    static int Test(...);                                        \
                                                                 \
   public:                                                       \
    static constexpr bool value = sizeof(Test<T>(nullptr)) == 1; \
  };                                                             \
                                                                 \
  template <typename T>                                          \
  constexpr bool name<T>::value;

// Study: Call the processer to pause (no operation)
//        The different of rep;nop; to nop;nop; is that processor can optimize with this
inline void cpu_relax() { asm volatile("rep; nop" ::: "memory"); }

// Study: Allocate memory
inline void* CheckedMalloc(size_t size) {
  void* ptr = std::malloc(size);
  if (!ptr) {
    throw std::bad_alloc();
  }
  return ptr;
}

// Study: Allocate memory and Clean location
inline void* CheckedCalloc(size_t num, size_t size) {
  void* ptr = std::calloc(num, size);
  if (!ptr) {
    throw std::bad_alloc();
  }
  return ptr;
}

#endif  // CYBER_BASE_MACROS_H_
           

cyber/base/atomic_fifo.h

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_ATOMIC_FIFO_H_
#define CYBER_BASE_ATOMIC_FIFO_H_

#include <atomic>
#include <cstdlib>
#include <cstring>
#include <iostream>

#include "cyber/base/macros.h"

namespace apollo {
namespace cyber {

template <typename T>
class AtomicFIFO {
 private:
  struct Node {
    T value;
  };

 public:
  // Study: Singleton
  static AtomicFIFO *GetInstance(int cap = 100) {
    static AtomicFIFO *inst = new AtomicFIFO(cap);
    return inst;
  }

  bool Push(const T &value);
  bool Pop(T *value);
  // insert();

 private:
  Node *node_arena_;
  // Study: Align to maximize the memory access speed
  alignas(CACHELINE_SIZE) std::atomic<uint32_t> head_;
  alignas(CACHELINE_SIZE) std::atomic<uint32_t> commit_;
  alignas(CACHELINE_SIZE) std::atomic<uint32_t> tail_;
  int capacity_;

  // Study: Only allow Singleton
  explicit AtomicFIFO(int cap);
  ~AtomicFIFO();
  AtomicFIFO(AtomicFIFO &) = delete;
  AtomicFIFO &operator=(AtomicFIFO &) = delete;
};

template <typename T>
AtomicFIFO<T>::AtomicFIFO(int cap) : capacity_(cap) {
  node_arena_ = static_cast<Node *>(malloc(capacity_ * sizeof(Node)));
  memset(node_arena_, 0, capacity_ * sizeof(Node));

  // Study: Set value to 0
  head_.store(0, std::memory_order_relaxed);
  tail_.store(0, std::memory_order_relaxed);
  commit_.store(0, std::memory_order_relaxed);
}

template <typename T>
AtomicFIFO<T>::~AtomicFIFO() {
  if (node_arena_ != nullptr) {
    for (int i = 0; i < capacity_; i++) {
      // Study: Call the T destructor manaully, since it is puted in the malloc region, it will not auto destruct
      node_arena_[i].value.~T();
    }
    free(node_arena_);
  }
}

template <typename T>
bool AtomicFIFO<T>::Push(const T &value) {
  uint32_t oldt, newt;

  // Study:  Try push until success, return false if queue full
  oldt = tail_.load(std::memory_order_acquire);
  do {
    uint32_t h = head_.load(std::memory_order_acquire);
    uint32_t t = tail_.load(std::memory_order_acquire);

    if (((t + 1) % capacity_) == h) return false;

    newt = (oldt + 1) % capacity_;
    // Study:  If success, set tail_ to newt, otherwise set oldt to current tail_
    //         Ensure tails value sync
  } while (!tail_.compare_exchange_weak(oldt, newt, std::memory_order_acq_rel,
                                        std::memory_order_acquire));

  (node_arena_ + oldt)->value = value;

  // Study:  commit_ is basically same as tail_, but it is used in pop.
  //         It can let the pop operation not block the push core part
  while (unlikely(commit_.load(std::memory_order_acquire) != oldt)) cpu_relax();

  // Study:  After commit, this value can be pop in Pop()
  commit_.store(newt, std::memory_order_release);

  return true;
}

template <typename T>
bool AtomicFIFO<T>::Pop(T *value) {
  uint32_t oldh, newh;

  oldh = head_.load(std::memory_order_acquire);

  // Study:  Basically same logic as the push part, try pop until success. Return false if no element
  do {
    uint32_t h = head_.load(std::memory_order_acquire);
    uint32_t c = commit_.load(std::memory_order_acquire);

    if (h == c) return false;

    newh = (oldh + 1) % capacity_;

    *value = (node_arena_ + oldh)->value;
  } while (!head_.compare_exchange_weak(oldh, newh, std::memory_order_acq_rel,
                                        std::memory_order_acquire));

  return true;
}

}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_BASE_ATOMIC_FIFO_H_

           

cyber/base/for_each.h

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_FOR_EACH_H_
#define CYBER_BASE_FOR_EACH_H_

#include <type_traits>

#include "cyber/base/macros.h"

namespace apollo {
namespace cyber {
namespace base {

// Study: A trait to check whether the class have impl operator <
DEFINE_TYPE_TRAIT(HasLess, operator<)  // NOLINT

// Study: If both of them have impl <, use it
template <class Value, class End>
typename std::enable_if<HasLess<Value>::value && HasLess<End>::value,
                        bool>::type
LessThan(const Value& val, const End& end) {
  return val < end;
}

// Study: Otherwise, check equality.....
//        Actually, I thing this function name is misleading. This will only make sense when using in FOR_EACH
template <class Value, class End>
typename std::enable_if<!HasLess<Value>::value || !HasLess<End>::value,
                        bool>::type
LessThan(const Value& val, const End& end) {
  return val != end;
}

// Study: Loop until end, Be careful that i should not be a index, it should be a iterator or something similiar
#define FOR_EACH(i, begin, end)           \
  for (auto i = (true ? (begin) : (end)); \
       apollo::cyber::base::LessThan(i, (end)); ++i)

}  // namespace base
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_BASE_FOR_EACH_H_
           

cyber/base/wait_strategy

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_WAIT_STRATEGY_H_
#define CYBER_BASE_WAIT_STRATEGY_H_

#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <mutex>
#include <thread>

namespace apollo {
namespace cyber {
namespace base {

class WaitStrategy {
 public:
  // Study: One waiter is allow to pass
  virtual void NotifyOne() {}
  // Study: All waiter is allow to pass
  virtual void BreakAllWait() {}
  // Study: Wait here
  virtual bool EmptyWait() = 0;
  virtual ~WaitStrategy() {}
};

// Study: Blocked until allow to pass
class BlockWaitStrategy : public WaitStrategy {
 public:
  BlockWaitStrategy() {}
  void NotifyOne() override { cv_.notify_one(); }

  bool EmptyWait() override {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock);
    return true;
  }

  void BreakAllWait() { cv_.notify_all(); }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
};


// Study: Sleeped and pass after the sleep time
class SleepWaitStrategy : public WaitStrategy {
 public:
  SleepWaitStrategy() {}
  explicit SleepWaitStrategy(uint64_t sleep_time_us)
      : sleep_time_us_(sleep_time_us) {}

  bool EmptyWait() override {
    std::this_thread::sleep_for(std::chrono::microseconds(sleep_time_us_));
    return true;
  }

  void SetSleepTimeMicroSecends(uint64_t sleep_time_us) {
    sleep_time_us_ = sleep_time_us;
  }

 private:
  uint64_t sleep_time_us_ = 10000;
};

// Study: Reschedule this thread, let other thread use first
class YieldWaitStrategy : public WaitStrategy {
 public:
  YieldWaitStrategy() {}
  bool EmptyWait() override {
    std::this_thread::yield();
    return true;
  }
};

// Study: Just pass??????
class BusySpinWaitStrategy : public WaitStrategy {
 public:
  BusySpinWaitStrategy() {}
  bool EmptyWait() override { return true; }
};

// Study: Like BlockWaitStrategy, but have a time limit. Return false if timeout
class TimeoutBlockWaitStrategy : public WaitStrategy {
 public:
  TimeoutBlockWaitStrategy() {}
  explicit TimeoutBlockWaitStrategy(uint64_t timeout)
      : time_out_(std::chrono::milliseconds(timeout)) {}

  void NotifyOne() override { cv_.notify_one(); }

  bool EmptyWait() override {
    std::unique_lock<std::mutex> lock(mutex_);
    if (cv_.wait_for(lock, time_out_) == std::cv_status::timeout) {
      return false;
    }
    return true;
  }

  void BreakAllWait() { cv_.notify_all(); }

  void SetTimeout(uint64_t timeout) {
    time_out_ = std::chrono::milliseconds(timeout);
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::chrono::milliseconds time_out_;
};

}  // namespace base
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_BASE_WAIT_STRATEGY_H_
           

cyber/base/atomic_hash_map.h

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_ATOMIC_HASH_MAP_H_
#define CYBER_BASE_ATOMIC_HASH_MAP_H_

#include <stdint.h>
#include <atomic>
#include <type_traits>
#include <utility>

namespace apollo {
namespace cyber {
namespace base {

// Study: TableSize must be power of 2
/**
 * @brief A implementation of lock-free fixed size hash map
 *
 * @tparam K Type of key, must be integral
 * @tparam V Type of value
 * @tparam 128 Size of hash table
 * @tparam 0 Type traits, use for checking types of key & value
 */
template <typename K, typename V, std::size_t TableSize = 128,
          typename std::enable_if<std::is_integral<K>::value &&
                                      (TableSize & (TableSize - 1)) == 0,
                                  int>::type = 0>
class AtomicHashMap {
 public:
  AtomicHashMap() : capacity_(TableSize), mode_num_(capacity_ - 1) {}
  AtomicHashMap(const AtomicHashMap &other) = delete;
  AtomicHashMap &operator=(const AtomicHashMap &other) = delete;

  // Study: Just get the key, and perform like a regular hash map
  //        The atomic part is in Bucket and Entry
  //        It DOESN'T provide a hash function,
  //        so the user must be careful with the key selection
  bool Has(K key) {
    uint64_t index = key & mode_num_;
    return table_[index].Has(key);
  }

  bool Get(K key, V **value) {
    uint64_t index = key & mode_num_;
    return table_[index].Get(key, value);
  }

  bool Get(K key, V *value) {
    uint64_t index = key & mode_num_;
    V *val = nullptr;
    bool res = table_[index].Get(key, &val);
    if (res) {
      *value = *val;
    }
    return res;
  }

  void Set(K key) {
    uint64_t index = key & mode_num_;
    table_[index].Insert(key);
  }

  void Set(K key, const V &value) {
    uint64_t index = key & mode_num_;
    table_[index].Insert(key, value);
  }

  // Study: Support right value passing
  void Set(K key, V &&value) {
    uint64_t index = key & mode_num_;
    table_[index].Insert(key, std::forward<V>(value));
  }

 private:
   // Study:  The nodes in Bucket, actually same as a regular Node.
   //         Just used atomic for value and next
  struct Entry {
    Entry() {}
    explicit Entry(K key) : key(key) {
      value_ptr.store(new V(), std::memory_order_release);
    }
    Entry(K key, const V &value) : key(key) {
      value_ptr.store(new V(value), std::memory_order_release);
    }
    Entry(K key, V &&value) : key(key) {
      value_ptr.store(new V(std::forward<V>(value)), std::memory_order_release);
    }
    ~Entry() { delete value_ptr.load(std::memory_order_acquire); }

    K key = 0;
    std::atomic<V *> value_ptr = {nullptr};
    std::atomic<Entry *> next = {nullptr};
  };

  // Study:  The real place for storing all value for ONE key
  //         It is a linked list inside
  //         Moreover, the atomic is promised here and Entry
  class Bucket {
   public:
    Bucket() : head_(new Entry()) {}
    ~Bucket() {
      Entry *ite = head_;
      while (ite) {
        auto tmp = ite->next.load(std::memory_order_acquire);
        delete ite;
        ite = tmp;
      }
    }

    // Study: Too simple, don't explain
    bool Has(K key) {
      Entry *m_target = head_->next.load(std::memory_order_acquire);
      while (Entry *target = m_target) {
        if (target->key < key) {
          m_target = target->next.load(std::memory_order_acquire);
          continue;
        } else {
          return target->key == key;
        }
      }
      return false;
    }

    // Study: Loop and return the ptr to the key element
    bool Find(K key, Entry **prev_ptr, Entry **target_ptr) {
      Entry *prev = head_;
      Entry *m_target = head_->next.load(std::memory_order_acquire);
      while (Entry *target = m_target) {
        if (target->key == key) {
          *prev_ptr = prev;
          *target_ptr = target;
          return true;
        } else if (target->key > key) {
          *prev_ptr = prev;
          *target_ptr = target;
          return false;
        } else {
          prev = target;
          m_target = target->next.load(std::memory_order_acquire);
        }
      }
      *prev_ptr = prev;
      *target_ptr = nullptr;
      return false;
    }

    // Study: Keep insert until success
    void Insert(K key, const V &value) {
      Entry *prev = nullptr;
      Entry *target = nullptr;
      Entry *new_entry = new Entry(key, value);
      V *new_value = new V(value);
      while (true) {
        if (Find(key, &prev, &target)) {
          // key exists, update value
          auto old_val_ptr = target->value_ptr.load(std::memory_order_acquire);
          if (target->value_ptr.compare_exchange_strong(
                  old_val_ptr, new_value, std::memory_order_acq_rel,
                  std::memory_order_relaxed)) {
            delete new_entry;
            return;
          }
          continue;
        } else {
          new_entry->next.store(target, std::memory_order_release);
          if (prev->next.compare_exchange_strong(target, new_entry,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
            // Insert success
            delete new_value;
            return;
          }
          // another entry has been inserted, retry
        }
      }
    }

    // Study: Same as above. All the below code is also simple so no explain
    void Insert(K key, V &&value) {
      Entry *prev = nullptr;
      Entry *target = nullptr;
      Entry *new_entry = new Entry(key, value);
      auto new_value = new V(std::forward<V>(value));
      while (true) {
        if (Find(key, &prev, &target)) {
          // key exists, update value
          auto old_val_ptr = target->value_ptr.load(std::memory_order_acquire);
          if (target->value_ptr.compare_exchange_strong(
                  old_val_ptr, new_value, std::memory_order_acq_rel,
                  std::memory_order_relaxed)) {
            delete new_entry;
            return;
          }
          continue;
        } else {
          new_entry->next.store(target, std::memory_order_release);
          if (prev->next.compare_exchange_strong(target, new_entry,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
            // Insert success
            delete new_value;
            return;
          }
          // another entry has been inserted, retry
        }
      }
    }

    void Insert(K key) {
      Entry *prev = nullptr;
      Entry *target = nullptr;
      Entry *new_entry = new Entry(key);
      auto new_value = new V();
      while (true) {
        if (Find(key, &prev, &target)) {
          // key exists, update value
          auto old_val_ptr = target->value_ptr.load(std::memory_order_acquire);
          if (target->value_ptr.compare_exchange_strong(
                  old_val_ptr, new_value, std::memory_order_acq_rel,
                  std::memory_order_relaxed)) {
            delete new_entry;
            return;
          }
          continue;
        } else {
          new_entry->next.store(target, std::memory_order_release);
          if (prev->next.compare_exchange_strong(target, new_entry,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
            // Insert success
            delete new_value;
            return;
          }
          // another entry has been inserted, retry
        }
      }
    }

    bool Get(K key, V **value) {
      Entry *prev = nullptr;
      Entry *target = nullptr;
      if (Find(key, &prev, &target)) {
        *value = target->value_ptr.load(std::memory_order_acquire);
        return true;
      }
      return false;
    }

    Entry *head_;
  };

 private:
  Bucket table_[TableSize];
  uint64_t capacity_;
  uint64_t mode_num_;
};

}  // namespace base
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_BASE_ATOMIC_HASH_MAP_H_
           

cyber/base/rw_lock_guard

Provide two wrapper

ReadLockGuard

,

WriteLockGuard

for Read-Write-Lock. Using RAII, lock at constructor and unlock at destructor.

cyber/base/atomic_rw_lock

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_ATOMIC_RW_LOCK_H_
#define CYBER_BASE_ATOMIC_RW_LOCK_H_

#include <stdint.h>
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>

#include "cyber/base/rw_lock_guard.h"

namespace apollo {
namespace cyber {
namespace base {

class AtomicRWLock {
  // Study: Since only allow using this lock with the lock guard,
  //        so the lock and unlock function is placed on private
  //        then the lockguard must define as friend here.
  friend class ReadLockGuard<AtomicRWLock>;
  friend class WriteLockGuard<AtomicRWLock>;

 public:
  static const int32_t RW_LOCK_FREE = 0;
  static const int32_t WRITE_EXCLUSIVE = -1;
  static const uint32_t MAX_RETRY_TIMES = 5;
  AtomicRWLock() {}
  explicit AtomicRWLock(bool write_first) : write_first_(write_first) {}

 private:
  // all these function only can used by ReadLockGuard/WriteLockGuard;
  void ReadLock();
  void WriteLock();

  void ReadUnlock();
  void WriteUnlock();

  AtomicRWLock(const AtomicRWLock&) = delete;
  AtomicRWLock& operator=(const AtomicRWLock&) = delete;
  std::atomic<uint32_t> write_lock_wait_num_ = {0};
  std::atomic<int32_t> lock_num_ = {0};
  bool write_first_ = true;
};

// Study: First wait all write lock release using looping
//        (will reschedule this thread if still not release after N try)
//        If in write frist mode, need also wait waiting write lock
inline void AtomicRWLock::ReadLock() {
  uint32_t retry_times = 0;
  int32_t lock_num = lock_num_.load();
  if (write_first_) {
    do {
      while (lock_num < RW_LOCK_FREE || write_lock_wait_num_.load() > 0) {
        if (++retry_times == MAX_RETRY_TIMES) {
          // saving cpu
          std::this_thread::yield();
          retry_times = 0;
        }
        lock_num = lock_num_.load();
      }
    } while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed));
  } else {
    do {
      while (lock_num < RW_LOCK_FREE) {
        if (++retry_times == MAX_RETRY_TIMES) {
          // saving cpu
          std::this_thread::yield();
          retry_times = 0;
        }
        lock_num = lock_num_.load();
      }
    } while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed));
  }
}

// Study:  Don't think too much, just lock
inline void AtomicRWLock::WriteLock() {
  int32_t rw_lock_free = RW_LOCK_FREE;
  uint32_t retry_times = 0;
  write_lock_wait_num_.fetch_add(1);
  while (!lock_num_.compare_exchange_weak(rw_lock_free, WRITE_EXCLUSIVE,
                                          std::memory_order_acq_rel,
                                          std::memory_order_relaxed)) {
    // rw_lock_free will change after CAS fail, so init agin
    rw_lock_free = RW_LOCK_FREE;
    if (++retry_times == MAX_RETRY_TIMES) {
      // saving cpu
      std::this_thread::yield();
      retry_times = 0;
    }
  }
  write_lock_wait_num_.fetch_sub(1);
}

// Study: Read lock is +, unlock is -
inline void AtomicRWLock::ReadUnlock() { lock_num_.fetch_sub(1); }

// Study: Write lock is -, unlock is +
inline void AtomicRWLock::WriteUnlock() { lock_num_.fetch_add(1); }

}  // namespace base
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_BASE_ATOMIC_RW_LOCK_H_
           

cyber/base/bounded_queue

A atomic fixed-size queue imlpemented using circular array and atomic variable.

The atomic imlpementation is similiar to the atomic_fifo so not describe it.

The special feature of this class is its wait_strategy_. It allow WaitQueue.

It is similiar to Dequeue, but it will wait if the queue is empty.

cyber/base/concurrent_object_pool

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_CONCURRENT_OBJECT_POOL_H_
#define CYBER_BASE_CONCURRENT_OBJECT_POOL_H_

#include <atomic>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <utility>

#include "cyber/base/for_each.h"
#include "cyber/base/macros.h"

namespace apollo {
namespace cyber {
namespace base {

// Study: The internal data structure is a linked list
//        that behave like a stack while using a continuous memory
//        The have a abstract free node list above the node list
template <typename T>
class CCObjectPool : public std::enable_shared_from_this<CCObjectPool<T>> {
 public:
  explicit CCObjectPool(uint32_t size);
  virtual ~CCObjectPool();

  // Study: It arguments is used to pass to the constructor of T
  //        If input is rvalue, pass reference to rvalue.
  //        If it is lvalue, it will be lvalue reference
  template <typename... Args>
  void ConstructAll(Args &&... args);

  template <typename... Args>
  std::shared_ptr<T> ConstructObject(Args &&... args);

  std::shared_ptr<T> GetObject();
  void ReleaseObject(T *);
  uint32_t size() const;

 private:
  struct Node {
    T object;
    Node *next;
  };

  struct alignas(2 * sizeof(Node *)) Head {
    uintptr_t count;
    Node *node;
  };

 private:
  CCObjectPool(CCObjectPool &) = delete;
  CCObjectPool &operator=(CCObjectPool &) = delete;
  bool FindFreeHead(Head *head);

  std::atomic<Head> free_head_;
  // Study: This is the place for storing the real object
  Node *node_arena_ = nullptr;
  uint32_t capacity_ = 0;
};

// Study: Initialize node_arena_
template <typename T>
CCObjectPool<T>::CCObjectPool(uint32_t size) : capacity_(size) {
  node_arena_ = static_cast<Node *>(CheckedCalloc(capacity_, sizeof(Node)));
  FOR_EACH(i, 0, capacity_ - 1) { node_arena_[i].next = node_arena_ + 1 + i; }
  node_arena_[capacity_ - 1].next = nullptr;
  free_head_.store({0, node_arena_}, std::memory_order_relaxed);
}

// Study: Using the same argument to create object and fill all the node_arena_
template <typename T>
template <typename... Args>
void CCObjectPool<T>::ConstructAll(Args &&... args) {
  FOR_EACH(i, 0, capacity_) {
    new (node_arena_ + i) T(std::forward<Args>(args)...);
  }
}

// Study: This is a very danger implementation
//        Since it allow other class get the object shared pointer
//        However, it not destruct the object when pool destruct
//        It mean that it will cause memory leak if type T have a heap-base member
//        Maybe it assumed this pool will only destructed after all object is manually released
//        of released by shared-pointer
template <typename T>
CCObjectPool<T>::~CCObjectPool() {
  std::free(node_arena_);
}

template <typename T>
bool CCObjectPool<T>::FindFreeHead(Head *head) {
  Head new_head;
  Head old_head = free_head_.load(std::memory_order_acquire);
  do {
    // Study: Already at the tails
    if (unlikely(old_head.node == nullptr)) {
      return false;
    }
    new_head.node = old_head.node->next;
    new_head.count = old_head.count + 1;
  } while (!free_head_.compare_exchange_weak(old_head, new_head,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire));
  // Study: Get the free head, and move the free head
  *head = old_head;
  return true;
}

// Study: Get one resource, release it if that object no one if point to
template <typename T>
std::shared_ptr<T> CCObjectPool<T>::GetObject() {
  Head free_head;
  if (unlikely(!FindFreeHead(&free_head))) {
    return nullptr;
  }
  auto self = this->shared_from_this();
  return std::shared_ptr<T>(reinterpret_cast<T *>(free_head.node),
                            [self](T *object) { self->ReleaseObject(object); });
}

// Study: Get the first node in free node list.
template <typename T>
template <typename... Args>
std::shared_ptr<T> CCObjectPool<T>::ConstructObject(Args &&... args) {
  Head free_head;
  if (unlikely(!FindFreeHead(&free_head))) {
    return nullptr;
  }
  auto self = this->shared_from_this();
  T *ptr = new (free_head.node) T(std::forward<Args>(args)...);
  return std::shared_ptr<T>(ptr, [self](T *object) {
    object->~T();
    self->ReleaseObject(object);
  });
}

// Study: When an object is released, let it head relinked to the free node list.
template <typename T>
void CCObjectPool<T>::ReleaseObject(T *object) {
  Head new_head;
  Node *node = reinterpret_cast<Node *>(object);
  Head old_head = free_head_.load(std::memory_order_acquire);
  do {
    node->next = old_head.node;
    new_head.node = node;
    new_head.count = old_head.count + 1;
  } while (!free_head_.compare_exchange_weak(old_head, new_head,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire));
}

}  // namespace base
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_BASE_CONCURRENT_OBJECT_POOL_H_
           

cyber/base/object_pool

The regular version of

concurrent_object_pool

without any concurrent promise.

The object create and destruction inside the pool will only happen in the pool constructor and destructor.

This is quite different to

concurrent_object_pool

.

Since

concurrent_object_pool

allow public call of release and create of an object.

And

concurrent_object_pool

doesn’t call object destructor in pool destructor.

Obviously that, the using assumption of this two class is different and they cannot be replace each other easily.

cyber/base/reentrant_rw_lock

先定義一下Reentrant rw lock

大體上跟等價於一般的RW lock

主要不同就是

 它保護了當前持有鎖的線程,想再加鎖時不會被blocking

減少上下文切換的成本

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_REENTRANT_RW_LOCK_H_
#define CYBER_BASE_REENTRANT_RW_LOCK_H_

#include <stdint.h>
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>

#include "cyber/base/rw_lock_guard.h"

namespace apollo {
namespace cyber {
namespace base {

// Study:  要做reentrant 就一定要先知道自己thread id
static const std::thread::id NULL_THREAD_ID = std::thread::id();
class ReentrantRWLock {
  friend class ReadLockGuard<ReentrantRWLock>;
  friend class WriteLockGuard<ReentrantRWLock>;

 public:
  static const int32_t RW_LOCK_FREE = 0;
  // Study:  lock num = WRITE_EXCLUSIVE mean can read
  static const int32_t WRITE_EXCLUSIVE = -1;
  static const uint32_t MAX_RETRY_TIMES = 5;
  static const std::thread::id null_thread;
  ReentrantRWLock() {}
  explicit ReentrantRWLock(bool write_first) : write_first_(write_first) {}

 private:
  // all these function only can used by ReadLockGuard/WriteLockGuard;
  void ReadLock();
  void WriteLock();

  void ReadUnlock();
  void WriteUnlock();

  ReentrantRWLock(const ReentrantRWLock&) = delete;
  ReentrantRWLock& operator=(const ReentrantRWLock&) = delete;
  // Study:  Check who getting the write lock
  std::thread::id write_thread_id_ = {NULL_THREAD_ID};
  std::atomic<uint32_t> write_lock_wait_num_ = {0};
  // Study:  Allow multiple repeated lock, so need lock num to count the lock
  std::atomic<int32_t> lock_num_ = {0};
  bool write_first_ = true;
};

inline void ReentrantRWLock::ReadLock() {
  // Study:  Reentrant Check
  if (write_thread_id_ == std::this_thread::get_id()) {
    return;
  }

  uint32_t retry_times = 0;
  int32_t lock_num = lock_num_.load(std::memory_order_acquire);
  if (write_first_) {
    do {
      while (lock_num < RW_LOCK_FREE ||
             write_lock_wait_num_.load(std::memory_order_acquire) > 0) {
        if (++retry_times == MAX_RETRY_TIMES) {
          // saving cpu
          std::this_thread::yield();
          retry_times = 0;
        }
        lock_num = lock_num_.load(std::memory_order_acquire);
      }
    } while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed));
  } else {
    do {
      while (lock_num < RW_LOCK_FREE) {
        if (++retry_times == MAX_RETRY_TIMES) {
          // saving cpu
          std::this_thread::yield();
          retry_times = 0;
        }
        lock_num = lock_num_.load(std::memory_order_acquire);
      }
    } while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed));
  }
}

inline void ReentrantRWLock::WriteLock() {
  auto this_thread_id = std::this_thread::get_id();
  // Study:  Reentrant Check
  if (write_thread_id_ == this_thread_id) {
    lock_num_.fetch_sub(1);
    return;
  }
  int32_t rw_lock_free = RW_LOCK_FREE;
  uint32_t retry_times = 0;
  write_lock_wait_num_.fetch_add(1);
  while (!lock_num_.compare_exchange_weak(rw_lock_free, WRITE_EXCLUSIVE,
                                          std::memory_order_acq_rel,
                                          std::memory_order_relaxed)) {
    // rw_lock_free will change after CAS fail, so init agin
    rw_lock_free = RW_LOCK_FREE;
    if (++retry_times == MAX_RETRY_TIMES) {
      // saving cpu
      std::this_thread::yield();
      retry_times = 0;
    }
  }
  write_thread_id_ = this_thread_id;
  write_lock_wait_num_.fetch_sub(1);
}

// Study:  比非reentrant version 多了個thread check
inline void ReentrantRWLock::ReadUnlock() {
  if (write_thread_id_ == std::this_thread::get_id()) {
    return;
  }
  lock_num_.fetch_sub(1);
}

// Study:  lock num的更新要同時維護write_thread_id_
inline void ReentrantRWLock::WriteUnlock() {
  if (lock_num_.fetch_add(1) == WRITE_EXCLUSIVE) {
    write_thread_id_ = NULL_THREAD_ID;
  }
}

}  // namespace base
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_BASE_REENTRANT_RW_LOCK_H_
           

cyber/base/signal

signal and slot 的 concept可看qt

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_SIGNAL_H_
#define CYBER_BASE_SIGNAL_H_

#include <algorithm>
#include <functional>
#include <list>
#include <memory>
#include <mutex>

namespace apollo {
namespace cyber {
namespace base {

// Study:  Forward Declaration
template <typename... Args>
class Slot;

template <typename... Args>
class Connection;

template <typename... Args>
class Signal {
 public:
  using Callback = std::function<void(Args...)>;
  using SlotPtr = std::shared_ptr<Slot<Args...>>;
  using SlotList = std::list<SlotPtr>;
  using ConnectionType = Connection<Args...>;

  Signal() {}
  virtual ~Signal() { DisconnectAllSlots(); }

  // Study:  When the signal is activate
  void operator()(Args... args) {
    // Study:  limit the lock scope
    SlotList local;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      for (auto& slot : slots_) {
        local.emplace_back(slot);
      }
    }

    if (!local.empty()) {
      for (auto& slot : local) {
        (*slot)(args...);
      }
    }

    ClearDisconnectedSlots();
  }

  // Study: This is the first step in using a signal
  //        Connect signal to slot
  ConnectionType Connect(const Callback& cb) {
    auto slot = std::make_shared<Slot<Args...>>(cb);
    {
      std::lock_guard<std::mutex> lock(mutex_);
      slots_.emplace_back(slot);
    }

    return ConnectionType(slot, this);
  }

  bool Disconnect(const ConnectionType& conn) {
    bool find = false;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      for (auto& slot : slots_) {
        if (conn.HasSlot(slot)) {
          find = true;
          slot->Disconnect();
        }
      }
    }

    if (find) {
      ClearDisconnectedSlots();
    }
    return find;
  }

  void DisconnectAllSlots() {
    std::lock_guard<std::mutex> lock(mutex_);
    for (auto& slot : slots_) {
      slot->Disconnect();
    }
    slots_.clear();
  }

 private:
  Signal(const Signal&) = delete;
  Signal& operator=(const Signal&) = delete;

  void ClearDisconnectedSlots() {
    std::lock_guard<std::mutex> lock(mutex_);
    slots_.erase(
        std::remove_if(slots_.begin(), slots_.end(),
                       [](const SlotPtr& slot) { return !slot->connected(); }),
        slots_.end());
  }

  SlotList slots_;
  std::mutex mutex_;
};

// Study:  This represent the connection status between signal and slot
//         Help maintain the real time connectivity
template <typename... Args>
class Connection {
 public:
  using SlotPtr = std::shared_ptr<Slot<Args...>>;
  using SignalPtr = Signal<Args...>*;

  Connection() : slot_(nullptr), signal_(nullptr) {}
  Connection(const SlotPtr& slot, const SignalPtr& signal)
      : slot_(slot), signal_(signal) {}
  virtual ~Connection() {
    slot_ = nullptr;
    signal_ = nullptr;
  }

  Connection& operator=(const Connection& another) {
    if (this != &another) {
      this->slot_ = another.slot_;
      this->signal_ = another.signal_;
    }
    return *this;
  }

  bool HasSlot(const SlotPtr& slot) const {
    if (slot != nullptr && slot_ != nullptr) {
      return slot_.get() == slot.get();
    }
    return false;
  }

  bool IsConnected() const {
    if (slot_) {
      return slot_->connected();
    }
    return false;
  }

  bool Disconnect() {
    if (signal_ && slot_) {
      return signal_->Disconnect(*this);
    }
    return false;
  }

 private:
  SlotPtr slot_;
  SignalPtr signal_;
};

template <typename... Args>
class Slot {
 public:
  using Callback = std::function<void(Args...)>;
  Slot(const Slot& another)
      : cb_(another.cb_), connected_(another.connected_) {}
  explicit Slot(const Callback& cb, bool connected = true)
      : cb_(cb), connected_(connected) {}
  virtual ~Slot() {}

  // Study:  When the slot have receive signal, do cb  
  void operator()(Args... args) {
    if (connected_ && cb_) {
      cb_(args...);
    }
  }

  void Disconnect() { connected_ = false; }
  bool connected() const { return connected_; }

 private:
  Callback cb_;
  bool connected_ = true;
};

}  // namespace base
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_BASE_SIGNAL_H_

           

cyber/base/thread_pool

建在

bounded_queue

上的一個thread pool,

Task 是一個function 跟對應的argument

/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_THREAD_POOL_H_
#define CYBER_BASE_THREAD_POOL_H_

#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

#include "cyber/base/bounded_queue.h"

namespace apollo {
namespace cyber {
namespace base {

class ThreadPool {
 public:
  explicit ThreadPool(std::size_t thread_num, std::size_t max_task_num = 1000);

  // Study: F is the function, Args is arguments,
  //        it will return the value that wrapped by future
  template <typename F, typename... Args>
  auto Enqueue(F&& f, Args&&... args)
      -> std::future<typename std::result_of<F(Args...)>::type>;

  ~ThreadPool();

 private:
  std::vector<std::thread> workers_;
  BoundedQueue<std::function<void()>> task_queue_;
  std::atomic_bool stop_;
};

inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num)
    : stop_(false) {
  if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
    throw std::runtime_error("Task queue init failed.");
  }
  // Study: Thread pool of course have thread worker
  for (size_t i = 0; i < threads; ++i) {
    workers_.emplace_back([this] {
      while (!stop_) {
        std::function<void()> task;
        if (task_queue_.WaitDequeue(&task)) {
          task();
        }
      }
    });
  }
}

// before using the return value, you should check value.valid()
template <typename F, typename... Args>
auto ThreadPool::Enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type> {
  using return_type = typename std::result_of<F(Args...)>::type;

  auto task = std::make_shared<std::packaged_task<return_type()>>(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));

  std::future<return_type> res = task->get_future();

  // don't allow enqueueing after stopping the pool
  if (stop_) {
    return std::future<return_type>();
  }
  task_queue_.Enqueue([task]() { (*task)(); });
  return res;
};

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
  if (stop_.exchange(true)) {
    return;
  }
  task_queue_.BreakAllWait();
  for (std::thread& worker : workers_) {
    worker.join();
  }
}

}  // namespace base
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_BASE_THREAD_POOL_H_
           

cyber/base/thread_safe_queue

就一個queue加上了一個mutex.

而它為了也提供wait queue, 是以也加了condition_variable

cyber/base/unbounded_queue

bounded_queue的 unbounded版。

不過要留意兩者實現完全不一樣, 而且unbounded_queue為了thread safe

它是用linked_list, 而不是dynamic array.

unbounded_queue理論上性能會比bounded_queue低

而且它都用compare_exchange_strong, 應該是所設計上就不是給大量並發的場景的

繼續閱讀