天天看點

c++基于ThreadPool實作靈活的異步任務

在工作中有時會有這種需求,在延時中執行一些任務,等待任務逾時或者任務傳回結果再往下執行。如果不做封裝,可能會怎麼做?每次都進入while true?
......
auto start = std::chrono::system_clock::now();
auto timeout = 500;
while (true){
      auto now  = std::chrono::system_clock::now();
      auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(now- start );
      if (_duration > timeout){
        LOGGING_ERROR("duration is %d", timeout.count());
        break;
      }
      //do something

      std::this_thread::sleep_for(std::chrono::milliseconds(10));
}      

或許也能完成目的,但是寫法很不優雅。且程式執行到此處進入while(true)隻能等在這裡了,其他啥活都幹不了。

可以怎麼優化?可以把這部分任務放入線程中異步執行。 

// 耗時操作
auto fetchDataFromDB = [](std::string recvdData,std::function<int(std::string &)> cback) {
    // Make sure that function takes 5 seconds to complete
    std::this_thread::sleep_for(seconds(5));
    //Do stuff like creating DB Connection and fetching Data
    if(cback != nullptr){
      std::string out = "this is from callback ";
      cback(out);
    }
    return "DB_" + recvdData;
  };
 
//把fetchDataFromDB這一IO耗時任務放到線程裡異步執行
//
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data0",
      [&](std::string &result){
      std::cout << "callback result from thread:" << result << std::endl;
      return 0;
      });  


// do otherthings 可以做些其他工作
......
// 等待異步的結果
std::string result = resultFromDB.get(); // waits for fetchDataFromDB to return      

然而每次都要建立線程損耗也不小,可以基于線程池進一步改造。

me::ThreadPool pool(4);
......
//把fetchDataFromDB這一IO耗時操作放到pool中
pool.commit(fetchDataFromDB,"Data1",[&](std::string &result){
      std::cout << "callback result from pool thread:" << result << std::endl;
      return 0;
      });      

然而,這種還是不夠靈活。比如不能靈活控制耗時任務的逾時時間和檢測頻率。

//!
  //! \brief 任務運作函數
  //!
  //! \param f 嘀嗒時期調用的函數,如果使用 lambda 表達式則會自動比對該函數
  //! \param duration 逾時時間
  //! \param timer_duration 嘀嗒周期
  //! \return
  //!
  taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)      
...... 
 auto f = [&]
  {
    return checkIO(errorCode);
  };
 
 // f的執行頻率為30毫秒執行一次,逾時時間5秒鐘
 auto future = misc::TaskRunner::getInstance()->taskRunner(f,std::chrono::seconds(5),30);

 misc::TaskRunner::getInstance()->waitForResult(future);      
#define KeepRunning (0U)
#define StopRunning (1U)

#include <ThreadPool.h>

namespace misc
{
//!
//! \brief 任務運作器
//!
class TaskRunner
{
public:
  //!
  //! \brief 單例模型
  //! \return
  //!
  static TaskRunner *getInstance()
  {
    static TaskRunner w;
    return &w;
  }
  ~TaskRunner() {  }

public:
  using taskFunction = std::function<int()>;
  using taskResult_t = std::shared_future<int>;
  //!
  //! \brief 任務運作函數
  //!
  //! 如果運作逾時,則傳回失敗代碼(未啟用逾時功能)
  //!
  //! \param f 嘀嗒時期調用的函數,如果使用 std::bind 函數則會自動比對該函數
  //! \param duration 逾時時間
  //! \return
  //!
  taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration)
  {
    return start(f, duration);
  }
  //!
  //! \brief 任務運作函數
  //!
  //! 如果運作逾時,則傳回失敗代碼(未啟用逾時功能)
  //!
  //! \param f 嘀嗒時期調用的函數,如果使用 lambda 表達式則會自動比對該函數
  //! \param duration 逾時時間
  //! \return
  //!
  taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration)
  {
    return start(f, duration);
  }
  //!
  //! \brief 任務運作函數
  //!
  //! 如果運作逾時,則傳回失敗代碼(未啟用逾時功能)
  //!
  //! \param f 嘀嗒時期調用的函數,如果使用 std::bind 函數則會自動比對該函數
  //! \param duration 逾時時間
  //! \param timer_duration 嘀嗒周期,因為預設的嘀嗒時期為
  //! 20um,如果想要自主決定,則使用該函數 \return
  //!
  taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration, int timer_duration)
  {
    return start(f, duration, timer_duration);
  }
  //!
  //! \brief 任務運作函數
  //!
  //! 如果運作逾時,則傳回失敗代碼(未啟用逾時功能)
  //!
  //! \param f 嘀嗒時期調用的函數,如果使用 lambda 表達式則會自動比對該函數
  //! \param duration 逾時時間
  //! \param timer_duration 嘀嗒周期,因為預設的嘀嗒時期為
  //! 20um,如果想要自主決定,則使用該函數 \return
  //!
  taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)
  {
    return start(f, duration, timer_duration);
  }


  int waitForResult(const taskResult_t &result)
  {
    if (result.valid())
    {
      result.wait();
    }

    return 1;
  }

private:
#define NOW std::chrono::system_clock::now()

  TaskRunner() : pool_(2) {}

  std::shared_future<int> start(const taskFunction &f, const std::chrono::steady_clock::duration &duration,
                                uint64_t interval = 1)
  {
    auto taskResultPtr = std::make_shared<resultMap_t>();
    taskResultPtr->isUsed_ = false;
    taskResultPtr->id_ = idFactory_++;
    auto id = taskResultPtr->id_;

    taskResultPtr->taskResult_ = pool_.enqueue([id, f, duration, interval, this] {
      auto func = f;
      return taskRunner(id, func, NOW, duration, interval);
    });

    return taskResultPtr->taskResult_;
  }

  int taskRunner(uint32_t id, const taskFunction &f, const std::chrono::system_clock::time_point &now,
                 const std::chrono::steady_clock::duration &timeout /* millisecond */,
                 uint32_t interval /* millisecond */)
  {
    while (true)
    {
      auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(NOW - now);
      if (_duration > timeout)
      {
        LOGGING_ERROR("duration is %d", timeout.count());
        return 1;
      }

      if (f() == StopRunning)
      {
        LOGGING_WARN("function is running out.");
        break;
      }

      std::this_thread::sleep_for(std::chrono::milliseconds(interval));
    }
    return 0;
  }

  void waitAll()
  {
      pool_.Wait();
  }

private:
  ThreadPool pool_;

  struct resultMap_t
  {
    unsigned int id_;
    bool isUsed_;
    taskResult_t taskResult_;
  };
  using resultMapPtr_t = std::shared_ptr<resultMap_t>;

  std::atomic_uint idFactory_ = ATOMIC_VAR_INIT(1);
};
}  // namespace misc