<b>recv_machine.h</b>
點選(此處)折疊或打開
#ifndef MOOON_AGENT_RECV_MACHINE_H
#define MOOON_AGENT_RECV_MACHINE_H
#include agent/message.h>
AGENT_NAMESPACE_BEGIN
class CAgentThread;
class CRecvMachine
{
private:
/***
* 接收狀态值
*/
typedef enum recv_state_t
{
rs_header, /** 接收消息頭狀态 */
rs_body /** 接收消息體狀态 */
}TRecvState;
* 接收狀态上下文
struct RecvStateContext
const char* buffer; /** 目前的資料buffer */
size_t buffer_size; /** 目前的資料位元組數 */
RecvStateContext(const char* buf=NULL, size_t buf_size=0)
:buffer(buf)
,buffer_size(buf_size)
{
}
RecvStateContext(const RecvStateContext& other)
:buffer(other.buffer)
,buffer_size(other.buffer_size)
RecvStateContext& operator =(const RecvStateContext& other)
buffer = other.buffer;
buffer_size = other.buffer_size;
return *this;
};
public:
CRecvMachine(CAgentThread* thread);
util::handle_result_t work(const char* buffer, size_t buffer_size);
void reset();
void set_next_state(recv_state_t next_state)
_recv_state = next_state;
_finished_size = 0;
}
util::handle_result_t handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);
util::handle_result_t handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);
util::handle_result_t handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);
CAgentThread* _thread; /** 需要通過CAgentThread取得CProcessorManager */
agent_message_header_t _header; /** 消息頭,這個大小是固定的 */
recv_state_t _recv_state; /** 目前的接收狀态 */
size_t _finished_size; /** 目前狀态已經接收到的位元組數,注意不是總的已經接收到的位元組數,隻針對目前狀态 */
};
AGENT_NAMESPACE_END
#endif // MOOON_AGENT_RECV_MACHINE_H
<b>recv_machine.cpp</b>
#include "recv_machine.h"
#include "agent_thread.h"
CRecvMachine::CRecvMachine(CAgentThread* thread)
:_thread(thread)
set_next_state(rs_header);
}
// 狀态機入口函數
// 狀态機工作原理:-> rs_header -> rs_body -> rs_header
// -> rs_header -> rs_error -> rs_header
// -> rs_header -> rs_body -> rs_error -> rs_header
// 參數說明:
// buffer - 本次收到的資料,注意不是總的
// buffer_size - 本次收到的資料位元組數
util::handle_result_t CRecvMachine::work(const char* buffer, size_t buffer_size)
RecvStateContext next_ctx(buffer, buffer_size);
util::handle_result_t hr = util::handle_continue;
// 狀态機循環條件為:util::handle_continue == hr
while (util::handle_continue == hr)
RecvStateContext cur_ctx(next_ctx);
switch (_recv_state)
case rs_header:
hr = handle_header(cur_ctx, &next_ctx);
break;
case rs_body:
hr = handle_body(cur_ctx, &next_ctx);
default:
hr = handle_error(cur_ctx, &next_ctx);
return hr;
void CRecvMachine::reset()
// 處理消息頭部
// cur_ctx - 目前上下文,
// cur_ctx.buffer為目前收到的資料buffer,包含了消息頭,但也可能包含了消息體。
// cur_ctx.buffer_size為目前收到位元組數
// next_ctx - 下一步上下文,
// 由于cur_ctx.buffer可能包含了消息體,是以在一次接收receive動作後,
// 會涉及到消息頭和消息體兩個狀态,這裡的next_ctx實際為下一步handle_body的cur_ctx
util::handle_result_t CRecvMachine::handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)
if (_finished_size + cur_ctx.buffer_size sizeof(agent_message_header_t))
memcpy(reinterpret_castchar*>(&_header) + _finished_size
,cur_ctx.buffer
,cur_ctx.buffer_size);
_finished_size += cur_ctx.buffer_size;
return util::handle_continue;
else
size_t need_size = sizeof(agent_message_header_t) - _finished_size;
,need_size);
// TODO: Check header here
size_t remain_size = cur_ctx.buffer_size - need_size;
if (remain_size > 0)
next_ctx->buffer = cur_ctx.buffer + need_size;
next_ctx->buffer_size = cur_ctx.buffer_size - need_size;
// 隻有當包含消息體時,才需要進行狀态切換,
// 否則維持rs_header狀态不變
if (_header.size > 0)
// 切換狀态
set_next_state(rs_body);
else
CProcessorManager* processor_manager = _thread->get_processor_manager();
if (!processor_manager->on_message(_header, 0, NULL, 0))
{
return util::handle_error;
}
return (remain_size > 0)
? util::handle_continue // 控制work過程是否繼續循環
: util::handle_finish;
// 處理消息體
// cur_ctx.buffer為目前收到的資料buffer,包含了消息體,但也可能包含了消息頭。
// 由于cur_ctx.buffer可能包含了消息頭,是以在一次接收receive動作後,
// 會涉及到消息頭和消息體兩個狀态,這裡的next_ctx實際為下一步handle_header的cur_ctx
util::handle_result_t CRecvMachine::handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)
CProcessorManager* processor_manager = _thread->get_processor_manager();
if (_finished_size + cur_ctx.buffer_size _header.size)
if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, cur_ctx.buffer_size))
return util::handle_error;
size_t need_size = _header.size - _finished_size;
if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, need_size))
// 切換狀态
set_next_state(rs_header);
return util::handle_continue;
return util::handle_finish;
util::handle_result_t CRecvMachine::handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)
//AGENT_LOG_ERROR("Network error.\n");
set_next_state(rs_header); // 無條件切換到rs_header,這個時候應當斷開連接配接重連接配接
return util::handle_error;