天天看點

ACE Proactor 異步操作

// proactor_tcp_server.cpp :
           
//

#include "stdafx.h"

#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/Log_Msg.h"
#include "ace/streams.h"

#pragma comment(lib,"ACED.lib")

class Service_Proactor : public ACE_Service_Handler
{
public:
	~Service_Proactor()
	{
		if (this->handle() != ACE_INVALID_HANDLE)
			ACE_OS::closesocket(this->handle());
	}

	virtual void open(ACE_HANDLE h, ACE_Message_Block&)
	{
		this->handle(h);
		if (this->reader_.open(*this) != 0)
		{
			ACE_LOG_MSG->log(LM_INFO, ACE_TEXT("%p\n"), 
				ACE_TEXT("HA_Proactive_Service open"));
			delete this;
			return;
		}

		ACE_Message_Block *mb = new ACE_Message_Block(buffer, 1024);

		if (this->reader_.read(*mb, mb->space()) != 0)
		{
			ACE_LOG_MSG->log(LM_INFO, "Begin read fail\n");
			delete this;
			return;
		}
		return;
	}


	// 異步讀完成後會調用此函數
	virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
	{
		ACE_Message_Block &mb = result.message_block();
		if (!result.success() || result.bytes_transferred() == 0)
		{
			mb.release();
			delete this;
			return;
		}
		mb.copy("");    // 結束标記'\0'
		ACE_LOG_MSG->log(LM_INFO, "rev:\t%s\n", mb.rd_ptr());
		mb.release();

		ACE_Message_Block *nmb = new ACE_Message_Block(buffer, 1024);
		if (this->reader_.read(*nmb, nmb->space()) != 0)
			return;
	}

private:
	ACE_Asynch_Read_Stream reader_;
	char buffer[1024];
};



int main(int argc, char *argv[])
{

	ACE_OS_Object_Manager::instance()->starting_up();
	// 指定log資訊到proactor_tcp_server.txt檔案
	ACE_OSTREAM_TYPE *output = new std::ofstream("proactor_tcp_server.txt");
	ACE_LOG_MSG->msg_ostream(output, 1);
	ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);

	ACE_Asynch_Acceptor<Service_Proactor> acceptor;
	if (acceptor.open(ACE_INET_Addr(3000, "127.0.0.1")) == -1)
		return -1;

	// 執行順序:打開連接配接成功後,執行異步讀操作,
	// 當接收到資料時,handle_events()傳回1,然後ACE架構将調用handle_read_stream
	while (true)
	{
		ACE_UINT16 ret = ACE_Proactor::instance()->handle_events();
		ACE_LOG_MSG->log(LM_INFO, "handle_events return %d \r\n", ret);
	}
		
	return 0;
}
           
// proactor_tcp_client.cpp 
//

#include "stdafx.h"

#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
#include "ace/streams.h"
#include <sstream>

#pragma comment(lib,"ACED.lib")

class Service_Proactor : public ACE_Service_Handler
{
public:
	~Service_Proactor()
	{
		if (this->handle() != ACE_INVALID_HANDLE)
			ACE_OS::closesocket(this->handle());
	}

	virtual void open(ACE_HANDLE h, ACE_Message_Block&)
	{
		this->handle(h);
		if (this->writer_.open(*this) != 0)
		{
			ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"),
				ACE_TEXT("HA_Proactive_Service open")));
			delete this;
			return;
		}

		for (ACE_INT16 i = 0; i < 10; ++i)
		{
			write_data("1234");
			ACE_LOG_MSG->log(LM_INFO, "write data : 1234\r\n");
			ACE_OS::sleep(2);
		}

		return;
	}

	void write_data(const char * data)
	{
		ACE_Message_Block *mb = new ACE_Message_Block(100);
		mb->copy(data);
		if (this->writer_.write(*mb, mb->length()) != 0)
		{
			ACE_OS::printf("Begin write fail\n");
			delete this;
			return;
		}
	}

	// 異步寫完成後會調用此函數
	virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
	{
		ACE_LOG_MSG->log(LM_INFO, "handle_write_stream() \r\n");
		
		ACE_Message_Block &mb = result.message_block();
		mb.release();
		
		return;
	}
protected:


private:
	ACE_Asynch_Write_Stream writer_;
};

int main(int argc, char *argv[])
{
	ACE_OS_Object_Manager::instance()->starting_up();
	ACE_OSTREAM_TYPE *output = new std::ofstream("proactor_tcp_client.txt");
	ACE_LOG_MSG->msg_ostream(output, 1);
	ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);

	ACE_INET_Addr addr(3000, "127.0.0.1");
	Service_Proactor *client = new Service_Proactor();
	ACE_Asynch_Connector<Service_Proactor> connector;

	connector.open();
	if (connector.connect(addr) == -1)
		return -1;

	while (true)
	{
		ACE_UINT16 ret = ACE_Proactor::instance()->handle_events();
		ACE_LOG_MSG->log(LM_INFO, "handle_events return %d \r\n", ret);
	}

	return 0;
}
           
ACE Proactor 異步操作

異步操作實作架構:

ACE Proactor 異步操作

1、

所有具體的異步操作,都繼承于抽象的ACE_Asynch_Operation,并主要實作open方法。

都擁有一個相應的具體異步操作實作Impl成員,這些Impl都有共同的抽象叫ACE_Asynch_Operation_Impl。

2、

這些具體的異步操作類都跟ACE_Handler相關聯,在異步操作類open時,需要指定相關聯的ACE_Handler。

當發生相關操作時,比如

ACE_Asynch_Read_Stream發生異步read、

ACE_Asynch_Write_Stream發生異步write

操作完成時就會調用相關聯的handle_read_stream、handle_write_stream。

同理其它異步操作也一樣的道理。

這事件機制的實作由Proactor,進行路由和分派。

是以需要對Proactor的handle_events進行一直阻塞調用。

當每次handle_events成功傳回一次後,就會發生一次異步完成通知調用操作(即handle_xxx_xxx被調用)。

3、

在具體的異步操作實作中的open時,就将會發生了proactor對相關句柄的注冊。

例如可能會發生對socket句柄、檔案句柄等發生注冊綁定。這注冊過程已封裝在架構裡頭,不用操心。

當這些句柄上所發生的操作完成時,由proactor的循環事件處理并分發通知調用各個相關的handle_xxx_xxx。

Proactor是單例,為大夥共享的。誰都可用它。

但通常我們隻要用它來啟動事件循環即可。其它的句柄注冊綁定啊、事件的完成通知等,架構已經完成了。

啟動事件,在相應的事件處理那裡做我們要做的工作即可。

啟動事件循環可以直接proactor_run_event_loop,

或者自己寫循環,調用handle_events。

int
ACE_WIN32_Proactor::register_handle (ACE_HANDLE handle,
                                     const void *completion_key)
{
  ULONG_PTR comp_key (reinterpret_cast<ULONG_PTR> (completion_key));


  // No locking is needed here as no state changes.
  ACE_HANDLE cp = ::CreateIoCompletionPort (handle,
                                            this->completion_port_,
                                            comp_key,
                                            this->number_of_threads_);
 // ...
}