MOOON-agent系統設計與使用說明
易劍 2012/6/16
目錄
<a href="#_Toc27785%20" target="_blank">1. 設計目标 1</a>
<a href="#_Toc6200%20" target="_blank">2. 應用場景 2</a>
<a href="#_Toc13878%20" target="_blank">3. 主要功能 2</a>
<a href="#_Toc15879%20" target="_blank">4. 系統骨架 3</a>
<a href="#_Toc29285%20" target="_blank">5. 資源接口 3</a>
<a href="#_Toc24137%20" target="_blank">6. 内置CommandProcessor 3</a>
<a href="#_Toc31314%20" target="_blank">7. 程式設計接口 3</a>
<a href="#_Toc22286%20" target="_blank">7.1. agent.h 4</a>
<a href="#_Toc32391%20" target="_blank">7.2. message.h 5</a>
<a href="#_Toc7812%20" target="_blank">7.3. message_command.h 5</a>
<a href="#_Toc19715%20" target="_blank">7.4. command_processor.h 6</a>
<a href="#_Toc24851%20" target="_blank">8. 程式設計示例 7</a>
一個通用的agent架構,提供程式設計接口,并内置通用的功能。

1) 自動上報心跳
2) 支援以域名方式指定center或者IP清單方式指定center
3) 與center斷開後自動重連接配接
4) 支援多種重連接配接center政策,如輪詢方式
5) 自動上報資料到center
6) 可選多種心跳方式,簡單心跳不附帶資料,富心跳可攜帶系統狀态資料,如CPU使用率、記憶體使用情況等
7) 提供擷取CPU使用率、記憶體使用情況和流量等接口
8) 内置配置等CommandProessor,常用需求不用程式設計直接使用
9) 非單例,單個程序可建立多個agent執行個體
暫略。
除宏外,是以内容均位于agent名字空間内。
/***
* 常量定義
*/
enum
{
REPORT_MAX = 10240 /** 一次report的最大位元組數 */
};
<b></b>
<b>class IAgent</b>
<b>{</b>
<b>public:</b>
<b> virtual ~IAgent() {}</b>
<b> virtual void set_center(const std::string& domainname_or_iplist, uint16_t port) = 0; </b>
<b> /***</b>
<b> * 上報資料給center,report調用隻是将資料存放上報隊列中,由agent異步上報</b>
<b> * @data 需要上報的資料</b>
<b> * @data_size 需要上報的資料位元組數</b>
<b> * @timeout_millisecond 逾時毫秒數,</b>
<b> * 當隊列滿時,如果逾時毫秒數為0,則直接傳回,資料不會被放入上報隊列中;</b>
<b> * 當隊列滿時,如果timeout_millisecond不為0,則等待指定的時長,如果在指定的時長内,</b>
<b> * 上報隊列一直是滿的,則傳回,并且資料不會被放入上報隊列中</b>
<b> */</b>
<b> virtual bool report(const char* data, size_t data_size, uint32_t timeout_millisecond=0) = 0;</b>
<b> virtual bool report(uint32_t timeout_millisecond, const char* format, ...) = 0;</b>
<b> virtual bool register_command_processor(ICommandProcessor* processor) = 0;</b>
<b> virtual void deregister_command_processor(ICommandProcessor* processor) = 0;</b>
<b>};</b>
* 日志器,是以分發器執行個體共享
* 如需要記錄日志,則在調用create之前,應當先設定好日志器
extern sys::ILogger* logger;
* 用來建立agent執行個體,注意agent不是單例,允許一個程序内有多個執行個體
* @queue_size 上報隊列大小,如果隊列滿,會導緻消息丢失或report調用阻塞
* @connect_timeout_milliseconds 與center連接配接的逾時毫秒數,如果在這個時間内沒有資料上報,
* 則會自動發送心跳消息,否則不會發送心跳消息
extern IAgent* create(uint32_t queue_size, uint32_t connect_timeout_milliseconds);
/** 銷毀一個agent執行個體 */
extern void destroy(IAgent* agent);
#pragma pack(4) // 網絡消息按4位元組對齊
* Agent消息頭
typedef struct TAgentMessageHeader
NUInt32 size; /** 消息包位元組數 */
NUInt32 command; /** 消息的指令字 */
}agent_message_header_t;
* 簡單的心跳消息,僅一個消息頭
typedef struct TSimpleHeartbeatMessage
agent_message_header_t header;
}simple_heartbeat_message_t;
* 上報消息
typedef struct TReportMessage
char data[0]; /** 需要上報的内容 */
}report_message_t;
#pragma pack()
* 上行消息指令字
typedef enum TUplinkMessageCommand
U_SIMPLE_HEARTBEAT_MESSAGE = 1, /** 簡單心跳消息 */
U_REPORT_MESSAGE = 2 /** 上報消息 */
}uplink_message_command_t;
* 下行消息指令字,由ICommandProcessor處理
typedef enum TDownlinkMessageCommand
}downlink_message_command_t;
* 消息上下文結構
* 由于是異步接收消息的,是以需要一個上下文結構來儲存最新狀态
typedef struct TMessageContext
size_t total_size; /** 消息體的位元組數 */
size_t finished_size; /** 已經收到的消息體位元組數 */
TMessageContext(size_t total_size_, size_t finished_size_)
:total_size(total_size_)
,finished_size(finished_size_)
{
}
}message_context_t;
class ICommandProcessor
{
public:
virtual ~ICommandProcessor() {}
/***
* 傳回該CommandProcessor處理的指令字
*/
virtual uint32_t get_command() const = 0;
* 有消息需要處理時的回調函數
* 請注意消息的接收是異步的,每收到一點消息資料,都會回調on_message
* 整個消息包接收完成的條件是msg_ctx.total_size和msg_ctx.finished_size+buffer_size兩者相等
* @buffer 目前收到的消息體資料
* @buffer_size 目前收到的消息體資料位元組數
* @return 如果消息處理成功,則傳回true,否則傳回false,當傳回false時,會導緻連接配接被斷開進行重連接配接
virtual bool on_message(const TMessageContext& msg_ctx, const char* buffer, size_t buffer_size) = 0;
// 指令字1的CommandProcessor
class CCommandProcessor1: public ICommandProcessor
private:
virtual uint32_t get_command() const
return 1;
virtual bool on_message(const TMessageContext& msg_ctx, const char* buffer, size_t buffer_size)
fprintf(stdout, "[%zu:%zu] %.*s\n", msg_ctx.total_size, msg_ctx.finished_size, (int)buffer_size, buffer);
return true;
// 指令字2的CommandProcessor
class CCommandProcessor2: public CCommandProcessor1
return 2;
// 指令字3的CommandProcessor
class CCommandProcessor3: public CCommandProcessor1
return 3;
class CMainHelper: public sys::IMainHelper
CMainHelper()
:_agent(NULL)
virtual bool init(int argc, char* argv[])
uint32_t queue_size = 100;
uint32_t connect_timeout_milliseconds = 2000;
_agent = agent::create(queue_size, connect_timeout_milliseconds);
if (NULL == _agent)
{
return false;
}
_agent->register_command_processor(&_command_processor1);
_agent->register_command_processor(&_command_processor2);
_agent->register_command_processor(&_command_processor3);
_agent->set_center(ArgsParser::center_ip->get_value(),
ArgsParser::center_port->get_value());
std::string report("test");
while (true)
sys::CUtil::millisleep(3000);
_agent->report(report.data(), report.size());
virtual void fini()
agent::destroy(_agent);
_agent = NULL;
virtual int get_exit_signal() const
return SIGTERM;
}
agent::IAgent* _agent;
CCommandProcessor1 _command_processor1;
CCommandProcessor2 _command_processor2;
CCommandProcessor3 _command_processor3;
// 入口函數
extern "C" int main(int argc, char* argv[])
if (!ArgsParser::parse(argc, argv))
fprintf(stderr, "Args error: %s.\n", ArgsParser::g_error_message.c_str());
exit(1);
CMainHelper main_helper;
return sys::main_template(&main_helper, argc, argv);