天天看點

boost之無鎖隊列使用實例

#ifndef SEARCH_ENGINE_H
#define SEARCH_ENGINE_H

#include "boost_comm.h"
#include "message_header.h"

namespace tspace
{
  /// Singleton that queues add-feature / start-task / stop-task / get-result
  /// requests for worker tasks and stores per-task search results.
  ///
  /// All mt_* entry points are static and operate on the shared instance().
  class search_engine
  {
  public:
    search_engine();
    ~search_engine();
  public:
    /// Returns the address of the single shared instance (function-local static).
    static search_engine *instance()
    {
      static search_engine ins_;
      return &ins_;
    }
  public:

    /// Writes the add-feature, start-task and get-result counters to cout
    /// (space separated, no trailing newline).
    static void print()
    {
      cout << "search_engine:" << instance()->m_add_feature_counts_ << " "
        << instance()->m_start_task_counts_ << " "
        << instance()->m_get_result_counts_ << " ";
    }
  public:
    // Producers: enqueue a request and post the matching *_run worker task.
    // Each returns the result of the underlying queue push.
    static bool mt_push_add_featrue(add_feature *src);
    static bool mt_push_start_task(start_task *src);
    static bool mt_push_stop_task(stop_task *src);
    static bool mt_push_get_result(get_result *src);

    // Counters maintained by the mt_push_* / mt_pop_* functions; the lock-free
    // queues themselves expose no element count.
    boost::atomic<int> m_add_feature_counts_;
    boost::atomic<int> m_total_feature_counts_ ;
    boost::atomic<int> m_start_task_counts_;
    boost::atomic<int> m_stop_task_counts_;
    boost::atomic<int> m_get_result_counts_;

    // Queue mutexes: mtx1_ -> add_feature_, mtx2_ -> start_task_,
    // mtx3_ -> get_result_, mtx4_ -> stop_task_.
    boost::mutex mtx1_;
    boost::mutex mtx2_;
    boost::mutex mtx3_;
    boost::mutex mtx4_;
    // Consumers: dequeue one request into ctx; return false when nothing was popped.
    static bool mt_pop_add_feature(ptr_add_feature &ctx);
    static bool mt_pop_start_task(ptr_start_task &ctx);
    // Tasks currently being searched, keyed by task id; guarded by map_lock_.
    boost::mutex map_lock_;
    std::map<std::string, start_task *> m_curr_task_list;
    static bool mt_pop_stop_task(ptr_stop_task &ctx);
    static bool mt_pop_get_result(ptr_get_result &ctx);
  protected:
    // With STD_CPP11 defined the spsc_queue variant is used, otherwise
    // lockfree::queue. Capacities are supplied by the constructor.
#ifdef STD_CPP11
    boost::lockfree::spsc_queue<ptr_add_feature> add_feature_;
    boost::lockfree::spsc_queue<ptr_start_task> start_task_;
    boost::lockfree::spsc_queue<ptr_stop_task> stop_task_;
    boost::lockfree::spsc_queue<ptr_get_result> get_result_;
#else
    boost::lockfree::queue<ptr_add_feature> add_feature_;
    boost::lockfree::queue<ptr_start_task> start_task_;
    // Element type must be ptr_stop_task: this queue is fed by
    // mt_push_stop_task(stop_task*) and drained into a ptr_stop_task&.
    boost::lockfree::queue<ptr_stop_task> stop_task_;
    boost::lockfree::queue<ptr_get_result> get_result_;
#endif
  public:
    // Guards taskid_result_.
    boost::mutex mtx_;
    /// Adds one result row to the result set of taskid.
    static int mt_push_task_result(const string taskid, result_data *data, int max = 100);
    /// Writes the result rows of taskid to the detail database and drops them.
    static int mt_commit_task_result(const string &taskid);
    /// Serializes the result rows of taskid as a JSON string into json.
    static int mt_pop_task_result(const std::string taskid, std::string &json, const int size);
  protected:
    // Result sets collected so far, keyed by task id; guarded by mtx_.
    std::map<std::string, ptr_ret_result> taskid_result_;
    typedef std::map<std::string, ptr_ret_result>::iterator taskid_result_itr;
    //boost::circular_buffer<flow_in_window_t> flows_in_windows_;
    friend class file_system;
    friend class picture_search;
  };
}

#endif // SEARCH_ENGINE_H      
#include "search_engine.h"

#include "thread_pool.h"
#include "picture_search.h"

#include "file_system.h"
#include "api_database.h"
#include "tcp_client.h"

#include "object_pool.h"
#include "session_manager.h"

#include "config.h"
#include "base64.h"

