#ifndef SEARCH_ENGINE_H
#define SEARCH_ENGINE_H
#include "boost_comm.h"
#include "message_header.h"
namespace tspace
{
class search_engine
{
public:
search_engine();
~search_engine();
public:
static search_engine *instance()
{
static search_engine ins_;
return &ins_;
}
public:
static void print()
{
cout << "search_engine:" << instance()->m_add_feature_counts_ << " "
<< instance()->m_start_task_counts_ << " "
<< instance()->m_get_result_counts_ << " ";
}
public:
static bool mt_push_add_featrue(add_feature *src);
static bool mt_push_start_task(start_task *src);
static bool mt_push_stop_task(stop_task *src);
static bool mt_push_get_result(get_result *src);
boost::atomic<int> m_add_feature_counts_;
boost::atomic<int> m_total_feature_counts_ ;
boost::atomic<int> m_start_task_counts_;
boost::atomic<int> m_stop_task_counts_;
boost::atomic<int> m_get_result_counts_;
boost::mutex mtx1_;
boost::mutex mtx2_;
boost::mutex mtx3_;
boost::mutex mtx4_;
static bool mt_pop_add_feature(ptr_add_feature &ctx);
static bool mt_pop_start_task(ptr_start_task &ctx);
boost::mutex map_lock_;
std::map<std::string, start_task *> m_curr_task_list;
static bool mt_pop_stop_task(ptr_stop_task &ctx);
static bool mt_pop_get_result(ptr_get_result &ctx);
protected:
//std=c++11,spsc_queue
#ifdef STD_CPP11
boost::lockfree::spsc_queue<ptr_add_feature> add_feature_;
boost::lockfree::spsc_queue<ptr_start_task> start_task_;
boost::lockfree::spsc_queue<ptr_stop_task> stop_task_;
boost::lockfree::spsc_queue<ptr_get_result> get_result_;
#else
boost::lockfree::queue<ptr_add_feature> add_feature_;
boost::lockfree::queue<ptr_start_task> start_task_;
boost::lockfree::queue<ptr_start_task> stop_task_;
boost::lockfree::queue<ptr_get_result> get_result_;
#endif
public:
boost::mutex mtx_;
static int mt_push_task_result(const string taskid, result_data *data, int max = 100);
static int mt_commit_task_result(const string &taskid);
static int mt_pop_task_result(const std::string taskid, std::string &json, const int size);
protected:
std::map<std::string, ptr_ret_result> taskid_result_;
typedef std::map<std::string, ptr_ret_result>::iterator taskid_result_itr;
//boost::circular_buffer<flow_in_window_t> flows_in_windows_;
friend class file_system;
friend class picture_search;
};
}
#endif // SEARCH_ENGINE_H
#include "search_engine.h"
#include "thread_pool.h"
#include "picture_search.h"
#include "file_system.h"
#include "api_database.h"
#include "tcp_client.h"
#include "object_pool.h"
#include "session_manager.h"
#include "config.h"
#include "base64.h"
namespace tspace
{
search_engine::search_engine()
:add_feature_(2048),
start_task_(1024),
get_result_(1024),
stop_task_(1024),
m_add_feature_counts_(0),
m_start_task_counts_(0),
m_get_result_counts_(0),
m_total_feature_counts_(0)
{
}
search_engine::~search_engine()
{
}
boost::mutex mtx_1_;
void add_feature_run()
{
try
{
//boost::unique_lock<boost::mutex> wlock(mtx_1_);
add_feature *p = NULL;
bool ret = search_engine::mt_pop_add_feature(p);
if (ret && p && p->container_bin_.size() > 0)
{
bin_value *ptr_bin = p->container_bin_.front();
//file_system::fstore_file(p->car_id_, p->json_basic_, (char *)p->buffer_ctx_, p->buffer_size_);
file_system::fstore_file(p->car_id_, p->json_basic_, (char *)ptr_bin->bin_value_, ptr_bin->bin_value_len_);
buffer_pool::bin_value_free(p->buffer_ctx_);
delete ptr_bin;
delete p;
}
else
{
std::ostringstream oslog;
oslog << "add_feature_run failed ret:" << ret;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
}
catch (...)
{
std::ostringstream oslog;
oslog << "add_feature_run throw exception" ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
}
boost::mutex mtx_2_;
void start_task_run()
{
try
{
//boost::unique_lock<boost::mutex> wlock(mtx_2_);
start_task *p = NULL;
bool ret = search_engine::mt_pop_start_task(p);
if (ret && p && p->container_bin_.size() > 0)
{
//task start
bin_value *ptr_bin = p->container_bin_.front();
search_engine::instance()->map_lock_.lock();
if(search_engine::instance()->m_curr_task_list.find(p->task_id_)==search_engine::instance()->m_curr_task_list.end())
{
auto itr = search_engine::instance()->m_curr_task_list.insert(make_pair<>(p->task_id_, p));
search_engine::instance()->map_lock_.unlock();
file_system::search_file(p->task_id_, p->json_condition_, p->json_detail_,
(char *)ptr_bin->bin_value_, ptr_bin->bin_value_len_);
search_engine::instance()->map_lock_.lock();
if(search_engine::instance()->m_curr_task_list.find(p->task_id_) !=search_engine::instance()->m_curr_task_list.end())
search_engine::instance()->m_curr_task_list.erase(itr.first);
search_engine::instance()->map_lock_.unlock();
}
else
{
std::ostringstream oslog;
oslog <<"taskID "<<p->task_id_<<" is exist ,start task is failed";
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
search_engine::instance()->map_lock_.unlock();
}
delete ptr_bin;
delete p;
//task end
}
}
catch (...)
{
std::ostringstream oslog;
oslog << "start_task_run throw exception" ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
}
boost::mutex mtx_3_;
void stop_task_run()
{
try
{
//boost::unique_lock<boost::mutex> wlock(mtx_3_);
stop_task *p = NULL;
bool ret = search_engine::mt_pop_stop_task(p);
if (ret && p)
{
search_engine::instance()->map_lock_.lock();
auto itr = search_engine::instance()->m_curr_task_list.find(p->task_id_);
if (itr != search_engine::instance()->m_curr_task_list.end())
{
itr->second->task_state_ = task_sigal_interrupt;
search_engine::instance()->m_curr_task_list.erase(itr);
}
search_engine::instance()->map_lock_.unlock();
delete p;
}
}
catch (...)
{
std::ostringstream oslog;
oslog << "stop_task_run throw exception" ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
}
boost::mutex mtx_4_;
void get_result_run()
{
try
{
//boost::unique_lock<boost::mutex> wlock(mtx_4_);
get_result *p = NULL;
bool ret = search_engine::mt_pop_get_result(p);
if (ret && p)
{
std::string str_json;
search_engine::mt_pop_task_result(p->task_id_, str_json, 100);
if (p->user_ctx_)
{
unsigned char *buff = buffer_pool::bin_value_alloc();
int len = BUFFER_POOL_BIN_CHUNK_SIZE;
bool ret = protocol_functions::construct_response((char *)buff, len, str_json.c_str());
(*((tcp_client *)p->user_ctx_))((char *)buff, len);
buffer_pool::bin_value_free(buff);
}
delete p;
}
}
catch (...)
{
std::ostringstream oslog;
oslog << "get_result_run throw exception" ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
}
#ifdef ENABLE_BOOST_THREAD
bool search_engine::mt_push_add_featrue(add_feature *src)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx1_);
bool ret = instance()->add_feature_.push(src);
if (ret)
{
instance()->m_add_feature_counts_++;
thread_pool::post_task(add_feature_run);
if (instance()->m_total_feature_counts_++ % 400 == 1)
{
std::ostringstream oslog;
oslog << "add_feature:" << instance()->m_add_feature_counts_ << " total_feature_counts_:" << instance()->m_total_feature_counts_ << LOG_END;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());
}
}
else
{
std::ostringstream oslog;
oslog << "search_engine add_feature max:" << instance()->m_add_feature_counts_ ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
return ret;
}
bool search_engine::mt_pop_add_feature(ptr_add_feature &ctx)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx1_);
bool ret = instance()->add_feature_.pop(ctx);
if (ret)
{
instance()->m_add_feature_counts_--;
}
return ret;
}
bool search_engine::mt_push_start_task(start_task *src)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx2_);
bool ret = instance()->start_task_.push(src);
if (ret)
{
instance()->m_start_task_counts_++;
thread_pool::post_task(start_task_run);
}
else
{
std::ostringstream oslog;
oslog << "search_engine start_task max:" << instance()->m_start_task_counts_ << LOG_END;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
return ret;
}
bool search_engine::mt_push_stop_task(stop_task * src)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx4_);
bool ret = instance()->stop_task_.push(src);
if (ret)
{
instance()->m_stop_task_counts_++;
thread_pool::post_task(stop_task_run);
}
else
{
std::ostringstream oslog;
oslog << "search_engine stop_task max:" << instance()->m_stop_task_counts_ ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
return ret;
}
bool search_engine::mt_pop_start_task(ptr_start_task &ctx)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx2_);
bool ret = instance()->start_task_.pop(ctx);
if (ret)
{
instance()->m_start_task_counts_--;
}
return ret;
}
bool search_engine::mt_pop_stop_task(ptr_stop_task & ctx)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx4_);
bool ret = instance()->stop_task_.pop(ctx);
if (ret)
{
instance()->m_stop_task_counts_++;
}
return ret;
}
bool search_engine::mt_push_get_result(get_result *src)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx3_);
bool ret = instance()->get_result_.push(src);
if (ret)
{
instance()->m_get_result_counts_++;
thread_pool::post_task(get_result_run);
}
else
{
std::ostringstream oslog;
oslog << "search_engine get_result max:" << instance()->m_get_result_counts_ ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
return ret;
}
bool search_engine::mt_pop_get_result(ptr_get_result &ctx)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx3_);
bool ret = instance()->get_result_.pop(ctx);
if (ret)
{
instance()->m_get_result_counts_++;
}
return ret;
}
int search_engine::mt_push_task_result(const string taskid, result_data *data, int max/*=100*/)
{
bool ret = false;
try
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx_);
//LOG(INFO)<< "push task result taskid:" << taskid <<LOG_END;
taskid_result_itr itr = instance()->taskid_result_.find(taskid);
if (itr != instance()->taskid_result_.end())
{
if (itr->second->ret_result_.size() < 100)
{
itr->second->ret_result_.insert(data);
//LOG(INF) <<"data:"<<data<<" size:" << itr->second->ret_result_.size() << endl;
}
else
{
result_data *b_data=*(itr->second->ret_result_.begin());
if(data->matchingScore_>b_data->matchingScore_){
itr->second->ret_result_.insert(data);
object_pool::free_data(b_data);
itr->second->ret_result_.erase(itr->second->ret_result_.begin());
}
else
{
object_pool::free_data(data);
}
}
}
else
{
ret_result *ptr_res = object_pool::alloc_result("0", "success");
ptr_res->ret_result_.insert(data);
ret = instance()->taskid_result_.insert(make_pair<>(taskid, ptr_res)).second;
}
}
catch (...)
{
std::ostringstream oslog;
oslog << "mt_push_task_result excp" ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
}
return ret;
}
int search_engine::mt_commit_task_result(const string &taskid)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx_);
taskid_result_itr itr_result = instance()->taskid_result_.find(taskid);
std::string strSql = "";
std::ostringstream ostrstream;
if (itr_result != instance()->taskid_result_.end())
{
//char db_buff[1024];
ostrstream<<"insert into picsearch_fstore_detail values";
int count =0;
for (std::multiset<ptr_result_data, result_data_cmp>::iterator itr = itr_result->second->ret_result_.begin();
itr != itr_result->second->ret_result_.end(); )
{
string carPlateNumber = (*itr)->carPlateNumber_;
//char outbuf[1024];
// int outlen = 1024;
#ifdef _WIN32
#else
//extern int code_convert(char *from_charset, char *to_charset, char *inbuf, int inlen, char *outbuf, int outlen);
//code_convert("gb2312", "utf-8", (char *)carPlateNumber.c_str(), carPlateNumber.size(), outbuf, outlen);
#endif
//taskId,carId,matchingScore,snapShotTime,carPlateNumber,searchType
if(count++>0)
ostrstream<<",";
ostrstream<<"("<<taskid<<","<<(*itr)->objectId_<<","<<(*itr)->numType_ << "," <<(*itr)->matchingScore_<<","<<(*itr)->snapshotTime_<<",0)";
/*snprintf(db_buff, 1024, "insert into picsearch_fstore_detail values(%s,%s,%3.3f,%s,'%s',%d)",
taskid.c_str(), (*itr)->carId_.c_str(),(*itr)->matchingScore_, (*itr)->snapshotTime_.c_str(),
carPlateNumber.c_str(), 0);*/
object_pool::free_data(*itr);
itr_result->second->ret_result_.erase(itr++);
}
strSql = ostrstream.str();
file_system::instance()->task_detail_db_->excute_insert(strSql.c_str());
object_pool::free_result(itr_result->second);
instance()->taskid_result_.erase(itr_result);
}
}
int search_engine::mt_pop_task_result(const std::string taskid, std::string &str_json, const int size)
{
boost::unique_lock<boost::mutex> wlock(instance()->mtx_);
std::ostringstream oslog;
oslog << "pop task result taskid:" << taskid ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());
taskid_result_itr itr = instance()->taskid_result_.find(taskid);
if (itr != instance()->taskid_result_.end())
{
ptr_ret_result ret_res = itr->second;
//assert(ptr_res != NULL);
static Json::Value root;
root.clear();
static Json::Value arrayObj;
arrayObj.clear();
Json::FastWriter writer;
//construct json string acc size
std::multiset<ptr_result_data, result_data_cmp>::iterator itr = ret_res->ret_result_.begin();
for (; itr !=ret_res->ret_result_.end();itr++)
{
result_data *p = NULL;
p = *itr;
if (!p)
{
break;
}
Json::FastWriter writer;
static Json::Value subdetail;
subdetail.clear();
subdetail["objectId"] = p->objectId_.c_str();
subdetail["type"] = p->numType_.c_str();
subdetail["matchingScore"] = p->matchingScore_;
subdetail["snapshotTime"] = p->snapshotTime_;
string detailString = writer.write(subdetail);
arrayObj.append(subdetail);
}
root["returnCode"] = 0;
root["msg"] = "success";
root["data"] = arrayObj;
str_json = writer.write(root);
std::ostringstream oslog;
oslog << "task result:" << str_json << LOG_END;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());
}
else
{
std::ostringstream oslog;
oslog << "can't find taskid:" << taskid ;
LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
//<!--如果任務已經結束,傳回returnCode = 2,data : [] -->
str_json = "{\"returnCode\":2,\"msg\":\"can't find taskid\",\"data\":[]}";
}
return 0;
}
#endif
}
需要說明的是spsc_queue支援一個生産者一個消費者的情況,lockfree::queue支援多個生産者,多個消費者的情況,定義的無鎖隊列在類的構造函數中初始化指定固定大小,否則程式會崩潰,無鎖隊列中沒有統計元素個數的方法,可以在用一個外部變量在push和pop的地方做統計。