在工作中有時會有這種需求,在延時中執行一些任務,等待任務逾時或者任務傳回結果再往下執行。如果不做封裝,可能會怎麼做?每次都進入while true?
......
auto start = std::chrono::system_clock::now();
auto timeout = 500;
while (true){
auto now = std::chrono::system_clock::now();
auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(now- start );
if (_duration > timeout){
LOGGING_ERROR("duration is %d", timeout.count());
break;
}
//do something
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
或許也能完成目的,但是寫法很不優雅。且程式執行到此處進入while(true)隻能等在這裡了,其他啥活都幹不了。
可以怎麼優化?可以把這部分任務放入線程中異步執行。
// 耗時操作
auto fetchDataFromDB = [](std::string recvdData,std::function<int(std::string &)> cback) {
// Make sure that function takes 5 seconds to complete
std::this_thread::sleep_for(seconds(5));
//Do stuff like creating DB Connection and fetching Data
if(cback != nullptr){
std::string out = "this is from callback ";
cback(out);
}
return "DB_" + recvdData;
};
//把fetchDataFromDB這一IO耗時任務放到線程裡異步執行
//
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data0",
[&](std::string &result){
std::cout << "callback result from thread:" << result << std::endl;
return 0;
});
// do otherthings 可以做些其他工作
......
// 等待異步的結果
std::string result = resultFromDB.get(); // waits for fetchDataFromDB to return
然而每次都要建立線程損耗也不小,可以基于線程池進一步改造。
me::ThreadPool pool(4);
......
//把fetchDataFromDB這一IO耗時操作放到pool中
pool.commit(fetchDataFromDB,"Data1",[&](std::string &result){
std::cout << "callback result from pool thread:" << result << std::endl;
return 0;
});
然而,這種還是不夠靈活。比如不能靈活控制耗時任務的逾時時間和檢測頻率。
//!
//! \brief 任務運作函數
//!
//! \param f 嘀嗒時期調用的函數,如果使用 lambda 表達式則會自動比對該函數
//! \param duration 逾時時間
//! \param timer_duration 嘀嗒周期
//! \return
//!
taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)
......
auto f = [&]
{
return checkIO(errorCode);
};
// f的執行頻率為30毫秒執行一次,逾時時間5秒鐘
auto future = misc::TaskRunner::getInstance()->taskRunner(f,std::chrono::seconds(5),30);
misc::TaskRunner::getInstance()->waitForResult(future);
#define KeepRunning (0U)
#define StopRunning (1U)
#include <ThreadPool.h>
namespace misc
{
//!
//! \brief 任務運作器
//!
class TaskRunner
{
public:
//!
//! \brief 單例模型
//! \return
//!
static TaskRunner *getInstance()
{
static TaskRunner w;
return &w;
}
~TaskRunner() { }
public:
using taskFunction = std::function<int()>;
using taskResult_t = std::shared_future<int>;
//!
//! \brief 任務運作函數
//!
//! 如果運作逾時,則傳回失敗代碼(未啟用逾時功能)
//!
//! \param f 嘀嗒時期調用的函數,如果使用 std::bind 函數則會自動比對該函數
//! \param duration 逾時時間
//! \return
//!
taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration)
{
return start(f, duration);
}
//!
//! \brief 任務運作函數
//!
//! 如果運作逾時,則傳回失敗代碼(未啟用逾時功能)
//!
//! \param f 嘀嗒時期調用的函數,如果使用 lambda 表達式則會自動比對該函數
//! \param duration 逾時時間
//! \return
//!
taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration)
{
return start(f, duration);
}
//!
//! \brief 任務運作函數
//!
//! 如果運作逾時,則傳回失敗代碼(未啟用逾時功能)
//!
//! \param f 嘀嗒時期調用的函數,如果使用 std::bind 函數則會自動比對該函數
//! \param duration 逾時時間
//! \param timer_duration 嘀嗒周期,因為預設的嘀嗒時期為
//! 20um,如果想要自主決定,則使用該函數 \return
//!
taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration, int timer_duration)
{
return start(f, duration, timer_duration);
}
//!
//! \brief 任務運作函數
//!
//! 如果運作逾時,則傳回失敗代碼(未啟用逾時功能)
//!
//! \param f 嘀嗒時期調用的函數,如果使用 lambda 表達式則會自動比對該函數
//! \param duration 逾時時間
//! \param timer_duration 嘀嗒周期,因為預設的嘀嗒時期為
//! 20um,如果想要自主決定,則使用該函數 \return
//!
taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)
{
return start(f, duration, timer_duration);
}
int waitForResult(const taskResult_t &result)
{
if (result.valid())
{
result.wait();
}
return 1;
}
private:
#define NOW std::chrono::system_clock::now()
TaskRunner() : pool_(2) {}
std::shared_future<int> start(const taskFunction &f, const std::chrono::steady_clock::duration &duration,
uint64_t interval = 1)
{
auto taskResultPtr = std::make_shared<resultMap_t>();
taskResultPtr->isUsed_ = false;
taskResultPtr->id_ = idFactory_++;
auto id = taskResultPtr->id_;
taskResultPtr->taskResult_ = pool_.enqueue([id, f, duration, interval, this] {
auto func = f;
return taskRunner(id, func, NOW, duration, interval);
});
return taskResultPtr->taskResult_;
}
int taskRunner(uint32_t id, const taskFunction &f, const std::chrono::system_clock::time_point &now,
const std::chrono::steady_clock::duration &timeout /* millisecond */,
uint32_t interval /* millisecond */)
{
while (true)
{
auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(NOW - now);
if (_duration > timeout)
{
LOGGING_ERROR("duration is %d", timeout.count());
return 1;
}
if (f() == StopRunning)
{
LOGGING_WARN("function is running out.");
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
return 0;
}
void waitAll()
{
pool_.Wait();
}
private:
ThreadPool pool_;
struct resultMap_t
{
unsigned int id_;
bool isUsed_;
taskResult_t taskResult_;
};
using resultMapPtr_t = std::shared_ptr<resultMap_t>;
std::atomic_uint idFactory_ = ATOMIC_VAR_INIT(1);
};
} // namespace misc