天天看點

ACE高效PROACTOR程式設計架構一ClientHandle

1、WIN32下面用proactor可以達到幾乎RAW IOCP的效率,由于封裝關系,應該是差那麼一點。

用戶端處理類的正常寫法:

//處理用戶端連接配接消息

class ClientHandler : public ACE_Service_Handler

{

public:

 /**構造函數

  *

  */

 ClientHandler(unsigned int client_recv_buf_size=SERVER_CLIENT_RECEIVE_BUF_SIZE)

  :_read_msg_block(client_recv_buf_size),_io_count(0)

 {

 }

 ~ClientHandler(){}

 /**

  *初始化,因為可能要用到ClientHandler記憶體池,而這個池又不一定要用NEW

 void init();

 /**清理函數,因為可能要用到記憶體池

 void fini();

//檢查是否逾時的函數

 void check_time_out(time_t cur_time);

 /**用戶端連接配接伺服器成功後調用

  * \param handle 套接字句柄

  * \param &message_block 第一次讀到的資料(未用)

//由Acceptor來調用!!!

 virtual void open (ACE_HANDLE handle,ACE_Message_Block &message_block);

 /**處理網絡讀操作結束消息

  * \param &result 讀操作結果

 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);

 /**處理網絡寫操作結束消息

  * \param &result 寫操作結果

 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);

private:

//**生成一個網絡讀請求

  * \param void 

  * \return 0-成功,-1失敗

 int  initiate_read_stream  (void);

 /**生成一個寫請求

  * \param mb 待發送的資料

  * \param nBytes 待發送資料大小

  * \return 0-成功,-1失敗

 int  initiate_write_stream (ACE_Message_Block & mb, size_t nBytes );

  * \return 檢查是否可以删除,用的是一個引用計數。每一個外出IO的時候+1,每一個IO成功後-1

 int check_destroy();

 //異步讀

 ACE_Asynch_Read_Stream _rs;

 //異步寫

 ACE_Asynch_Write_Stream _ws;

 //接收緩沖區隻要一個就夠了,因為壓根就沒想過要多讀,我直到現在也不是很清楚為什麼要多讀,多讀的話要考慮很多問題

 ACE_Message_Block _read_msg_block;

 //套接字句柄,這個可以不要了,因為基類就有個HANDLER在裡面的。

 //ACE_HANDLE _handle;

 //一個鎖,用戶端反正有東東要鎖的,注意,要用ACE_Recursive_Thread_Mutex而不是用ACE_Thread_Mutex,這裡面是可以重入的,而且在WIN32下是直接的EnterCriticalSection,可以達到很高的效率

 ACE_Recursive_Thread_Mutex _lock;

 //在外IO數量,其實就是引用計數啦,沒啥的。為0的時候就把這個東東關掉啦。

 long _io_count;

//檢查逾時用的,過段時間沒東東就CLOSE他了。

 time_t _last_net_io;

//本來想用另外一種模型的,隻用1個或者2個外出讀,後來想想,反正一般記憶體都是足夠的,就不管了。

 //ACE_Message_Block _send_msg_blocks[2];

 //ACE_Message_Block &_sending_msg_block;

 //ACE_Message_Block &_idle_msg_block;

//TODO:move to prriva and use friend class!!!

//隻是為了效率更高,不用STL的LIST是因為到現在我沒有可用的Node_Allocator,是以效率上會有問題。

 ClientHandler *_next;

 ClientHandler *next(){return _next;}

 void next(ClientHandler *obj){_next=obj;}

};

//這是具體實作,有些地方比較亂,懶得管了,鎖的有些地方不對。懶得改了,反正在出錯或者有瓶頸的時候再做也不遲。

void ClientHandler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)

 _last_net_io=ACE_OS::time(NULL);

 int byterecved=result.bytes_transferred ();

 if ( (result.success ()) && (byterecved != 0))

  //ACE_DEBUG ((LM_DEBUG,  "Receiver completed:%d\n",byterecved));

//處理完資料

  if(handle_received_data()==true)

  {

   //ACE_DEBUG ((LM_DEBUG,  "go on reading...\n"));

//把東東推到頭部,處理粘包

   _read_msg_block.crunch();

   initiate_read_stream();

  }

//這個地方不想用ACE_Atom_op,因為反正要有一個鎖,而且一般都會用鎖,不管了。假如不在意的話,應該直接用ACE_Atom_Op以達到最好的效率

  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);

  _io_count--;

 check_destroy ();

}

void ClientHandler::init()

//初始化資料,并不在構造函數裡做。

 _read_msg_block.rd_ptr(_read_msg_block.base());

 _read_msg_block.wr_ptr(_read_msg_block.base());

 this->handle(ACE_INVALID_HANDLE);

bool ClientHandler::handle_received_data()

...........自己處理

 return true;

//==================================================================

void ClientHandler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)

 //發送成功,RELEASE掉

 //這個不可能有多個RELEASE,直接XX掉

 //result.message_block ().release ();

 MsgBlockManager::get_instance().release_msg_block(&result.message_block());

//bool ClientHandler::destroy () 

//{

// FUNC_ENTER;

// ClientManager::get_instance().release_client_handle(this);

// FUNC_LEAVE;

// return false ;

//}

int  ClientHandler::initiate_read_stream  (void)

 ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);

//考慮到粘包的呀

 if (_rs.read (_read_msg_block, _read_msg_block.space()) == -1)

  ACE_ERROR_RETURN ((LM_ERROR,"%p\n","ACE_Asynch_Read_Stream::read"),-1);

 _io_count++;

 return 0;

/**生成一個寫請求

*

* \param mb 待發送的資料

* \param nBytes 待發送資料大小

* \return 0-成功,-1失敗

*/

int  ClientHandler::initiate_write_stream (ACE_Message_Block & mb, size_t nBytes )

 if (_ws.write (mb , nBytes ) == -1)

  mb.release ();

  ACE_ERROR_RETURN((LM_ERROR,"%p\n","ACE_Asynch_Write_File::write"),-1);

void ClientHandler::open (ACE_HANDLE handle,ACE_Message_Block &message_block)

 //FUNC_ENTER;

 _io_count=0;

 if(_ws.open(*this,this->handle())==-1)

  ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Write_Stream::open"));

 else if (_rs.open (*this, this->handle()) == -1)

  ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Read_Stream::open"));

 else

  initiate_read_stream ();

 check_destroy();

 //FUNC_LEAVE;

void ClientHandler::fini()

void ClientHandler::check_time_out(time_t cur_time)

 //ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);

 //ACE_DEBUG((LM_DEBUG,"cur_time is %u,last io is %u\n",cur_time,_last_net_io));

 //檢測是否已經為0了

 if(this->handle()==ACE_INVALID_HANDLE)

  return;

 if(cur_time-_last_net_io>CLIENT_TIME_OUT_SECONDS)

  ACE_OS::shutdown(this->handle(),SD_BOTH);

  ACE_OS::closesocket(this->handle());

  this->handle(ACE_INVALID_HANDLE);

int ClientHandler::check_destroy()

  if (_io_count> 0)

   return 1;

 ACE_OS::shutdown(this->handle(),SD_BOTH);

 ACE_OS::closesocket(this->handle());

//這個地方給記憶體池吧。

 ClientManager::get_instance().release_client_handle(this);

 //delete this;

繼續閱讀