天天看點

mooon-agent接收狀态機代碼摘要

<b>recv_machine.h</b>

點選(此處)折疊或打開

#ifndef MOOON_AGENT_RECV_MACHINE_H

#define MOOON_AGENT_RECV_MACHINE_H

#include agent/message.h&gt;

AGENT_NAMESPACE_BEGIN

class CAgentThread;

class CRecvMachine

{

private:

    /***

      * 接收狀态值

      */

    typedef enum recv_state_t

    {

        rs_header, /** 接收消息頭狀态 */

        rs_body /** 接收消息體狀态 */

    }TRecvState;

      * 接收狀态上下文

    struct RecvStateContext

        const char* buffer; /** 目前的資料buffer */

        size_t buffer_size; /** 目前的資料位元組數 */

        RecvStateContext(const char* buf=NULL, size_t buf_size=0)

         :buffer(buf)

         ,buffer_size(buf_size)

        {

        }

        RecvStateContext(const RecvStateContext&amp; other)

         :buffer(other.buffer)

         ,buffer_size(other.buffer_size)

        RecvStateContext&amp; operator =(const RecvStateContext&amp; other)

            buffer = other.buffer;

            buffer_size = other.buffer_size;

            return *this;

    };

public:

    CRecvMachine(CAgentThread* thread);

    util::handle_result_t work(const char* buffer, size_t buffer_size);

    void reset();

    void set_next_state(recv_state_t next_state)

        _recv_state = next_state;

        _finished_size = 0;

    }

    util::handle_result_t handle_header(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx);

    util::handle_result_t handle_body(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx);

    util::handle_result_t handle_error(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx);

    CAgentThread* _thread; /** 需要通過CAgentThread取得CProcessorManager */

    agent_message_header_t _header; /** 消息頭,這個大小是固定的 */

    recv_state_t _recv_state; /** 目前的接收狀态 */

    size_t _finished_size; /** 目前狀态已經接收到的位元組數,注意不是總的已經接收到的位元組數,隻針對目前狀态 */

};

AGENT_NAMESPACE_END

#endif // MOOON_AGENT_RECV_MACHINE_H

<b>recv_machine.cpp</b>

#include "recv_machine.h"

#include "agent_thread.h"

CRecvMachine::CRecvMachine(CAgentThread* thread)

 :_thread(thread)

    set_next_state(rs_header);

}

// 狀态機入口函數

// 狀态機工作原理:-&gt; rs_header -&gt; rs_body -&gt; rs_header

// -&gt; rs_header -&gt; rs_error -&gt; rs_header

// -&gt; rs_header -&gt; rs_body -&gt; rs_error -&gt; rs_header

// 參數說明:

// buffer - 本次收到的資料,注意不是總的

// buffer_size - 本次收到的資料位元組數

util::handle_result_t CRecvMachine::work(const char* buffer, size_t buffer_size)

    RecvStateContext next_ctx(buffer, buffer_size);

    util::handle_result_t hr = util::handle_continue;

    // 狀态機循環條件為:util::handle_continue == hr

    while (util::handle_continue == hr)

        RecvStateContext cur_ctx(next_ctx);

        switch (_recv_state)

        case rs_header:

            hr = handle_header(cur_ctx, &amp;next_ctx);

            break;

        case rs_body:

            hr = handle_body(cur_ctx, &amp;next_ctx);

        default:

            hr = handle_error(cur_ctx, &amp;next_ctx);

    return hr;

void CRecvMachine::reset()

// 處理消息頭部

// cur_ctx - 目前上下文,

// cur_ctx.buffer為目前收到的資料buffer,包含了消息頭,但也可能包含了消息體。

// cur_ctx.buffer_size為目前收到位元組數

// next_ctx - 下一步上下文,

// 由于cur_ctx.buffer可能包含了消息體,是以在一次接收receive動作後,

// 會涉及到消息頭和消息體兩個狀态,這裡的next_ctx實際為下一步handle_body的cur_ctx

util::handle_result_t CRecvMachine::handle_header(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx)

    if (_finished_size + cur_ctx.buffer_size sizeof(agent_message_header_t))

        memcpy(reinterpret_castchar*&gt;(&amp;_header) + _finished_size

              ,cur_ctx.buffer

              ,cur_ctx.buffer_size);

        _finished_size += cur_ctx.buffer_size;

        return util::handle_continue;

    else

        size_t need_size = sizeof(agent_message_header_t) - _finished_size;

              ,need_size);

        // TODO: Check header here

        size_t remain_size = cur_ctx.buffer_size - need_size;

        if (remain_size &gt; 0)

            next_ctx-&gt;buffer = cur_ctx.buffer + need_size;

            next_ctx-&gt;buffer_size = cur_ctx.buffer_size - need_size;

        // 隻有當包含消息體時,才需要進行狀态切換,

        // 否則維持rs_header狀态不變

        if (_header.size &gt; 0)

            // 切換狀态

            set_next_state(rs_body);

        else

            CProcessorManager* processor_manager = _thread-&gt;get_processor_manager();

            if (!processor_manager-&gt;on_message(_header, 0, NULL, 0))

            {

                return util::handle_error;

            }

        return (remain_size &gt; 0)

              ? util::handle_continue // 控制work過程是否繼續循環

              : util::handle_finish;

// 處理消息體

// cur_ctx.buffer為目前收到的資料buffer,包含了消息體,但也可能包含了消息頭。

// 由于cur_ctx.buffer可能包含了消息頭,是以在一次接收receive動作後,

// 會涉及到消息頭和消息體兩個狀态,這裡的next_ctx實際為下一步handle_header的cur_ctx

util::handle_result_t CRecvMachine::handle_body(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx)

    CProcessorManager* processor_manager = _thread-&gt;get_processor_manager();

    if (_finished_size + cur_ctx.buffer_size _header.size)

        if (!processor_manager-&gt;on_message(_header, _finished_size, cur_ctx.buffer, cur_ctx.buffer_size))

            return util::handle_error;

        size_t need_size = _header.size - _finished_size;

        if (!processor_manager-&gt;on_message(_header, _finished_size, cur_ctx.buffer, need_size))

        // 切換狀态

        set_next_state(rs_header);

            return util::handle_continue;

        return util::handle_finish;

util::handle_result_t CRecvMachine::handle_error(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx)

    //AGENT_LOG_ERROR("Network error.\n");

    set_next_state(rs_header); // 無條件切換到rs_header,這個時候應當斷開連接配接重連接配接

    return util::handle_error;

繼續閱讀