天天看點

淺談c++中的開源架構類目--分享二(架構)

  這個系列Lee哥會分享一些c++中的開源架構。

  主要從TCP協定棧、架構、并發性、資料庫、國際化、壓縮、日志、多媒體庫、序列化、XML庫、腳本、Json庫、數學庫、安全、WEB應用架構、網絡庫、異步事件等方面來分享。分享的内容不僅限于自己來寫,有會借鑒網上大牛們的一些文章與大家分享。

  之前我們講了TCP,今天我們來講一講架構。這裡Lee哥把架構的技術分成4個子產品。

  1.facebook開發和使用的開源c++庫folly

  2.高性能網絡應用程式c++11 evented IO LibSourcey

  3.企業應用程式開發架構libphenom

  4.通用C++庫的集合Boost

  對于TCP協定棧想了解的朋友可以看一下

  bilibili/video/av57632138

  今天分享的是:boost asio 實作一個TCP服務端線程池

  原文轉載于blog.csdn/hust_wangyajun/article/details/52571752

  tcp的伺服器端綁定并監聽端口,如果用戶端比較少,可以對每個用戶端建立一個線程進行通訊處理,但當用戶端的數量比較龐大的時候這種思路就變得不可行,一方面線程切換的開銷太大,另一方面,多數線程并不出于“工作”狀态,

賣二手手機靓号平台

