天天看點

mooon-agent接收狀态機代碼摘要

<b>recv_machine.h</b>

#ifndef MOOON_AGENT_RECV_MACHINE_H  

#define MOOON_AGENT_RECV_MACHINE_H  

#include &lt;agent/message.h&gt;  

AGENT_NAMESPACE_BEGIN  

class CAgentThread;  

class CRecvMachine  

{  

private:  

    /***  

      * 接收狀态值  

      */  

    typedef enum recv_state_t  

    {  

        rs_header, /** 接收消息頭狀态 */  

        rs_body /** 接收消息體狀态 */  

    }TRecvState;  

      * 接收狀态上下文  

    struct RecvStateContext  

        const char* buffer; /** 目前的資料buffer */  

        size_t buffer_size; /** 目前的資料位元組數 */  

        RecvStateContext(const char* buf=NULL, size_t buf_size=0)  

         :buffer(buf)  

         ,buffer_size(buf_size)  

        {  

        }  

        RecvStateContext(const RecvStateContext&amp; other)  

         :buffer(other.buffer)  

         ,buffer_size(other.buffer_size)  

        RecvStateContext&amp; operator =(const RecvStateContext&amp; other)  

            buffer = other.buffer;  

            buffer_size = other.buffer_size;  

            return *this;  

    };  

public:  

    CRecvMachine(CAgentThread* thread);  

    util::handle_result_t work(const char* buffer, size_t buffer_size);  

    void reset();  

    void set_next_state(recv_state_t next_state)  

        _recv_state = next_state;  

        _finished_size = 0;  

    }  

    util::handle_result_t handle_header(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx);  

    util::handle_result_t handle_body(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx);  

    util::handle_result_t handle_error(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx);  

private:   

    CAgentThread* _thread; /** 需要通過CAgentThread取得CProcessorManager */  

    agent_message_header_t _header; /** 消息頭,這個大小是固定的 */  

    recv_state_t _recv_state; /** 目前的接收狀态 */  

    size_t _finished_size; /** 目前狀态已經接收到的位元組數,注意不是總的已經接收到的位元組數,隻針對目前狀态 */  

};  

AGENT_NAMESPACE_END  

#endif // MOOON_AGENT_RECV_MACHINE_H  

recv_machine.cpp

#include "recv_machine.h" 

#include "agent_thread.h" 

CRecvMachine::CRecvMachine(CAgentThread* thread)  

 :_thread(thread)  

    set_next_state(rs_header);  

}  

// 狀态機入口函數  

// 狀态機工作原理:-&gt; rs_header -&gt; rs_body -&gt; rs_header  

// -&gt; rs_header -&gt; rs_error -&gt; rs_header  

// -&gt; rs_header -&gt; rs_body -&gt; rs_error -&gt; rs_header  

// 參數說明:  

// buffer - 本次收到的資料,注意不是總的  

// buffer_size - 本次收到的資料位元組數  

util::handle_result_t CRecvMachine::work(const char* buffer, size_t buffer_size)  

{   

    RecvStateContext next_ctx(buffer, buffer_size);   

    util::handle_result_t hr = util::handle_continue;  

    // 狀态機循環條件為:util::handle_continue == hr  

    while (util::handle_continue == hr)  

    {   

        RecvStateContext cur_ctx(next_ctx);  

        switch (_recv_state)  

        case rs_header:  

            hr = handle_header(cur_ctx, &amp;next_ctx);  

            break;  

        case rs_body:  

            hr = handle_body(cur_ctx, &amp;next_ctx);  

        default:  

            hr = handle_error(cur_ctx, &amp;next_ctx);  

    return hr;  

void CRecvMachine::reset()  

// 處理消息頭部  

// cur_ctx - 目前上下文,  

// cur_ctx.buffer為目前收到的資料buffer,包含了消息頭,但也可能包含了消息體。  

// cur_ctx.buffer_size為目前收到位元組數  

// next_ctx - 下一步上下文,  

// 由于cur_ctx.buffer可能包含了消息體,是以在一次接收receive動作後,  

// 會涉及到消息頭和消息體兩個狀态,這裡的next_ctx實際為下一步handle_body的cur_ctx  

util::handle_result_t CRecvMachine::handle_header(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx)  

    if (_finished_size + cur_ctx.buffer_size &lt; sizeof(agent_message_header_t))  

        memcpy(reinterpret_cast&lt;char*&gt;(&amp;_header) + _finished_size  

              ,cur_ctx.buffer  

              ,cur_ctx.buffer_size);  

        _finished_size += cur_ctx.buffer_size;  

        return util::handle_continue;  

    else 

        size_t need_size = sizeof(agent_message_header_t) - _finished_size;  

              ,need_size);  

        // TODO: Check header here  

        size_t remain_size = cur_ctx.buffer_size - need_size;  

        if (remain_size &gt; 0)  

            next_ctx-&gt;buffer = cur_ctx.buffer + need_size;  

            next_ctx-&gt;buffer_size = cur_ctx.buffer_size - need_size;  

        // 隻有當包含消息體時,才需要進行狀态切換,  

        // 否則維持rs_header狀态不變  

        if (_header.size &gt; 0)  

            // 切換狀态  

            set_next_state(rs_body);  

        else 

        {   

            CProcessorManager* processor_manager = _thread-&gt;get_processor_manager();   

            if (!processor_manager-&gt;on_message(_header, 0, NULL, 0))  

            {  

                return util::handle_error;  

            }  

        return (remain_size &gt; 0)  

              ? util::handle_continue // 控制work過程是否繼續循環  

              : util::handle_finish;  

// 處理消息體  

// cur_ctx.buffer為目前收到的資料buffer,包含了消息體,但也可能包含了消息頭。  

// 由于cur_ctx.buffer可能包含了消息頭,是以在一次接收receive動作後,  

// 會涉及到消息頭和消息體兩個狀态,這裡的next_ctx實際為下一步handle_header的cur_ctx  

util::handle_result_t CRecvMachine::handle_body(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx)  

    CProcessorManager* processor_manager = _thread-&gt;get_processor_manager();  

    if (_finished_size + cur_ctx.buffer_size &lt; _header.size)  

        if (!processor_manager-&gt;on_message(_header, _finished_size, cur_ctx.buffer, cur_ctx.buffer_size))  

            return util::handle_error;  

        size_t need_size = _header.size - _finished_size;  

        if (!processor_manager-&gt;on_message(_header, _finished_size, cur_ctx.buffer, need_size))  

        // 切換狀态  

        set_next_state(rs_header);  

            return util::handle_continue;  

        return util::handle_finish;   

util::handle_result_t CRecvMachine::handle_error(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx)  

    //AGENT_LOG_ERROR("Network error.\n");  

    set_next_state(rs_header); // 無條件切換到rs_header,這個時候應當斷開連接配接重連接配接  

    return util::handle_error;  

    本文轉自eyjian 51CTO部落格,原文連結:http://blog.51cto.com/mooon/910302,如需轉載請自行聯系原作者

繼續閱讀