天天看點

C++并發實戰18: 線程安全的查找表和連結清單

       經常遇見根據關鍵字查找内容的應用如DNS查詢,标準庫的std::map系列可供選擇,但是它們是非線程安全的,一個線程安全的查找表實作如下,其主要是通過hash函數将各個key分散到具體的bucket中去,每個bucket帶有一個共享鎖boost::shared_mutex,進而實作線程安全的高并發資料結構:

#include <vector>
#include <memory>
#include <mutex>
#include <functional>
#include <list>
#include <utility>
#include <boost/thread/shared_mutex.hpp>

template<typename Key,typename Value,typename Hash=std::hash<Key> >
class threadsafe_lookup_table//隻針對每個bucket上鎖,而全局的vector不上鎖,進而降低了鎖的粒度。若使用map則是簡單的操作都需要對整個map上鎖
{
private:
    class bucket_type//具體的桶bucket資料結構,帶有共享鎖boost::shared_mutex
    {
    private:
        typedef std::pair<Key,Value> bucket_value;
        typedef std::list<bucket_value> bucket_data;
        typedef typename bucket_data::iterator bucket_iterator;

        bucket_data data;
        mutable boost::shared_mutex mutex;

        bucket_iterator find_entry_for(Key const& key) const
        {
            return std::find_if(data.begin(),data.end(),
                [&](bucket_value const& item)
                {return item.first==key;});
        }
    public:
        Value value_for(Key const& key,Value const& default_value) const
        {
            boost::shared_lock<boost::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            return (found_entry==data.end())?
                default_value : found_entry->second;
        }

        void add_or_update_mapping(Key const& key,Value const& value)//異常安全的
        {
            std::unique_lock<boost::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            if(found_entry==data.end())
            {
                data.push_back(bucket_value(key,value));//若push_back抛出異常不會影響原來的值
            }
            else
            {
                found_entry->second=value;//指派抛出異常,原始值仍然為改變
            }
        }
    
        void remove_mapping(Key const& key)
        {
            std::unique_lock<boost::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            if(found_entry!=data.end())
            {
                data.erase(found_entry);
            }
        }
    };
    
    std::vector<std::unique_ptr<bucket_type> > buckets;//vector裡有多個桶bucket,通過hash函數将key散列到一個具體的bucket中去
    Hash hasher;

    bucket_type& get_bucket(Key const& key) const//從vector找出key散列後對應的bucket
    {
        std::size_t const bucket_index=hasher(key)%buckets.size();
        return *buckets[bucket_index];
    }

public:
    typedef Key key_type;
    typedef Value mapped_type;
    typedef Hash hash_type;
    
    threadsafe_lookup_table(//構造函數
        unsigned num_buckets=19, Hash const& hasher_=Hash()):
        buckets(num_buckets),hasher(hasher_)
    {
        for(unsigned i=0;i<num_buckets;++i)
        {
            buckets[i].reset(new bucket_type);
        }
    }

    threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;//為了簡化代碼禁止拷貝和指派
    threadsafe_lookup_table& operator=(
        threadsafe_lookup_table const& other)=delete;
    
    Value value_for(Key const& key,Value const& default_value=Value()) const//value_for是lock free的,可以多個線程并發調用,不修改任何資料故異常安全
    {
        return get_bucket(key).value_for(key,default_value);
    }
    
    void add_or_update_mapping(Key const& key,Value const& value)//異常安全的,因為add_or_update_mapping是異常安全的
    {
        get_bucket(key).add_or_update_mapping(key,value);
    }
    
    void remove_mapping(Key const& key)
    {
        get_bucket(key).remove_mapping(key);//erase是異常安全的
    }
};
           

    這對連結清單實作一個線程安全的版本,連結清單中每個元素都持有一個mutex,進而對連結清單的每一個操作至多持有目前節點和下一節點的mutex,這樣鎖的粒度更細了提高了并發的性能:

#include <memory>
#include <mutex>

template<typename T>
class threadsafe_list
{
    struct node//每個節點持有一個mutex
    {
        std::mutex m;
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;

        node():
            next()
        {}
        
        node(T const& value):
            data(std::make_shared<T>(value))
        {}
    };
    
    node head;

public:
    threadsafe_list()
    {}

    ~threadsafe_list()
    {
        remove_if([](T const&){return true;});
    }

    threadsafe_list(threadsafe_list const& other)=delete;
    threadsafe_list& operator=(threadsafe_list const& other)=delete;
    
    void push_front(T const& value)//從頭部插入一個節點隻需要鎖住head
    {
        std::unique_ptr<node> new_node(new node(value));//在臨界區外new,這樣既可以減小臨界區又可以避免臨界區中抛出異常
        std::lock_guard<std::mutex> lk(head.m);
        new_node->next=std::move(head.next);//unique_ptr不能直接指派,但可以通過reset或move
        head.next=std::move(new_node);
    }

    template<typename Function>
    void for_each(Function f)//針對連結清單中每個元素執行f
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);//鎖住目前節點後,立即釋放上一個節點
            lk.unlock();//
            f(*next->data);
            current=next;
            lk=std::move(next_lk);//向後移動,unique_lock is moveable not copyable,而lock_guard不具備移動語義,可見unique_lock比lock_guard靈活
        }
    }

    template<typename Predicate>
    std::shared_ptr<T> find_first_if(Predicate p)//找到連結清單中事謂詞P傳回true的第一個元素
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();//拿到目前元素的鎖後立即釋放上一個鎖
            if(p(*next->data))//謂詞P傳回true,那麼傳回該元素
            {
                return next->data;
            }
            current=next;
            lk=std::move(next_lk);
        }
        return std::shared_ptr<T>();
    }

    template<typename Predicate>
    void remove_if(Predicate p)//删除哪些使得謂詞P傳回true的元素
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            if(p(*next->data))
            {
                std::unique_ptr<node> old_next=std::move(current->next);
                current->next=std::move(next->next);//重置連接配接
                next_lk.unlock();//注意這裡并沒有對lk解鎖或者重置
            }
            else
            {
                lk.unlock();
                current=next;
                lk=std::move(next_lk);
            }
        }
    }
};