// proactor_tcp_server.cpp :
//
#include "stdafx.h"
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/Log_Msg.h"
#include "ace/streams.h"
#pragma comment(lib,"ACED.lib")
class Service_Proactor : public ACE_Service_Handler
{
public:
~Service_Proactor()
{
if (this->handle() != ACE_INVALID_HANDLE)
ACE_OS::closesocket(this->handle());
}
virtual void open(ACE_HANDLE h, ACE_Message_Block&)
{
this->handle(h);
if (this->reader_.open(*this) != 0)
{
ACE_LOG_MSG->log(LM_INFO, ACE_TEXT("%p\n"),
ACE_TEXT("HA_Proactive_Service open"));
delete this;
return;
}
ACE_Message_Block *mb = new ACE_Message_Block(buffer, 1024);
if (this->reader_.read(*mb, mb->space()) != 0)
{
ACE_LOG_MSG->log(LM_INFO, "Begin read fail\n");
delete this;
return;
}
return;
}
// 異步讀完成後會調用此函數
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block();
if (!result.success() || result.bytes_transferred() == 0)
{
mb.release();
delete this;
return;
}
mb.copy(""); // 結束标記'\0'
ACE_LOG_MSG->log(LM_INFO, "rev:\t%s\n", mb.rd_ptr());
mb.release();
ACE_Message_Block *nmb = new ACE_Message_Block(buffer, 1024);
if (this->reader_.read(*nmb, nmb->space()) != 0)
return;
}
private:
ACE_Asynch_Read_Stream reader_;
char buffer[1024];
};
int main(int argc, char *argv[])
{
ACE_OS_Object_Manager::instance()->starting_up();
// 指定log資訊到proactor_tcp_server.txt檔案
ACE_OSTREAM_TYPE *output = new std::ofstream("proactor_tcp_server.txt");
ACE_LOG_MSG->msg_ostream(output, 1);
ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);
ACE_Asynch_Acceptor<Service_Proactor> acceptor;
if (acceptor.open(ACE_INET_Addr(3000, "127.0.0.1")) == -1)
return -1;
// 執行順序:打開連接配接成功後,執行異步讀操作,
// 當接收到資料時,handle_events()傳回1,然後ACE架構将調用handle_read_stream
while (true)
{
ACE_UINT16 ret = ACE_Proactor::instance()->handle_events();
ACE_LOG_MSG->log(LM_INFO, "handle_events return %d \r\n", ret);
}
return 0;
}
// proactor_tcp_client.cpp
//
#include "stdafx.h"
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
#include "ace/streams.h"
#include <sstream>
#pragma comment(lib,"ACED.lib")
class Service_Proactor : public ACE_Service_Handler
{
public:
~Service_Proactor()
{
if (this->handle() != ACE_INVALID_HANDLE)
ACE_OS::closesocket(this->handle());
}
virtual void open(ACE_HANDLE h, ACE_Message_Block&)
{
this->handle(h);
if (this->writer_.open(*this) != 0)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"),
ACE_TEXT("HA_Proactive_Service open")));
delete this;
return;
}
for (ACE_INT16 i = 0; i < 10; ++i)
{
write_data("1234");
ACE_LOG_MSG->log(LM_INFO, "write data : 1234\r\n");
ACE_OS::sleep(2);
}
return;
}
void write_data(const char * data)
{
ACE_Message_Block *mb = new ACE_Message_Block(100);
mb->copy(data);
if (this->writer_.write(*mb, mb->length()) != 0)
{
ACE_OS::printf("Begin write fail\n");
delete this;
return;
}
}
// 異步寫完成後會調用此函數
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_LOG_MSG->log(LM_INFO, "handle_write_stream() \r\n");
ACE_Message_Block &mb = result.message_block();
mb.release();
return;
}
protected:
private:
ACE_Asynch_Write_Stream writer_;
};
int main(int argc, char *argv[])
{
ACE_OS_Object_Manager::instance()->starting_up();
ACE_OSTREAM_TYPE *output = new std::ofstream("proactor_tcp_client.txt");
ACE_LOG_MSG->msg_ostream(output, 1);
ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);
ACE_INET_Addr addr(3000, "127.0.0.1");
Service_Proactor *client = new Service_Proactor();
ACE_Asynch_Connector<Service_Proactor> connector;
connector.open();
if (connector.connect(addr) == -1)
return -1;
while (true)
{
ACE_UINT16 ret = ACE_Proactor::instance()->handle_events();
ACE_LOG_MSG->log(LM_INFO, "handle_events return %d \r\n", ret);
}
return 0;
}
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICdzFWRoRXdvN1LclHdpZXYyd2LcBzNvwVZ2x2bzNXak9CX90TQNNkRrFlQKBTSvwFbslmZvwFMwQzLcVmepNHdu9mZvwFVywUNMZTY18CX052bm9CX90zdaVXOHJmb1cVY0ZFSlZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39jN0ADO1MzMxAjMwkDM0EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
異步操作實作架構:
1、
所有具體的異步操作,都繼承于抽象的ACE_Asynch_Operation,并主要實作open方法。
都擁有一個相應的具體異步操作實作Impl成員,這些Impl都有共同的抽象叫ACE_Asynch_Operation_Impl。
2、
這些具體的異步操作類都跟ACE_Handler相關聯,在異步操作類open時,需要指定相關聯的ACE_Handler。
當發生相關操作時,比如
ACE_Asynch_Read_Stream發生異步read、
ACE_Asynch_Write_Stream發生異步write
操作完成時就會調用相關聯的handle_read_stream、handle_write_stream。
同理其它異步操作也一樣的道理。
這事件機制的實作由Proactor,進行路由和分派。
是以需要對Proactor的handle_events進行一直阻塞調用。
當每次handle_events成功傳回一次後,就會發生一次異步完成通知調用操作(即handle_xxx_xxx被調用)。
3、
在具體的異步操作實作中的open時,就将會發生了proactor對相關句柄的注冊。
例如可能會發生對socket句柄、檔案句柄等發生注冊綁定。這注冊過程已封裝在架構裡頭,不用操心。
當這些句柄上所發生的操作完成時,由proactor的循環事件處理并分發通知調用各個相關的handle_xxx_xxx。
Proactor是單例,為大夥共享的。誰都可用它。
但通常我們隻要用它來啟動事件循環即可。其它的句柄注冊綁定啊、事件的完成通知等,架構已經完成了。
啟動事件,在相應的事件處理那裡做我們要做的工作即可。
啟動事件循環可以直接proactor_run_event_loop,
或者自己寫循環,調用handle_events。
int
ACE_WIN32_Proactor::register_handle (ACE_HANDLE handle,
const void *completion_key)
{
ULONG_PTR comp_key (reinterpret_cast<ULONG_PTR> (completion_key));
// No locking is needed here as no state changes.
ACE_HANDLE cp = ::CreateIoCompletionPort (handle,
this->completion_port_,
comp_key,
this->number_of_threads_);
// ...
}