Apollo 3.5 Cyber base Code Study
- `cyber/base/macros.h`
- `cyber/base/atomic_fifo.h`
- `cyber/base/for_each.h`
- `cyber/base/wait_strategy`
- `cyber/base/atomic_hash_map.h`
- `cyber/base/rw_lock_guard`
- `cyber/base/atomic_rw_lock`
- `cyber/base/bounded_queue`
- `cyber/base/concurrent_object_pool`
- `cyber/base/object_pool`
- `cyber/base/reentrant_rw_lock`
- `cyber/base/signal`
- `cyber/base/thread_pool`
- `cyber/base/thread_safe_queue`
- `cyber/base/unbounded_queue`
Lines marked `// Study:` are my notes.
The base directory mostly contains thread-safe containers, locks, and similar basic building blocks; it is worth a read.
cyber/base/macros.h
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_MACROS_H_
#define CYBER_BASE_MACROS_H_
#include <cstdlib>
#include <new>
// Study: __builtin_expect tells the compiler how likely the condition is to be true,
// so it can optimize at the assembly level (e.g. branch layout) with this information
#if __GNUC__ >= 3
#define likely(x) (__builtin_expect((x), 1))
#define unlikely(x) (__builtin_expect((x), 0))
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif
// Study: The size of a cache line; a cache line is the block granularity at which memory is fetched into the cache
#define CACHELINE_SIZE 64
// Study: Meta-programming using SFINAE. When type T has a member `func`, value is true;
// otherwise it is false (the matched overload returns int, whose size is not 1)
// Basically a compile-time check for whether T provides `func`
// Can be mixed with the STL type traits
#define DEFINE_TYPE_TRAIT(name, func) \
template <typename T> \
class name { \
private: \
template <typename Class> \
static char Test(decltype(&Class::func)*); \
template <typename> \
static int Test(...); \
\
public: \
static constexpr bool value = sizeof(Test<T>(nullptr)) == 1; \
}; \
\
template <typename T> \
constexpr bool name<T>::value;
// Study: Tell the processor to pause (effectively a no-op)
// Unlike nop;nop;, "rep; nop" is the PAUSE hint, so the processor can optimize spin-wait loops
inline void cpu_relax() { asm volatile("rep; nop" ::: "memory"); }
// Study: Allocate memory, throwing std::bad_alloc on failure
inline void* CheckedMalloc(size_t size) {
void* ptr = std::malloc(size);
if (!ptr) {
throw std::bad_alloc();
}
return ptr;
}
// Study: Allocate zero-initialized memory, throwing std::bad_alloc on failure
inline void* CheckedCalloc(size_t num, size_t size) {
void* ptr = std::calloc(num, size);
if (!ptr) {
throw std::bad_alloc();
}
return ptr;
}
#endif // CYBER_BASE_MACROS_H_
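To make the trait concrete, here is a small usage sketch (the trait and struct names are my own, not Apollo's):

```cpp
#include "cyber/base/macros.h"

// Hypothetical trait: HasShutdown<T>::value is true iff T declares Shutdown.
DEFINE_TYPE_TRAIT(HasShutdown, Shutdown)

struct WithShutdown { void Shutdown() {} };
struct WithoutShutdown {};

static_assert(HasShutdown<WithShutdown>::value, "Shutdown detected");
static_assert(!HasShutdown<WithoutShutdown>::value, "no Shutdown member");
```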
cyber/base/atomic_fifo.h
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_ATOMIC_FIFO_H_
#define CYBER_BASE_ATOMIC_FIFO_H_
#include <atomic>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include "cyber/base/macros.h"
namespace apollo {
namespace cyber {
template <typename T>
class AtomicFIFO {
private:
struct Node {
T value;
};
public:
// Study: Singleton; the cap argument only takes effect on the first GetInstance() call
static AtomicFIFO *GetInstance(int cap = 100) {
static AtomicFIFO *inst = new AtomicFIFO(cap);
return inst;
}
bool Push(const T &value);
bool Pop(T *value);
// insert();
private:
Node *node_arena_;
// Study: Each index gets its own cache line to avoid false sharing and maximize memory access speed
alignas(CACHELINE_SIZE) std::atomic<uint32_t> head_;
alignas(CACHELINE_SIZE) std::atomic<uint32_t> commit_;
alignas(CACHELINE_SIZE) std::atomic<uint32_t> tail_;
int capacity_;
// Study: Constructor and destructor are private; only the singleton is allowed
explicit AtomicFIFO(int cap);
~AtomicFIFO();
AtomicFIFO(AtomicFIFO &) = delete;
AtomicFIFO &operator=(AtomicFIFO &) = delete;
};
template <typename T>
AtomicFIFO<T>::AtomicFIFO(int cap) : capacity_(cap) {
node_arena_ = static_cast<Node *>(malloc(capacity_ * sizeof(Node)));
memset(node_arena_, 0, capacity_ * sizeof(Node));
// Study: Set value to 0
head_.store(0, std::memory_order_relaxed);
tail_.store(0, std::memory_order_relaxed);
commit_.store(0, std::memory_order_relaxed);
}
template <typename T>
AtomicFIFO<T>::~AtomicFIFO() {
if (node_arena_ != nullptr) {
for (int i = 0; i < capacity_; i++) {
// Study: Call the T destructor manually; the objects live in a malloc'd region, so they are not destroyed automatically
node_arena_[i].value.~T();
}
free(node_arena_);
}
}
template <typename T>
bool AtomicFIFO<T>::Push(const T &value) {
uint32_t oldt, newt;
// Study: Try push until success, return false if queue full
oldt = tail_.load(std::memory_order_acquire);
do {
uint32_t h = head_.load(std::memory_order_acquire);
uint32_t t = tail_.load(std::memory_order_acquire);
if (((t + 1) % capacity_) == h) return false;
newt = (oldt + 1) % capacity_;
// Study: On success, tail_ becomes newt; on failure, oldt is refreshed with the current tail_
// This keeps the local tail value in sync before retrying
} while (!tail_.compare_exchange_weak(oldt, newt, std::memory_order_acq_rel,
std::memory_order_acquire));
(node_arena_ + oldt)->value = value;
// Study: commit_ mirrors tail_, but it is the index Pop() reads; a writer waits until
// all earlier slots are committed, so readers never observe unwritten data
while (unlikely(commit_.load(std::memory_order_acquire) != oldt)) cpu_relax();
// Study: Once committed, this value becomes visible to Pop()
commit_.store(newt, std::memory_order_release);
return true;
}
template <typename T>
bool AtomicFIFO<T>::Pop(T *value) {
uint32_t oldh, newh;
oldh = head_.load(std::memory_order_acquire);
// Study: Basically the same logic as Push: retry until success; return false if the queue is empty
do {
uint32_t h = head_.load(std::memory_order_acquire);
uint32_t c = commit_.load(std::memory_order_acquire);
if (h == c) return false;
newh = (oldh + 1) % capacity_;
*value = (node_arena_ + oldh)->value;
} while (!head_.compare_exchange_weak(oldh, newh, std::memory_order_acq_rel,
std::memory_order_acquire));
return true;
}
} // namespace cyber
} // namespace apollo
#endif // CYBER_BASE_ATOMIC_FIFO_H_
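A usage sketch of the API above (the values are mine; note that capacity is fixed by the first GetInstance() call, and one slot is always left empty because fullness is detected via `(tail + 1) % capacity == head`):

```cpp
#include "cyber/base/atomic_fifo.h"

int main() {
  // Capacity is set only by the first GetInstance() call.
  auto* fifo = apollo::cyber::AtomicFIFO<int>::GetInstance(8);
  fifo->Push(42);
  int v = 0;
  if (fifo->Pop(&v)) {
    // v == 42
  }
  return 0;
}
```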
cyber/base/for_each.h
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_FOR_EACH_H_
#define CYBER_BASE_FOR_EACH_H_
#include <type_traits>
#include "cyber/base/macros.h"
namespace apollo {
namespace cyber {
namespace base {
// Study: A trait to check whether the class implements operator<
DEFINE_TYPE_TRAIT(HasLess, operator<) // NOLINT
// Study: If both types implement a member operator<, use it
template <class Value, class End>
typename std::enable_if<HasLess<Value>::value && HasLess<End>::value,
bool>::type
LessThan(const Value& val, const End& end) {
return val < end;
}
// Study: Otherwise, fall back to an inequality check.....
// Actually, I think this overload's name is misleading; it only makes sense when used in FOR_EACH
template <class Value, class End>
typename std::enable_if<!HasLess<Value>::value || !HasLess<End>::value,
bool>::type
LessThan(const Value& val, const End& end) {
return val != end;
}
// Study: Loop from begin until end. i may be an integer index or an iterator:
// LessThan only uses operator< for class types that declare it as a member,
// and falls back to != for everything else (including plain ints)
#define FOR_EACH(i, begin, end) \
for (auto i = (true ? (begin) : (end)); \
apollo::cyber::base::LessThan(i, (end)); ++i)
} // namespace base
} // namespace cyber
} // namespace apollo
#endif // CYBER_BASE_FOR_EACH_H_
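A quick sketch of both the index and iterator forms (my own example):

```cpp
#include <vector>
#include "cyber/base/for_each.h"

void Demo() {
  std::vector<int> v = {1, 2, 3};
  int sum = 0;
  // Indices work: int has no member operator<, so LessThan compares with !=,
  // which is fine because i advances one step at a time.
  FOR_EACH(i, 0, 3) { sum += v[i]; }
  // Iterators typically take the same != path.
  FOR_EACH(it, v.begin(), v.end()) { sum += *it; }
}
```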
cyber/base/wait_strategy
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_WAIT_STRATEGY_H_
#define CYBER_BASE_WAIT_STRATEGY_H_
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <mutex>
#include <thread>
namespace apollo {
namespace cyber {
namespace base {
class WaitStrategy {
public:
// Study: Allow one waiter to proceed
virtual void NotifyOne() {}
// Study: Allow all waiters to proceed
virtual void BreakAllWait() {}
// Study: Wait here
virtual bool EmptyWait() = 0;
virtual ~WaitStrategy() {}
};
// Study: Block until notified
class BlockWaitStrategy : public WaitStrategy {
public:
BlockWaitStrategy() {}
void NotifyOne() override { cv_.notify_one(); }
bool EmptyWait() override {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock);
return true;
}
void BreakAllWait() { cv_.notify_all(); }
private:
std::mutex mutex_;
std::condition_variable cv_;
};
// Study: Sleep, then proceed once the sleep time elapses
class SleepWaitStrategy : public WaitStrategy {
public:
SleepWaitStrategy() {}
explicit SleepWaitStrategy(uint64_t sleep_time_us)
: sleep_time_us_(sleep_time_us) {}
bool EmptyWait() override {
std::this_thread::sleep_for(std::chrono::microseconds(sleep_time_us_));
return true;
}
void SetSleepTimeMicroSecends(uint64_t sleep_time_us) {
sleep_time_us_ = sleep_time_us;
}
private:
uint64_t sleep_time_us_ = 10000;
};
// Study: Yield this thread's time slice so other threads can run first
class YieldWaitStrategy : public WaitStrategy {
public:
YieldWaitStrategy() {}
bool EmptyWait() override {
std::this_thread::yield();
return true;
}
};
// Study: Never actually waits; returns immediately (pure busy spinning)
class BusySpinWaitStrategy : public WaitStrategy {
public:
BusySpinWaitStrategy() {}
bool EmptyWait() override { return true; }
};
// Study: Like BlockWaitStrategy, but with a time limit; returns false on timeout
class TimeoutBlockWaitStrategy : public WaitStrategy {
public:
TimeoutBlockWaitStrategy() {}
explicit TimeoutBlockWaitStrategy(uint64_t timeout)
: time_out_(std::chrono::milliseconds(timeout)) {}
void NotifyOne() override { cv_.notify_one(); }
bool EmptyWait() override {
std::unique_lock<std::mutex> lock(mutex_);
if (cv_.wait_for(lock, time_out_) == std::cv_status::timeout) {
return false;
}
return true;
}
void BreakAllWait() { cv_.notify_all(); }
void SetTimeout(uint64_t timeout) {
time_out_ = std::chrono::milliseconds(timeout);
}
private:
std::mutex mutex_;
std::condition_variable cv_;
std::chrono::milliseconds time_out_;
};
} // namespace base
} // namespace cyber
} // namespace apollo
#endif // CYBER_BASE_WAIT_STRATEGY_H_
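A small sketch of how these strategies are meant to be driven (my own illustration; the BoundedQueue described below wires a strategy in the same spirit):

```cpp
#include "cyber/base/wait_strategy.h"

// My own illustration, not Apollo code: the idle policy is swappable
// without changing the producer/consumer code around it.
apollo::cyber::base::TimeoutBlockWaitStrategy strategy(100);  // 100 ms cap

bool WaitForWork() {
  // Blocks until NotifyOne()/BreakAllWait(), or 100 ms pass; false = timeout.
  return strategy.EmptyWait();
}

void OnWorkArrived() { strategy.NotifyOne(); }
```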
cyber/base/atomic_hash_map.h
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_ATOMIC_HASH_MAP_H_
#define CYBER_BASE_ATOMIC_HASH_MAP_H_
#include <stdint.h>
#include <atomic>
#include <type_traits>
#include <utility>
namespace apollo {
namespace cyber {
namespace base {
// Study: TableSize must be power of 2
/**
* @brief A implementation of lock-free fixed size hash map
*
* @tparam K Type of key, must be integral
* @tparam V Type of value
* @tparam 128 Size of hash table
* @tparam 0 Type traits, use for checking types of key & value
*/
template <typename K, typename V, std::size_t TableSize = 128,
typename std::enable_if<std::is_integral<K>::value &&
(TableSize & (TableSize - 1)) == 0,
int>::type = 0>
class AtomicHashMap {
public:
AtomicHashMap() : capacity_(TableSize), mode_num_(capacity_ - 1) {}
AtomicHashMap(const AtomicHashMap &other) = delete;
AtomicHashMap &operator=(const AtomicHashMap &other) = delete;
// Study: Index with key & mode_num_ and behave like a regular hash map;
// the atomic parts live in Bucket and Entry.
// It does NOT apply a hash function: the key's low bits are used directly,
// so the user must choose keys carefully
bool Has(K key) {
uint64_t index = key & mode_num_;
return table_[index].Has(key);
}
bool Get(K key, V **value) {
uint64_t index = key & mode_num_;
return table_[index].Get(key, value);
}
bool Get(K key, V *value) {
uint64_t index = key & mode_num_;
V *val = nullptr;
bool res = table_[index].Get(key, &val);
if (res) {
*value = *val;
}
return res;
}
void Set(K key) {
uint64_t index = key & mode_num_;
table_[index].Insert(key);
}
void Set(K key, const V &value) {
uint64_t index = key & mode_num_;
table_[index].Insert(key, value);
}
// Study: rvalue overload; the value is forwarded
void Set(K key, V &&value) {
uint64_t index = key & mode_num_;
table_[index].Insert(key, std::forward<V>(value));
}
private:
// Study: The nodes inside a Bucket; essentially a regular list node,
// except value and next are atomic
struct Entry {
Entry() {}
explicit Entry(K key) : key(key) {
value_ptr.store(new V(), std::memory_order_release);
}
Entry(K key, const V &value) : key(key) {
value_ptr.store(new V(value), std::memory_order_release);
}
Entry(K key, V &&value) : key(key) {
value_ptr.store(new V(std::forward<V>(value)), std::memory_order_release);
}
~Entry() { delete value_ptr.load(std::memory_order_acquire); }
K key = 0;
std::atomic<V *> value_ptr = {nullptr};
std::atomic<Entry *> next = {nullptr};
};
// Study: The real storage for every entry whose key maps to one table slot
// (not just one key); internally a sorted linked list
// Atomicity is guaranteed here and in Entry
class Bucket {
public:
Bucket() : head_(new Entry()) {}
~Bucket() {
Entry *ite = head_;
while (ite) {
auto tmp = ite->next.load(std::memory_order_acquire);
delete ite;
ite = tmp;
}
}
// Study: Walk the sorted list and stop at the first key that is not less than ours
bool Has(K key) {
Entry *m_target = head_->next.load(std::memory_order_acquire);
while (Entry *target = m_target) {
if (target->key < key) {
m_target = target->next.load(std::memory_order_acquire);
continue;
} else {
return target->key == key;
}
}
return false;
}
// Study: Walk the list; on a hit return true with *target_ptr set, otherwise return false with prev/target bracketing the insertion point
bool Find(K key, Entry **prev_ptr, Entry **target_ptr) {
Entry *prev = head_;
Entry *m_target = head_->next.load(std::memory_order_acquire);
while (Entry *target = m_target) {
if (target->key == key) {
*prev_ptr = prev;
*target_ptr = target;
return true;
} else if (target->key > key) {
*prev_ptr = prev;
*target_ptr = target;
return false;
} else {
prev = target;
m_target = target->next.load(std::memory_order_acquire);
}
}
*prev_ptr = prev;
*target_ptr = nullptr;
return false;
}
// Study: Retry until the insert (or update) succeeds
void Insert(K key, const V &value) {
Entry *prev = nullptr;
Entry *target = nullptr;
Entry *new_entry = new Entry(key, value);
V *new_value = new V(value);
while (true) {
if (Find(key, &prev, &target)) {
// key exists, update value
auto old_val_ptr = target->value_ptr.load(std::memory_order_acquire);
if (target->value_ptr.compare_exchange_strong(
old_val_ptr, new_value, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
delete new_entry;
return;
}
continue;
} else {
new_entry->next.store(target, std::memory_order_release);
if (prev->next.compare_exchange_strong(target, new_entry,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
// Insert success
delete new_value;
return;
}
// another entry has been inserted, retry
}
}
}
// Study: Same as above but taking an rvalue. The remaining overloads follow the same pattern, so no further commentary
void Insert(K key, V &&value) {
Entry *prev = nullptr;
Entry *target = nullptr;
Entry *new_entry = new Entry(key, value);
auto new_value = new V(std::forward<V>(value));
while (true) {
if (Find(key, &prev, &target)) {
// key exists, update value
auto old_val_ptr = target->value_ptr.load(std::memory_order_acquire);
if (target->value_ptr.compare_exchange_strong(
old_val_ptr, new_value, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
delete new_entry;
return;
}
continue;
} else {
new_entry->next.store(target, std::memory_order_release);
if (prev->next.compare_exchange_strong(target, new_entry,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
// Insert success
delete new_value;
return;
}
// another entry has been inserted, retry
}
}
}
void Insert(K key) {
Entry *prev = nullptr;
Entry *target = nullptr;
Entry *new_entry = new Entry(key);
auto new_value = new V();
while (true) {
if (Find(key, &prev, &target)) {
// key exists, update value
auto old_val_ptr = target->value_ptr.load(std::memory_order_acquire);
if (target->value_ptr.compare_exchange_strong(
old_val_ptr, new_value, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
delete new_entry;
return;
}
continue;
} else {
new_entry->next.store(target, std::memory_order_release);
if (prev->next.compare_exchange_strong(target, new_entry,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
// Insert success
delete new_value;
return;
}
// another entry has been inserted, retry
}
}
}
bool Get(K key, V **value) {
Entry *prev = nullptr;
Entry *target = nullptr;
if (Find(key, &prev, &target)) {
*value = target->value_ptr.load(std::memory_order_acquire);
return true;
}
return false;
}
Entry *head_;
};
private:
Bucket table_[TableSize];
uint64_t capacity_;
uint64_t mode_num_;
};
} // namespace base
} // namespace cyber
} // namespace apollo
#endif // CYBER_BASE_ATOMIC_HASH_MAP_H_
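A usage sketch of the map above (the key/value are mine):

```cpp
#include <cstdint>
#include <string>
#include "cyber/base/atomic_hash_map.h"

void Demo() {
  // Keys are used directly as indices (key & 127 with the default TableSize).
  apollo::cyber::base::AtomicHashMap<uint64_t, std::string> map;
  map.Set(1, "one");
  std::string copy;
  map.Get(1, &copy);           // copies the value out
  std::string* ref = nullptr;
  if (map.Get(1, &ref)) {
    // zero-copy: ref points at the stored value (valid until overwritten)
  }
}
```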
cyber/base/rw_lock_guard
Provides two RAII wrappers, `ReadLockGuard` and `WriteLockGuard`, for the read-write locks: they lock in the constructor and unlock in the destructor.
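The file itself is not reproduced here; a minimal sketch of the idea, reconstructed by me under the assumption that the guard simply wraps ReadLock()/ReadUnlock():

```cpp
template <typename RWLock>
class ReadLockGuard {
 public:
  explicit ReadLockGuard(RWLock& lock) : rw_lock_(lock) { rw_lock_.ReadLock(); }
  ~ReadLockGuard() { rw_lock_.ReadUnlock(); }

 private:
  ReadLockGuard(const ReadLockGuard&) = delete;
  ReadLockGuard& operator=(const ReadLockGuard&) = delete;
  RWLock& rw_lock_;  // WriteLockGuard is the same with WriteLock/WriteUnlock
};
```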
cyber/base/atomic_rw_lock
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_ATOMIC_RW_LOCK_H_
#define CYBER_BASE_ATOMIC_RW_LOCK_H_
#include <stdint.h>
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>
#include "cyber/base/rw_lock_guard.h"
namespace apollo {
namespace cyber {
namespace base {
class AtomicRWLock {
// Study: The lock may only be used through the lock guards,
// so the lock and unlock functions are private,
// and the guards must be declared as friends here
friend class ReadLockGuard<AtomicRWLock>;
friend class WriteLockGuard<AtomicRWLock>;
public:
static const int32_t RW_LOCK_FREE = 0;
static const int32_t WRITE_EXCLUSIVE = -1;
static const uint32_t MAX_RETRY_TIMES = 5;
AtomicRWLock() {}
explicit AtomicRWLock(bool write_first) : write_first_(write_first) {}
private:
// all these function only can used by ReadLockGuard/WriteLockGuard;
void ReadLock();
void WriteLock();
void ReadUnlock();
void WriteUnlock();
AtomicRWLock(const AtomicRWLock&) = delete;
AtomicRWLock& operator=(const AtomicRWLock&) = delete;
std::atomic<uint32_t> write_lock_wait_num_ = {0};
std::atomic<int32_t> lock_num_ = {0};
bool write_first_ = true;
};
// Study: Spin until no writer holds the lock
// (yield this thread if it is still held after MAX_RETRY_TIMES attempts)
// In write-first mode, also wait while any writer is queued
inline void AtomicRWLock::ReadLock() {
uint32_t retry_times = 0;
int32_t lock_num = lock_num_.load();
if (write_first_) {
do {
while (lock_num < RW_LOCK_FREE || write_lock_wait_num_.load() > 0) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load();
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acq_rel,
std::memory_order_relaxed));
} else {
do {
while (lock_num < RW_LOCK_FREE) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load();
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acq_rel,
std::memory_order_relaxed));
}
}
// Study: Register as a waiting writer, then CAS the lock from free to WRITE_EXCLUSIVE
inline void AtomicRWLock::WriteLock() {
int32_t rw_lock_free = RW_LOCK_FREE;
uint32_t retry_times = 0;
write_lock_wait_num_.fetch_add(1);
while (!lock_num_.compare_exchange_weak(rw_lock_free, WRITE_EXCLUSIVE,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
// rw_lock_free will change after CAS fail, so init again
rw_lock_free = RW_LOCK_FREE;
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
}
write_lock_wait_num_.fetch_sub(1);
}
// Study: Read lock is +, unlock is -
inline void AtomicRWLock::ReadUnlock() { lock_num_.fetch_sub(1); }
// Study: Write lock is -, unlock is +
inline void AtomicRWLock::WriteUnlock() { lock_num_.fetch_add(1); }
} // namespace base
} // namespace cyber
} // namespace apollo
#endif // CYBER_BASE_ATOMIC_RW_LOCK_H_
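A usage sketch with the guards (my own):

```cpp
#include "cyber/base/atomic_rw_lock.h"

apollo::cyber::base::AtomicRWLock rw_lock;

void Reader() {
  apollo::cyber::base::ReadLockGuard<apollo::cyber::base::AtomicRWLock> guard(rw_lock);
  // ... many readers may be in here concurrently ...
}

void Writer() {
  apollo::cyber::base::WriteLockGuard<apollo::cyber::base::AtomicRWLock> guard(rw_lock);
  // ... exclusive section ...
}
```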
cyber/base/bounded_queue
An atomic fixed-size queue implemented with a circular array and atomic variables.
The lock-free part is similar to atomic_fifo, so it is not described again.
The special feature of this class is its wait_strategy_, which enables WaitDequeue:
like Dequeue, but it waits (according to the strategy) while the queue is empty.
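A usage sketch assuming only the API that the ThreadPool section below actually exercises (Init, Enqueue, WaitDequeue):

```cpp
#include "cyber/base/bounded_queue.h"
#include "cyber/base/wait_strategy.h"

void Demo() {
  apollo::cyber::base::BoundedQueue<int> queue;
  // Init takes the capacity plus a wait strategy, as ThreadPool does below.
  if (!queue.Init(64, new apollo::cyber::base::BlockWaitStrategy())) {
    return;
  }
  queue.Enqueue(1);
  int v = 0;
  queue.WaitDequeue(&v);  // blocks via the strategy while the queue is empty
}
```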
cyber/base/concurrent_object_pool
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_CONCURRENT_OBJECT_POOL_H_
#define CYBER_BASE_CONCURRENT_OBJECT_POOL_H_
#include <atomic>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <utility>
#include "cyber/base/for_each.h"
#include "cyber/base/macros.h"
namespace apollo {
namespace cyber {
namespace base {
// Study: The internal data structure is a linked free list that behaves like a
// stack while living in one contiguous memory block;
// the free list is threaded on top of the node array
template <typename T>
class CCObjectPool : public std::enable_shared_from_this<CCObjectPool<T>> {
public:
explicit CCObjectPool(uint32_t size);
virtual ~CCObjectPool();
// Study: The arguments are perfectly forwarded to T's constructor:
// an rvalue input stays an rvalue reference,
// an lvalue stays an lvalue reference
template <typename... Args>
void ConstructAll(Args &&... args);
template <typename... Args>
std::shared_ptr<T> ConstructObject(Args &&... args);
std::shared_ptr<T> GetObject();
void ReleaseObject(T *);
uint32_t size() const;
private:
struct Node {
T object;
Node *next;
};
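// Study-style note (mine): Head pairs the free-list head pointer with a
// counter so the double-width CAS on free_head_ also checks a version tag,
// avoiding the ABA problem when a node is popped and re-pushed between a
// load and the CAS; alignas(2 * sizeof(Node *)) sizes Head for that
// 16-byte compare-exchange on 64-bit platforms.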
struct alignas(2 * sizeof(Node *)) Head {
uintptr_t count;
Node *node;
};
private:
CCObjectPool(CCObjectPool &) = delete;
CCObjectPool &operator=(CCObjectPool &) = delete;
bool FindFreeHead(Head *head);
std::atomic<Head> free_head_;
// Study: This is the place for storing the real object
Node *node_arena_ = nullptr;
uint32_t capacity_ = 0;
};
// Study: Initialize node_arena_
template <typename T>
CCObjectPool<T>::CCObjectPool(uint32_t size) : capacity_(size) {
node_arena_ = static_cast<Node *>(CheckedCalloc(capacity_, sizeof(Node)));
FOR_EACH(i, 0, capacity_ - 1) { node_arena_[i].next = node_arena_ + 1 + i; }
node_arena_[capacity_ - 1].next = nullptr;
free_head_.store({0, node_arena_}, std::memory_order_relaxed);
}
// Study: Placement-construct a T in every node of node_arena_, all with the same forwarded arguments
template <typename T>
template <typename... Args>
void CCObjectPool<T>::ConstructAll(Args &&... args) {
FOR_EACH(i, 0, capacity_) {
new (node_arena_ + i) T(std::forward<Args>(args)...);
}
}
// Study: This is a dangerous implementation:
// other classes can obtain shared pointers to the pooled objects,
// yet the pool destructor does not destruct the objects, it only frees the arena.
// That leaks memory whenever T owns heap-allocated members.
// Presumably the assumption is that the pool is destructed only after every
// object has been released manually or via its shared_ptr deleter
template <typename T>
CCObjectPool<T>::~CCObjectPool() {
std::free(node_arena_);
}
template <typename T>
bool CCObjectPool<T>::FindFreeHead(Head *head) {
Head new_head;
Head old_head = free_head_.load(std::memory_order_acquire);
do {
// Study: The free list is exhausted (pool empty)
if (unlikely(old_head.node == nullptr)) {
return false;
}
new_head.node = old_head.node->next;
new_head.count = old_head.count + 1;
} while (!free_head_.compare_exchange_weak(old_head, new_head,
std::memory_order_acq_rel,
std::memory_order_acquire));
// Study: Get the free head, and move the free head
*head = old_head;
return true;
}
// Study: Take one object; the deleter returns it to the pool once no shared_ptr points to it
template <typename T>
std::shared_ptr<T> CCObjectPool<T>::GetObject() {
Head free_head;
if (unlikely(!FindFreeHead(&free_head))) {
return nullptr;
}
auto self = this->shared_from_this();
return std::shared_ptr<T>(reinterpret_cast<T *>(free_head.node),
[self](T *object) { self->ReleaseObject(object); });
}
// Study: Take the first free node and placement-construct a T in it; the deleter also destroys the object before returning the node
template <typename T>
template <typename... Args>
std::shared_ptr<T> CCObjectPool<T>::ConstructObject(Args &&... args) {
Head free_head;
if (unlikely(!FindFreeHead(&free_head))) {
return nullptr;
}
auto self = this->shared_from_this();
T *ptr = new (free_head.node) T(std::forward<Args>(args)...);
return std::shared_ptr<T>(ptr, [self](T *object) {
object->~T();
self->ReleaseObject(object);
});
}
// Study: When an object is released, push its node back onto the head of the free list
template <typename T>
void CCObjectPool<T>::ReleaseObject(T *object) {
Head new_head;
Node *node = reinterpret_cast<Node *>(object);
Head old_head = free_head_.load(std::memory_order_acquire);
do {
node->next = old_head.node;
new_head.node = node;
new_head.count = old_head.count + 1;
} while (!free_head_.compare_exchange_weak(old_head, new_head,
std::memory_order_acq_rel,
std::memory_order_acquire));
}
} // namespace base
} // namespace cyber
} // namespace apollo
#endif // CYBER_BASE_CONCURRENT_OBJECT_POOL_H_
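A usage sketch (Msg is a hypothetical payload type of mine):

```cpp
#include <memory>
#include "cyber/base/concurrent_object_pool.h"

struct Msg { int id = 0; };  // hypothetical payload type

void Demo() {
  // The pool must live in a shared_ptr because GetObject/ConstructObject
  // capture shared_from_this() in their deleters.
  auto pool = std::make_shared<apollo::cyber::base::CCObjectPool<Msg>>(16);
  auto msg = pool->ConstructObject();  // nullptr when the pool is exhausted
  if (msg) {
    msg->id = 1;
  }  // deleter runs ~Msg and pushes the node back onto the free list
}
```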
cyber/base/object_pool
The regular version of `concurrent_object_pool` without any concurrency guarantees.
Object construction and destruction inside the pool happen only in the pool's constructor and destructor.
This is quite different from `concurrent_object_pool`, which allows public calls that construct and release individual objects, and which does not run object destructors in its pool destructor.
Clearly the two classes assume different usage patterns and cannot easily replace each other.
cyber/base/reentrant_rw_lock
First, a definition: a reentrant RW lock is largely equivalent to an ordinary RW lock. The main difference is that a thread already holding the lock is not blocked when it tries to lock again, which saves context-switch cost.
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_REENTRANT_RW_LOCK_H_
#define CYBER_BASE_REENTRANT_RW_LOCK_H_
#include <stdint.h>
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>
#include "cyber/base/rw_lock_guard.h"
namespace apollo {
namespace cyber {
namespace base {
// Study: To be reentrant, the lock must know which thread currently owns it
static const std::thread::id NULL_THREAD_ID = std::thread::id();
class ReentrantRWLock {
friend class ReadLockGuard<ReentrantRWLock>;
friend class WriteLockGuard<ReentrantRWLock>;
public:
static const int32_t RW_LOCK_FREE = 0;
// Study: lock_num_ == WRITE_EXCLUSIVE (-1) means a writer holds the lock
static const int32_t WRITE_EXCLUSIVE = -1;
static const uint32_t MAX_RETRY_TIMES = 5;
static const std::thread::id null_thread;
ReentrantRWLock() {}
explicit ReentrantRWLock(bool write_first) : write_first_(write_first) {}
private:
// all these function only can used by ReadLockGuard/WriteLockGuard;
void ReadLock();
void WriteLock();
void ReadUnlock();
void WriteUnlock();
ReentrantRWLock(const ReentrantRWLock&) = delete;
ReentrantRWLock& operator=(const ReentrantRWLock&) = delete;
// Study: Records which thread holds the write lock
std::thread::id write_thread_id_ = {NULL_THREAD_ID};
std::atomic<uint32_t> write_lock_wait_num_ = {0};
// Study: Repeated locking is allowed, so lock_num_ counts the nested locks
std::atomic<int32_t> lock_num_ = {0};
bool write_first_ = true;
};
inline void ReentrantRWLock::ReadLock() {
// Study: Reentrant Check
if (write_thread_id_ == std::this_thread::get_id()) {
return;
}
uint32_t retry_times = 0;
int32_t lock_num = lock_num_.load(std::memory_order_acquire);
if (write_first_) {
do {
while (lock_num < RW_LOCK_FREE ||
write_lock_wait_num_.load(std::memory_order_acquire) > 0) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load(std::memory_order_acquire);
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acq_rel,
std::memory_order_relaxed));
} else {
do {
while (lock_num < RW_LOCK_FREE) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load(std::memory_order_acquire);
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acq_rel,
std::memory_order_relaxed));
}
}
inline void ReentrantRWLock::WriteLock() {
auto this_thread_id = std::this_thread::get_id();
// Study: Reentrant Check
if (write_thread_id_ == this_thread_id) {
lock_num_.fetch_sub(1);
return;
}
int32_t rw_lock_free = RW_LOCK_FREE;
uint32_t retry_times = 0;
write_lock_wait_num_.fetch_add(1);
while (!lock_num_.compare_exchange_weak(rw_lock_free, WRITE_EXCLUSIVE,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
// rw_lock_free will change after CAS fail, so init again
rw_lock_free = RW_LOCK_FREE;
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
}
write_thread_id_ = this_thread_id;
write_lock_wait_num_.fetch_sub(1);
}
// Study: Compared with the non-reentrant version, there is an extra owning-thread check
inline void ReentrantRWLock::ReadUnlock() {
if (write_thread_id_ == std::this_thread::get_id()) {
return;
}
lock_num_.fetch_sub(1);
}
// Study: Updating lock_num_ must also maintain write_thread_id_: the owner is cleared only when the outermost write lock is released
inline void ReentrantRWLock::WriteUnlock() {
if (lock_num_.fetch_add(1) == WRITE_EXCLUSIVE) {
write_thread_id_ = NULL_THREAD_ID;
}
}
} // namespace base
} // namespace cyber
} // namespace apollo
#endif // CYBER_BASE_REENTRANT_RW_LOCK_H_
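A sketch of why the reentrancy matters (my own illustration):

```cpp
#include "cyber/base/reentrant_rw_lock.h"

apollo::cyber::base::ReentrantRWLock lock;

void Inner() {
  // Same thread already holds the write lock: ReadLock() returns at once
  // instead of deadlocking as AtomicRWLock would.
  apollo::cyber::base::ReadLockGuard<apollo::cyber::base::ReentrantRWLock> r(lock);
}

void Outer() {
  apollo::cyber::base::WriteLockGuard<apollo::cyber::base::ReentrantRWLock> w(lock);
  Inner();
}
```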
cyber/base/signal
For the signal/slot concept, see Qt.
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_SIGNAL_H_
#define CYBER_BASE_SIGNAL_H_
#include <algorithm>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
namespace apollo {
namespace cyber {
namespace base {
// Study: Forward Declaration
template <typename... Args>
class Slot;
template <typename... Args>
class Connection;
template <typename... Args>
class Signal {
public:
using Callback = std::function<void(Args...)>;
using SlotPtr = std::shared_ptr<Slot<Args...>>;
using SlotList = std::list<SlotPtr>;
using ConnectionType = Connection<Args...>;
Signal() {}
virtual ~Signal() { DisconnectAllSlots(); }
// Study: Invoked when the signal is emitted
void operator()(Args... args) {
// Study: Copy the slots under the lock, then invoke them outside it, limiting the lock scope
SlotList local;
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto& slot : slots_) {
local.emplace_back(slot);
}
}
if (!local.empty()) {
for (auto& slot : local) {
(*slot)(args...);
}
}
ClearDisconnectedSlots();
}
// Study: The first step in using a signal:
// connect the signal to a slot
ConnectionType Connect(const Callback& cb) {
auto slot = std::make_shared<Slot<Args...>>(cb);
{
std::lock_guard<std::mutex> lock(mutex_);
slots_.emplace_back(slot);
}
return ConnectionType(slot, this);
}
bool Disconnect(const ConnectionType& conn) {
bool find = false;
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto& slot : slots_) {
if (conn.HasSlot(slot)) {
find = true;
slot->Disconnect();
}
}
}
if (find) {
ClearDisconnectedSlots();
}
return find;
}
void DisconnectAllSlots() {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& slot : slots_) {
slot->Disconnect();
}
slots_.clear();
}
private:
Signal(const Signal&) = delete;
Signal& operator=(const Signal&) = delete;
void ClearDisconnectedSlots() {
std::lock_guard<std::mutex> lock(mutex_);
slots_.erase(
std::remove_if(slots_.begin(), slots_.end(),
[](const SlotPtr& slot) { return !slot->connected(); }),
slots_.end());
}
SlotList slots_;
std::mutex mutex_;
};
// Study: Represents the connection between a signal and a slot
// and helps track its live connectivity
template <typename... Args>
class Connection {
public:
using SlotPtr = std::shared_ptr<Slot<Args...>>;
using SignalPtr = Signal<Args...>*;
Connection() : slot_(nullptr), signal_(nullptr) {}
Connection(const SlotPtr& slot, const SignalPtr& signal)
: slot_(slot), signal_(signal) {}
virtual ~Connection() {
slot_ = nullptr;
signal_ = nullptr;
}
Connection& operator=(const Connection& another) {
if (this != &another) {
this->slot_ = another.slot_;
this->signal_ = another.signal_;
}
return *this;
}
bool HasSlot(const SlotPtr& slot) const {
if (slot != nullptr && slot_ != nullptr) {
return slot_.get() == slot.get();
}
return false;
}
bool IsConnected() const {
if (slot_) {
return slot_->connected();
}
return false;
}
bool Disconnect() {
if (signal_ && slot_) {
return signal_->Disconnect(*this);
}
return false;
}
private:
SlotPtr slot_;
SignalPtr signal_;
};
template <typename... Args>
class Slot {
public:
using Callback = std::function<void(Args...)>;
Slot(const Slot& another)
: cb_(another.cb_), connected_(another.connected_) {}
explicit Slot(const Callback& cb, bool connected = true)
: cb_(cb), connected_(connected) {}
virtual ~Slot() {}
// Study: When the slot receives the signal, run the callback
void operator()(Args... args) {
if (connected_ && cb_) {
cb_(args...);
}
}
void Disconnect() { connected_ = false; }
bool connected() const { return connected_; }
private:
Callback cb_;
bool connected_ = true;
};
} // namespace base
} // namespace cyber
} // namespace apollo
#endif // CYBER_BASE_SIGNAL_H_
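A usage sketch of the three classes together (my own):

```cpp
#include <iostream>
#include "cyber/base/signal.h"

void Demo() {
  apollo::cyber::base::Signal<int> sig;
  auto conn = sig.Connect([](int v) { std::cout << v << std::endl; });
  sig(42);            // emit: every connected slot runs with 42
  conn.Disconnect();  // marks the slot disconnected and prunes it
  sig(43);            // prints nothing
}
```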
cyber/base/thread_pool
A thread pool built on top of bounded_queue; a task is a function together with its bound arguments.
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_BASE_THREAD_POOL_H_
#define CYBER_BASE_THREAD_POOL_H_
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
#include "cyber/base/bounded_queue.h"
namespace apollo {
namespace cyber {
namespace base {
class ThreadPool {
public:
explicit ThreadPool(std::size_t thread_num, std::size_t max_task_num = 1000);
// Study: F is the callable and Args its arguments;
// the result is returned wrapped in a std::future
template <typename F, typename... Args>
auto Enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
std::vector<std::thread> workers_;
BoundedQueue<std::function<void()>> task_queue_;
std::atomic_bool stop_;
};
inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num)
: stop_(false) {
if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
throw std::runtime_error("Task queue init failed.");
}
// Study: Spawn the worker threads; each loops, dequeuing and running tasks until stopped
for (size_t i = 0; i < threads; ++i) {
workers_.emplace_back([this] {
while (!stop_) {
std::function<void()> task;
if (task_queue_.WaitDequeue(&task)) {
task();
}
}
});
}
}
// before using the return value, you should check value.valid()
template <typename F, typename... Args>
auto ThreadPool::Enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
// don't allow enqueueing after stopping the pool
if (stop_) {
return std::future<return_type>();
}
task_queue_.Enqueue([task]() { (*task)(); });
return res;
};
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
if (stop_.exchange(true)) {
return;
}
task_queue_.BreakAllWait();
for (std::thread& worker : workers_) {
worker.join();
}
}
} // namespace base
} // namespace cyber
} // namespace apollo
#endif // CYBER_BASE_THREAD_POOL_H_
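A usage sketch (my own):

```cpp
#include "cyber/base/thread_pool.h"

void Demo() {
  apollo::cyber::base::ThreadPool pool(4 /* threads */);
  auto fut = pool.Enqueue([](int a, int b) { return a + b; }, 1, 2);
  // As the header notes, check valid(): Enqueue returns an empty future
  // when the pool has already been stopped.
  if (fut.valid()) {
    int sum = fut.get();  // sum == 3
    (void)sum;
  }
}
```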
cyber/base/thread_safe_queue
Just a queue plus a mutex. To also provide a waiting dequeue, it adds a condition_variable as well; a sketch of the technique follows.
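A minimal sketch of that technique (my own illustration, not Apollo's actual class):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// A plain queue guarded by a mutex, with a condition_variable
// to support a blocking dequeue.
template <typename T>
class SketchThreadSafeQueue {
 public:
  void Enqueue(const T& value) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(value);
    }
    cv_.notify_one();  // wake one waiter after releasing the lock
  }

  void WaitDequeue(T* value) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !queue_.empty(); });
    *value = queue_.front();
    queue_.pop();
  }

 private:
  std::queue<T> queue_;
  std::mutex mutex_;
  std::condition_variable cv_;
};
```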
cyber/base/unbounded_queue
The unbounded version of bounded_queue.
Note that the two implementations are completely different: for thread safety, unbounded_queue uses a linked list rather than a dynamic array.
In theory unbounded_queue performs worse than bounded_queue, and since it uses compare_exchange_strong throughout, the design was presumably not aimed at heavily contended scenarios. An illustrative fragment of the idiom follows.
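My own fragment illustrating that linked-list idiom (not the actual unbounded_queue code): enqueue by swinging an atomic tail pointer with compare_exchange_strong, then linking the old tail to the new node.

```cpp
#include <atomic>

struct Node {
  explicit Node(int v) : value(v) {}
  int value;
  std::atomic<Node*> next{nullptr};
};

std::atomic<Node*> tail;  // assume initialized with a dummy node

void Enqueue(int v) {
  Node* node = new Node(v);
  Node* old_tail = tail.load(std::memory_order_acquire);
  // strong CAS: no spurious failures, so every retry is a real conflict
  while (!tail.compare_exchange_strong(old_tail, node,
                                       std::memory_order_acq_rel)) {
    // old_tail was refreshed by the failed CAS; just retry
  }
  old_tail->next.store(node, std::memory_order_release);
}
```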