長期出于等待事件的狀态。這時,可以使用線程池的架構加快處理速度。

  廢話少說,直接上代碼

  #include

  using boost::asio::ip::tcp;

  class handler_allocator

  : private boost::noncopyable

  {

  public:

  handler_allocator()

  : in_use_(false)

  }

  void* allocate(std::size_t size)

  if (!in_use_ && size < storage_.size)

  in_use_=true;

  return storage_.address();

  else

  return ::operator new(size);

  void deallocate(void* pointer)

  if (pointer==storage_.address())

  in_use_=false;

  ::operator delete(pointer);

  private:

  // Storage space used for handler-based custom memory allocation.

  boost::aligned_storage<1024> storage_;

  // Whether the handler-based custom allocation storage has been used.

  bool in_use_;

  };

  template

  class custom_alloc_handler

  custom_alloc_handler(handler_allocator& a, Handler h)

  : allocator_(a),

  handler_(h)

  void operator()(Arg1 arg1)

  handler_(arg1);

  void operator()(Arg1 arg1, Arg2 arg2)

  handler_(arg1, arg2);

  friend void* asio_handler_allocate(std::size_t size,

  custom_alloc_handler* this_handler)

  return this_handler->allocator_.allocate(size);

  friend void asio_handler_deallocate(void pointer, std::size_t /size*/,

  this_handler->allocator_.deallocate(pointer);

  handler_allocator& allocator_;

  Handler handler_;

  // Helper function to wrap a handler object to add custom allocation.

  inline custom_alloc_handler make_custom_alloc_handler(

  handler_allocator& a, Handler h)

  return custom_alloc_handler(a, h);

  /// A pool of io_service objects.

  class io_service_pool

  /// Construct the io_service pool.

  explicit io_service_pool(std::size_t pool_size) : next_io_service_(0)

  if (pool_size==0)

  throw std::runtime_error("io_service_pool size is 0");

  // Give all the io_services work to do so that their run() functions will not

  // exit until they are explicitly stopped.

  for (std::size_t i=0; i < pool_size; ++i)

  io_service_ptr io_service(new boost::asio::io_service);

  work_ptr work(new boost::asio::io_service::work(*io_service));

  io_services_.push_back(io_service);

  work_.push_back(work);

  // Run all io_service objects in the pool.

  void run()

  // Create a pool of threads to run all of the io_services.

  std::vector > threads;

  for (std::size_t i=0; i < io_services_.size(); ++i)

  boost::shared_ptr thread(new boost::thread(

  boost::bind(&boost::asio::io_service::run, io_services_[i])));

  threads.push_back(thread);

  // Wait for all threads in the pool to exit.

  for (std::size_t i=0; i < threads.size(); ++i)

  threads[i]->join();

  // Stop all io_service objects in the pool.

  void stop()

  // Explicitly stop all io_services.

  io_services_[i]->stop();

  // Get an io_service to use.

  boost::asio::io_service& get_io_service()

  // Use a round-robin scheme to choose the next io_service to use.

  boost::asio::io_service& io_service=*io_services_[next_io_service_];

  ++next_io_service_;

  if (next_io_service_==io_services_.size())

  next_io_service_=0;

  return io_service;

  typedef boost::shared_ptr io_service_ptr;

  typedef boost::shared_ptr work_ptr;

  /// The pool of io_services.

  std::vector io_services_;

  /// The work that keeps the io_services running.

  std::vector work_;

  /// The next io_service to use for a connection.

  std::size_t next_io_service_;

  class session

  : public boost::enable_shared_from_this

  session(boost::asio::io_service& work_service

  , boost::asio::io_service& io_service)

  : socket_(io_service)

  , io_work_service(work_service)

  tcp::socket& socket()

  return socket_;

  void start()

  boost::system::error_code error;

  handle_write(error);

  socket_.async_read_some(boost::asio::buffer(data_),

  make_custom_alloc_handler(allocator_,

  boost::bind(&session::handle_read,

  shared_from_this(),

  boost::asio::placeholders::error,

  boost::asio::placeholders::bytes_transferred)));

  void handle_read(const boost::system::error_code& error,

  size_t bytes_transferred)

  if (!error)

  boost::shared_ptr > buf(new std::vector);

  buf->resize(bytes_transferred);

  std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin());

  io_work_service.post(boost::bind(&session::on_receive

  , shared_from_this(), buf, bytes_transferred));

  void handle_write(const boost::system::error_code& error)

  char notice[]="Welcome to Connect to Hiper Service";

  size_t num;

  try

  num=socket_.send(boost::asio::buffer(notice));

  catch(std::exception &e)

  std::cout<<"exception: "<

  if(num>0)

  std::cout<<"send : "<< notice << std::endl;

  void on_receive(boost::shared_ptr > buffers

  , size_t bytes_transferred)

  char data_stream=&(buffers->begin());

  // in here finish the work.

  std::cout << "receive :" << bytes_transferred << " bytes." <<

  "message :" << data_stream << std::endl;

  // The io_service used to finish the work.

  boost::asio::io_service& io_work_service;

  // The socket used to communicate with the client.

  tcp::socket socket_;

  // Buffer used to store data received from the client.

  boost::array data_;

  // The allocator to use for handler-based custom memory allocation.

  handler_allocator allocator_;

  typedef boost::shared_ptr session_ptr;

  class server

  server(short port, std::size_t io_service_pool_size)

  : io_service_pool_(io_service_pool_size)

  , io_service_work_pool_(io_service_pool_size)

  , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))

  session_ptr new_session(new session(io_service_work_pool_.get_io_service()

  , io_service_pool_.get_io_service()));

  acceptor_.async_accept(new_session->socket(),

  boost::bind(&server::handle_accept, this, new_session,

  boost::asio::placeholders::error));

  void handle_accept(session_ptr new_session,

  const boost::system::error_code& error)

  new_session->start();

  new_session.reset(new session(io_service_work_pool_.get_io_service()

  io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run

  , &io_service_pool_)));

  work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run

  , &io_service_work_pool_)));

  io_service_pool_.stop();

  io_service_work_pool_.stop();

  io_thread_->join();

  work_thread_->join();

  boost::shared_ptr io_thread_;

  boost::shared_ptr work_thread_;

  io_service_pool io_service_pool_;

  io_service_pool io_service_work_pool_;

  tcp::acceptor acceptor_;