namespace tspace
{
  /// Creates the engine with zeroed counters and fixed-capacity request queues
  /// (2048 add-feature slots, 1024 for each of the other queues).
  ///
  /// Initializers are listed in member declaration order, which is the order
  /// the compiler actually uses, and every counter is given an explicit start
  /// value (m_stop_task_counts_ was previously left out).
  search_engine::search_engine()
    : m_add_feature_counts_(0),
    m_total_feature_counts_(0),
    m_start_task_counts_(0),
    m_stop_task_counts_(0),
    m_get_result_counts_(0),
    add_feature_(2048),
    start_task_(1024),
    stop_task_(1024),
    get_result_(1024)
  {
  }

  /// Empty destructor: performs no explicit cleanup.
  search_engine::~search_engine() {}

  // Only referenced by the commented-out lock inside add_feature_run.
  boost::mutex mtx_1_;

  /// Worker task posted by mt_push_add_featrue: pops one add-feature request,
  /// stores its first binary payload through file_system::fstore_file and
  /// releases the request.
  ///
  /// Exceptions are caught and logged; nothing propagates to the caller.
  void add_feature_run()
  {
    try
    {
      //boost::unique_lock<boost::mutex> wlock(mtx_1_);
      add_feature *p = NULL;
      const bool ret = search_engine::mt_pop_add_feature(p);
      if (ret && p && p->container_bin_.size() > 0)
      {
        bin_value *ptr_bin = p->container_bin_.front();
        //file_system::fstore_file(p->car_id_, p->json_basic_, (char *)p->buffer_ctx_, p->buffer_size_);
        file_system::fstore_file(p->car_id_, p->json_basic_, (char *)ptr_bin->bin_value_, ptr_bin->bin_value_len_);
        buffer_pool::bin_value_free(p->buffer_ctx_);
        delete ptr_bin;
        delete p;
      }
      else
      {
        std::ostringstream oslog;
        oslog << "add_feature_run failed ret:" << ret;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
        // A request that was popped but carries no payload has left the queue
        // and has no other owner, so it must be released here (no-op when p is
        // NULL). NOTE(review): p->buffer_ctx_ is not returned to the pool on
        // this path because its state is unknown here -- confirm with producer.
        delete p;
      }
    }
    catch (...)
    {
      std::ostringstream oslog;
      oslog << "add_feature_run throw exception" ;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
  }

  boost::mutex mtx_2_;
  void start_task_run()
  {
    try
    {
      //boost::unique_lock<boost::mutex> wlock(mtx_2_);
      start_task *p = NULL;
      bool ret = search_engine::mt_pop_start_task(p);
      if (ret && p && p->container_bin_.size() > 0)
      {
        //task start
        bin_value *ptr_bin = p->container_bin_.front();
        search_engine::instance()->map_lock_.lock();
        if(search_engine::instance()->m_curr_task_list.find(p->task_id_)==search_engine::instance()->m_curr_task_list.end())
        {
          auto itr = search_engine::instance()->m_curr_task_list.insert(make_pair<>(p->task_id_, p));
          search_engine::instance()->map_lock_.unlock();
          file_system::search_file(p->task_id_, p->json_condition_, p->json_detail_, 
            (char *)ptr_bin->bin_value_, ptr_bin->bin_value_len_);
          search_engine::instance()->map_lock_.lock();
          if(search_engine::instance()->m_curr_task_list.find(p->task_id_) !=search_engine::instance()->m_curr_task_list.end())
            search_engine::instance()->m_curr_task_list.erase(itr.first);
          search_engine::instance()->map_lock_.unlock();
        }
        else 
        {
          std::ostringstream oslog;
          oslog <<"taskID "<<p->task_id_<<" is exist ,start task is failed";
          LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
          search_engine::instance()->map_lock_.unlock();
        }
        delete ptr_bin;
        delete p;
        //task end
      }
    }
    catch (...)
    {
      std::ostringstream oslog;
      oslog << "start_task_run throw exception" ;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
    
  }

  boost::mutex mtx_3_;
  void stop_task_run()
  {
    try
    {
      //boost::unique_lock<boost::mutex> wlock(mtx_3_);
      stop_task *p = NULL;
      bool ret = search_engine::mt_pop_stop_task(p);
      if (ret && p)
      {
        search_engine::instance()->map_lock_.lock();
        auto itr = search_engine::instance()->m_curr_task_list.find(p->task_id_);
        if (itr != search_engine::instance()->m_curr_task_list.end())
        {
          itr->second->task_state_ = task_sigal_interrupt;
          search_engine::instance()->m_curr_task_list.erase(itr);
        }
        search_engine::instance()->map_lock_.unlock();

        delete p;
      }
    }
    catch (...)
    {
      std::ostringstream oslog;
      oslog << "stop_task_run throw exception" ;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }

  }

  // Only referenced by the commented-out lock inside get_result_run.
  boost::mutex mtx_4_;

  /// Worker task posted by mt_push_get_result: pops one get-result request,
  /// serializes the task's results with mt_pop_task_result and, when the
  /// request carries a user context, sends the response through it (the
  /// context is invoked as a tcp_client).
  ///
  /// Exceptions are caught and logged; nothing propagates to the caller.
  void get_result_run()
  {
    try
    {
      //boost::unique_lock<boost::mutex> wlock(mtx_4_);
      get_result *p = NULL;
      const bool ret = search_engine::mt_pop_get_result(p);
      if (ret && p)
      {
        std::string str_json;
        search_engine::mt_pop_task_result(p->task_id_, str_json, 100);
        if (p->user_ctx_)
        {
          unsigned char *buff = buffer_pool::bin_value_alloc();
          if (buff)
          {
            int len = BUFFER_POOL_BIN_CHUNK_SIZE;
            const bool built = protocol_functions::construct_response((char *)buff, len, str_json.c_str());
            if (!built)
            {
              // Surface the failure instead of silently dropping the result.
              // NOTE(review): the buffer is still sent as before because the
              // failure contract of construct_response (state of buff/len) is
              // not visible here -- confirm whether sending should be skipped.
              std::ostringstream oslog;
              oslog << "get_result_run construct_response failed taskid:" << p->task_id_;
              LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
            }
            (*((tcp_client *)p->user_ctx_))((char *)buff, len);
            buffer_pool::bin_value_free(buff);
          }
          else
          {
            // No buffer available: nothing can be built or sent.
            std::ostringstream oslog;
            oslog << "get_result_run bin_value_alloc failed taskid:" << p->task_id_;
            LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
          }
        }
        delete p;
      }
    }
    catch (...)
    {
      std::ostringstream oslog;
      oslog << "get_result_run throw exception" ;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
  }
#ifdef ENABLE_BOOST_THREAD
  /// Enqueues an add-feature request under mtx1_.
  ///
  /// On success the pending counter is bumped, add_feature_run is posted to
  /// the thread pool and a progress line is logged whenever the previous value
  /// of the running total satisfies (value % 400 == 1). On failure the current
  /// pending count is logged.
  ///
  /// @param src request to enqueue
  /// @return result of the queue push
  bool search_engine::mt_push_add_featrue(add_feature *src)
  {
    search_engine *const self = instance();
    boost::unique_lock<boost::mutex> guard(self->mtx1_);

    if (!self->add_feature_.push(src))
    {
      std::ostringstream msg;
      msg << "search_engine add_feature max:" << self->m_add_feature_counts_ ;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), msg.str());
      return false;
    }

    self->m_add_feature_counts_++;
    thread_pool::post_task(add_feature_run);
    if (self->m_total_feature_counts_++ % 400 == 1)
    {
      std::ostringstream msg;
      msg << "add_feature:" << self->m_add_feature_counts_ << " total_feature_counts_:" << self->m_total_feature_counts_ << LOG_END;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), msg.str());
    }
    return true;
  }

  /// Dequeues one add-feature request under mtx1_ and, when an element was
  /// obtained, decrements the pending counter.
  ///
  /// @param ctx receives the dequeued request
  /// @return result of the queue pop
  bool search_engine::mt_pop_add_feature(ptr_add_feature &ctx)
  {
    search_engine *const self = instance();
    boost::unique_lock<boost::mutex> guard(self->mtx1_);

    if (!self->add_feature_.pop(ctx))
      return false;

    self->m_add_feature_counts_--;
    return true;
  }

  /// Enqueues a start-task request under mtx2_.
  ///
  /// On success the pending counter is bumped and start_task_run is posted to
  /// the thread pool; on failure the current pending count is logged.
  ///
  /// @param src request to enqueue
  /// @return result of the queue push
  bool search_engine::mt_push_start_task(start_task *src)
  {
    search_engine *const self = instance();
    boost::unique_lock<boost::mutex> guard(self->mtx2_);

    if (!self->start_task_.push(src))
    {
      std::ostringstream msg;
      msg << "search_engine start_task max:" << self->m_start_task_counts_ << LOG_END;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), msg.str());
      return false;
    }

    self->m_start_task_counts_++;
    thread_pool::post_task(start_task_run);
    return true;
  }
  /// Enqueues a stop-task request under mtx4_.
  ///
  /// On success the stop counter is bumped and stop_task_run is posted to the
  /// thread pool; on failure the current counter value is logged.
  ///
  /// @param src request to enqueue
  /// @return result of the queue push
  bool search_engine::mt_push_stop_task(stop_task * src)
  {
    search_engine *const self = instance();
    boost::unique_lock<boost::mutex> guard(self->mtx4_);

    if (!self->stop_task_.push(src))
    {
      std::ostringstream msg;
      msg << "search_engine stop_task max:" << self->m_stop_task_counts_ ;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), msg.str());
      return false;
    }

    self->m_stop_task_counts_++;
    thread_pool::post_task(stop_task_run);
    return true;
  }
  /// Dequeues one start-task request under mtx2_ and, when an element was
  /// obtained, decrements the pending counter.
  ///
  /// @param ctx receives the dequeued request
  /// @return result of the queue pop
  bool search_engine::mt_pop_start_task(ptr_start_task &ctx)
  {
    search_engine *const self = instance();
    boost::unique_lock<boost::mutex> guard(self->mtx2_);

    if (!self->start_task_.pop(ctx))
      return false;

    self->m_start_task_counts_--;
    return true;
  }
  /// Dequeues one stop-task request under mtx4_.
  ///
  /// A successful pop decrements m_stop_task_counts_, mirroring the increment
  /// done by mt_push_stop_task, so the counter tracks the number of queued
  /// requests (it was previously incremented here as well, so it only grew).
  ///
  /// @param ctx receives the dequeued request
  /// @return result of the queue pop
  bool search_engine::mt_pop_stop_task(ptr_stop_task & ctx)
  {
    boost::unique_lock<boost::mutex> wlock(instance()->mtx4_);
    const bool ret = instance()->stop_task_.pop(ctx);
    if (ret)
    {
      instance()->m_stop_task_counts_--;
    }
    return ret;
  }
  /// Enqueues a get-result request under mtx3_.
  ///
  /// On success the pending counter is bumped and get_result_run is posted to
  /// the thread pool; on failure the current counter value is logged.
  ///
  /// @param src request to enqueue
  /// @return result of the queue push
  bool search_engine::mt_push_get_result(get_result *src)
  {
    search_engine *const self = instance();
    boost::unique_lock<boost::mutex> guard(self->mtx3_);

    if (!self->get_result_.push(src))
    {
      std::ostringstream msg;
      msg << "search_engine get_result max:" << self->m_get_result_counts_ ;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), msg.str());
      return false;
    }

    self->m_get_result_counts_++;
    thread_pool::post_task(get_result_run);
    return true;
  }
  /// Dequeues one get-result request under mtx3_.
  ///
  /// A successful pop decrements m_get_result_counts_, mirroring the increment
  /// done by mt_push_get_result, so the counter tracks the number of queued
  /// requests (it was previously incremented here as well, so it only grew).
  ///
  /// @param ctx receives the dequeued request
  /// @return result of the queue pop
  bool search_engine::mt_pop_get_result(ptr_get_result &ctx)
  {
    boost::unique_lock<boost::mutex> wlock(instance()->mtx3_);
    const bool ret = instance()->get_result_.pop(ctx);
    if (ret)
    {
      instance()->m_get_result_counts_--;
    }
    return ret;
  }

  int search_engine::mt_push_task_result(const string taskid, result_data *data, int max/*=100*/)
  {
    bool ret = false;
    try
    {
      boost::unique_lock<boost::mutex> wlock(instance()->mtx_);
      //LOG(INFO)<< "push task result taskid:" << taskid <<LOG_END;
      taskid_result_itr itr = instance()->taskid_result_.find(taskid);
      if (itr != instance()->taskid_result_.end())
      {
        if (itr->second->ret_result_.size() < 100)
        {
          itr->second->ret_result_.insert(data);
          //LOG(INF) <<"data:"<<data<<" size:" << itr->second->ret_result_.size() << endl;
        }
        else
        {
            result_data *b_data=*(itr->second->ret_result_.begin());
          if(data->matchingScore_>b_data->matchingScore_){
            itr->second->ret_result_.insert(data);
            object_pool::free_data(b_data);
            itr->second->ret_result_.erase(itr->second->ret_result_.begin());
          }
          else
          {
            object_pool::free_data(data);
          }
        }
      }
      else
      {
        ret_result *ptr_res = object_pool::alloc_result("0", "success");
        ptr_res->ret_result_.insert(data);
        ret = instance()->taskid_result_.insert(make_pair<>(taskid, ptr_res)).second;
      }
    }
    catch (...)
    {
      std::ostringstream oslog;
      oslog << "mt_push_task_result excp" ;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
    
    return ret;
  }

  /// Writes every collected result row of taskid to picsearch_fstore_detail in
  /// one multi-row INSERT, then releases the rows and the task's result set
  /// (all under mtx_). Does nothing when the task has no result set.
  ///
  /// @param taskid task whose results are committed
  /// @return always 0 (the function previously fell off the end without
  ///         returning a value, which is undefined behaviour for an int function)
  int search_engine::mt_commit_task_result(const string &taskid)
  {
    boost::unique_lock<boost::mutex> wlock(instance()->mtx_);
    taskid_result_itr itr_result = instance()->taskid_result_.find(taskid);
    if (itr_result == instance()->taskid_result_.end())
    {
      return 0;
    }

    // NOTE(review): the statement is assembled by string concatenation from
    // taskid and row fields; if any of them can carry untrusted text this is
    // open to SQL injection. Switch to a parameterized insert if the database
    // wrapper offers one.
    std::ostringstream ostrstream;
    ostrstream<<"insert into picsearch_fstore_detail values";
    int count = 0;
    for (std::multiset<ptr_result_data, result_data_cmp>::iterator itr = itr_result->second->ret_result_.begin();
      itr != itr_result->second->ret_result_.end(); ++itr)
    {
      // Column order: taskId, objectId, type, matchingScore, snapShotTime, searchType
      if (count++ > 0)
        ostrstream<<",";
      ostrstream<<"("<<taskid<<","<<(*itr)->objectId_<<","<<(*itr)->numType_ << "," <<(*itr)->matchingScore_<<","<<(*itr)->snapshotTime_<<",0)";
      object_pool::free_data(*itr);
    }
    // All rows were handed back to the pool above; drop the stale pointers.
    itr_result->second->ret_result_.clear();

    // With no rows the statement would end at "values", which is not valid
    // SQL, so only execute when at least one tuple was appended.
    if (count > 0)
    {
      const std::string strSql = ostrstream.str();
      file_system::instance()->task_detail_db_->excute_insert(strSql.c_str());
    }

    object_pool::free_result(itr_result->second);
    instance()->taskid_result_.erase(itr_result);
    return 0;
  }

  int search_engine::mt_pop_task_result(const std::string taskid, std::string &str_json, const int size)
  {
    boost::unique_lock<boost::mutex> wlock(instance()->mtx_);
    std::ostringstream oslog;
    oslog << "pop task result taskid:" << taskid ;
    LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());
    taskid_result_itr itr = instance()->taskid_result_.find(taskid);
    if (itr != instance()->taskid_result_.end())
    {
      ptr_ret_result ret_res = itr->second;
      //assert(ptr_res != NULL);
      static Json::Value root;
      root.clear();
      static Json::Value arrayObj;
      arrayObj.clear();
      Json::FastWriter writer;
      //construct json string acc size
      std::multiset<ptr_result_data, result_data_cmp>::iterator itr = ret_res->ret_result_.begin();
          
      for (; itr !=ret_res->ret_result_.end();itr++)
      {
        result_data *p = NULL;
        p = *itr;
        
        if (!p)
        {
          break;
        }
        Json::FastWriter writer;

        static Json::Value subdetail;
        subdetail.clear();
        subdetail["objectId"] = p->objectId_.c_str();
        subdetail["type"] = p->numType_.c_str();
        subdetail["matchingScore"] = p->matchingScore_;
        subdetail["snapshotTime"] = p->snapshotTime_;

        string detailString = writer.write(subdetail);

        arrayObj.append(subdetail);

      }
      root["returnCode"] = 0;
      root["msg"] = "success";
      root["data"] = arrayObj;
      str_json = writer.write(root);
      std::ostringstream oslog;
      oslog << "task result:" << str_json << LOG_END;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());

    }
    else
    {
      std::ostringstream oslog;
      oslog << "can't find taskid:" << taskid ;
      LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
      //<!--如果任務已經結束,傳回returnCode = 2,data : [] -->
      str_json = "{\"returnCode\":2,\"msg\":\"can't find taskid\",\"data\":[]}";
    }
    return 0;
  }
#endif
}      

需要說明的是:spsc_queue 只支援一個生産者、一個消費者的情況,lockfree::queue 則支援多個生産者、多個消費者的情況。定義的無鎖隊列必須在類的構造函數中初始化並指定固定大小,否則程式會崩潰。無鎖隊列中沒有統計元素個數的方法,可以用一個外部變量在 push 和 pop 的地方做統計。