經常遇見根據關鍵字查找内容的應用如DNS查詢,标準庫的std::map系列可供選擇,但是它們是非線程安全的,一個線程安全的查找表實作如下,其主要是通過hash函數将各個key分散到具體的bucket中去,每個bucket帶有一個共享鎖boost::shared_mutex,進而實作線程安全的高并發資料結構:
#include <vector>
#include <memory>
#include <mutex>
#include <functional>
#include <list>
#include <utility>
#include <boost/thread/shared_mutex.hpp>
template<typename Key,typename Value,typename Hash=std::hash<Key> >
class threadsafe_lookup_table//隻針對每個bucket上鎖,而全局的vector不上鎖,進而降低了鎖的粒度。若使用map則是簡單的操作都需要對整個map上鎖
{
private:
class bucket_type//具體的桶bucket資料結構,帶有共享鎖boost::shared_mutex
{
private:
typedef std::pair<Key,Value> bucket_value;
typedef std::list<bucket_value> bucket_data;
typedef typename bucket_data::iterator bucket_iterator;
bucket_data data;
mutable boost::shared_mutex mutex;
bucket_iterator find_entry_for(Key const& key) const
{
return std::find_if(data.begin(),data.end(),
[&](bucket_value const& item)
{return item.first==key;});
}
public:
Value value_for(Key const& key,Value const& default_value) const
{
boost::shared_lock<boost::shared_mutex> lock(mutex);
bucket_iterator const found_entry=find_entry_for(key);
return (found_entry==data.end())?
default_value : found_entry->second;
}
void add_or_update_mapping(Key const& key,Value const& value)//異常安全的
{
std::unique_lock<boost::shared_mutex> lock(mutex);
bucket_iterator const found_entry=find_entry_for(key);
if(found_entry==data.end())
{
data.push_back(bucket_value(key,value));//若push_back抛出異常不會影響原來的值
}
else
{
found_entry->second=value;//指派抛出異常,原始值仍然為改變
}
}
void remove_mapping(Key const& key)
{
std::unique_lock<boost::shared_mutex> lock(mutex);
bucket_iterator const found_entry=find_entry_for(key);
if(found_entry!=data.end())
{
data.erase(found_entry);
}
}
};
std::vector<std::unique_ptr<bucket_type> > buckets;//vector裡有多個桶bucket,通過hash函數将key散列到一個具體的bucket中去
Hash hasher;
bucket_type& get_bucket(Key const& key) const//從vector找出key散列後對應的bucket
{
std::size_t const bucket_index=hasher(key)%buckets.size();
return *buckets[bucket_index];
}
public:
typedef Key key_type;
typedef Value mapped_type;
typedef Hash hash_type;
threadsafe_lookup_table(//構造函數
unsigned num_buckets=19, Hash const& hasher_=Hash()):
buckets(num_buckets),hasher(hasher_)
{
for(unsigned i=0;i<num_buckets;++i)
{
buckets[i].reset(new bucket_type);
}
}
threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;//為了簡化代碼禁止拷貝和指派
threadsafe_lookup_table& operator=(
threadsafe_lookup_table const& other)=delete;
Value value_for(Key const& key,Value const& default_value=Value()) const//value_for是lock free的,可以多個線程并發調用,不修改任何資料故異常安全
{
return get_bucket(key).value_for(key,default_value);
}
void add_or_update_mapping(Key const& key,Value const& value)//異常安全的,因為add_or_update_mapping是異常安全的
{
get_bucket(key).add_or_update_mapping(key,value);
}
void remove_mapping(Key const& key)
{
get_bucket(key).remove_mapping(key);//erase是異常安全的
}
};
這對連結清單實作一個線程安全的版本,連結清單中每個元素都持有一個mutex,進而對連結清單的每一個操作至多持有目前節點和下一節點的mutex,這樣鎖的粒度更細了提高了并發的性能:
#include <memory>
#include <mutex>
template<typename T>
class threadsafe_list
{
struct node//每個節點持有一個mutex
{
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node> next;
node():
next()
{}
node(T const& value):
data(std::make_shared<T>(value))
{}
};
node head;
public:
threadsafe_list()
{}
~threadsafe_list()
{
remove_if([](T const&){return true;});
}
threadsafe_list(threadsafe_list const& other)=delete;
threadsafe_list& operator=(threadsafe_list const& other)=delete;
void push_front(T const& value)//從頭部插入一個節點隻需要鎖住head
{
std::unique_ptr<node> new_node(new node(value));//在臨界區外new,這樣既可以減小臨界區又可以避免臨界區中抛出異常
std::lock_guard<std::mutex> lk(head.m);
new_node->next=std::move(head.next);//unique_ptr不能直接指派,但可以通過reset或move
head.next=std::move(new_node);
}
template<typename Function>
void for_each(Function f)//針對連結清單中每個元素執行f
{
node* current=&head;
std::unique_lock<std::mutex> lk(head.m);
while(node* const next=current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);//鎖住目前節點後,立即釋放上一個節點
lk.unlock();//
f(*next->data);
current=next;
lk=std::move(next_lk);//向後移動,unique_lock is moveable not copyable,而lock_guard不具備移動語義,可見unique_lock比lock_guard靈活
}
}
template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)//找到連結清單中事謂詞P傳回true的第一個元素
{
node* current=&head;
std::unique_lock<std::mutex> lk(head.m);
while(node* const next=current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();//拿到目前元素的鎖後立即釋放上一個鎖
if(p(*next->data))//謂詞P傳回true,那麼傳回該元素
{
return next->data;
}
current=next;
lk=std::move(next_lk);
}
return std::shared_ptr<T>();
}
template<typename Predicate>
void remove_if(Predicate p)//删除哪些使得謂詞P傳回true的元素
{
node* current=&head;
std::unique_lock<std::mutex> lk(head.m);
while(node* const next=current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if(p(*next->data))
{
std::unique_ptr<node> old_next=std::move(current->next);
current->next=std::move(next->next);//重置連接配接
next_lk.unlock();//注意這裡并沒有對lk解鎖或者重置
}
else
{
lk.unlock();
current=next;
lk=std::move(next_lk);
}
}
}
};