天天看点

Thrift辅助类,用于简化Thrift编程

CThriftServerHelper用于服务端,CThriftClientHelper用于客户端。

IDL定义:

service PackageManagerService

{

}

服务端使用示例:

CThriftServerHelper _thrift_server_helper;

return _thrift_server_helper.serve(FLAGS_package_port, rpc_threads);

客户端使用示例:

CThriftClientHelper<PackageManagerServiceClient> thrift_client_helper(FLAGS_package_ip, FLAGS_package_port);

thrift_client_helper.connect(); // 注意需要处理异常TTransportException/TApplicationException/TException

#ifndef MOOON_NET_THRIFT_HELPER_H

#define MOOON_NET_THRIFT_HELPER_H

#include <mooon/net/config.h>
#include <mooon/sys/log.h>
#include <mooon/utils/string_utils.h>
#include <mooon/utils/scoped_ptr.h>
#include <arpa/inet.h>
#include <stdint.h>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <thrift/concurrency/PosixThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocketPool.h>
#include <thrift/transport/TTransportException.h>
#include <string>
#include <utility>
#include <vector>

NET_NAMESPACE_BEGIN

// 用来判断thrift是否已经连接,包括两种情况:

// 1.从未连接过,也就是还未打开过连接

// 2.连接被对端关闭了

inline bool thrift_not_connected(

        apache::thrift::transport::TTransportException::TTransportExceptionType type)

    return (apache::thrift::transport::TTransportException::NOT_OPEN == type)

        || (apache::thrift::transport::TTransportException::END_OF_FILE == type);

        apache::thrift::transport::TTransportException& ex)

    apache::thrift::transport::TTransportException::TTransportExceptionType type = ex.getType();

    return thrift_not_connected(type);

// thrift客户端辅助类

//

// 使用示例:

// mooon::net::CThriftClientHelperExampleServiceClient> client(rpc_server_ip, rpc_server_port);

// try

// {

// client.connect();

// client->foo();

// }

// catch (apache::thrift::transport::TTransportException& ex)

// MYLOG_ERROR("thrift exception: (%d)%s\n", ex.getType(), ex.what());

// catch (apache::thrift::transport::TApplicationException& ex)

// MYLOG_ERROR("thrift exception: %s\n", ex.what());

// catch (apache::thrift::TException& ex)

// Transport除默认的TFramedTransport (TBufferTransports.h),还可选择:

// TBufferedTransport (TBufferTransports.h)

// THttpTransport

// TZlibTransport

// TFDTransport (TSimpleFileTransport)

// Protocol除默认的apache::thrift::protocol::TBinaryProtocol,还可选择:

// TCompactProtocol

// TJSONProtocol

// TDebugProtocol

template class ThriftClient,

          class Protocol=apache::thrift::protocol::TBinaryProtocol,

          class Transport=apache::thrift::transport::TFramedTransport>

class CThriftClientHelper

public:

    // host thrift服务端的IP地址

    // port thrift服务端的端口号

    // connect_timeout_milliseconds 连接thrift服务端的超时毫秒数

    // receive_timeout_milliseconds 接收thrift服务端发过来的数据的超时毫秒数

    // send_timeout_milliseconds 向thrift服务端发送数据时的超时毫秒数

    CThriftClientHelper(const std::string &host, uint16_t port,

                        int connect_timeout_milliseconds=2000,

                        int receive_timeout_milliseconds=2000,

                        int send_timeout_milliseconds=2000);

    // 支持指定多个servers,运行时随机选择一个,当一个异常时自动选择其它

    // num_retries 重试次数

    // retry_interval 重试间隔,单位为秒

    // max_consecutive_failures 单个Server最大连续失败次数

    // randomize_ 是否随机选择一个Server

    // always_try_last 是否总是重试最后一个Server

    CThriftClientHelper(const std::vectorstd::pairstd::string, int> >& servers,

                        int send_timeout_milliseconds=2000,

                        int num_retries=1, int retry_interval=60,

                        int max_consecutive_failures=1,

                        bool randomize=true, bool always_try_last=true

                        );

    ~CThriftClientHelper();

    // 连接thrift服务端

    //

    // 出错时,可抛出以下几个thrift异常:

    // apache::thrift::transport::TTransportException

    // apache::thrift::TApplicationException

    // apache::thrift::TException

    void connect();

    bool is_connected() const;

    // 断开与thrift服务端的连接

    void close();

    apache::thrift::transport::TSocket* get_socket() { return _socket.get(); }

    const apache::thrift::transport::TSocket get_socket() const { return _socket.get(); }

    ThriftClient* get() { return _client.get(); }

    ThriftClient* get() const { return _client.get(); }

    ThriftClient* operator ->() { return get(); }

    ThriftClient* operator ->() const { return get(); }

    // 取thrift服务端的IP地址

    const std::string& get_host() const;

    // 取thrift服务端的端口号

    uint16_t get_port() const;

    // 返回可读的标识,常用于记录日志

    std::string str() const

    {

        return utils::CStringUtils::format_string("thrift://%s:%u", get_host().c_str(), get_port());

    }

private:

    void init();

    int _connect_timeout_milliseconds;

    int _receive_timeout_milliseconds;

    int _send_timeout_milliseconds;

    // TSocket只支持一个server,而TSocketPool是TSocket的子类支持指定多个server,运行时随机选择一个

    boost::shared_ptrapache::thrift::transport::TSocket> _socket;

    boost::shared_ptrapache::thrift::transport::TTransport> _transport;

    boost::shared_ptrapache::thrift::protocol::TProtocol> _protocol;

    boost::shared_ptrThriftClient> _client;

};

////////////////////////////////////////////////////////////////////////////////

// thrift服务端辅助类

// mooon::net::CThriftServerHelperCExampleHandler, ExampleServiceProcessor> _thrift_server;

// _thrift_server.serve(listen_port);

// ProtocolFactory除了默认的TBinaryProtocolFactory,还可选择:

// TCompactProtocolFactory

// TJSONProtocolFactory

// TDebugProtocolFactory

// 只支持TNonblockingServer一种Server

template class ThriftHandler,

          class ServiceProcessor,

          class ProtocolFactory=apache::thrift::protocol::TBinaryProtocolFactory>

class CThriftServerHelper

    // 启动rpc服务,请注意该调用是同步阻塞的,所以需放最后调用

    // port thrift服务端的监听端口号

    // num_threads thrift服务端开启的线程数

    // 参数num_io_threads,只有当Server为TNonblockingServer才有效

    void serve(uint16_t port, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);

    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads=1);

    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached);

    // 要求ThriftHandler类有方法attach(void*)

    void serve(uint16_t port, void* attached, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);

    void stop();

    ThriftHandler* get()

        return _handler.get();

    ThriftHandler* get() const

    void init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads);

    boost::shared_ptrThriftHandler> _handler;

    boost::shared_ptrapache::thrift::TProcessor> _processor;

    boost::shared_ptrapache::thrift::protocol::TProtocolFactory> _protocol_factory;

    boost::shared_ptrapache::thrift::server::ThreadManager> _thread_manager;

    boost::shared_ptrapache::thrift::concurrency::PosixThreadFactory> _thread_factory;

    boost::shared_ptrapache::thrift::server::TServer> _server;

// 被thrift回调的写日志函数,由set_thrift_debug_log_function()调用它

inline void write_thrift_debug_log(const char* log)

    MYLOG_DEBUG("%s", log);

inline void write_thrift_info_log(const char* log)

    MYLOG_INFO("%s", log);

inline void write_thrift_error_log(const char* log)

    MYLOG_ERROR("%s", log);

// 将thrift输出写入到日志文件中

inline void set_thrift_debug_log_function()

    if (::mooon::sys::g_logger != NULL)

        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_debug_log);

inline void set_thrift_info_log_function()

        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_info_log);

inline void set_thrift_error_log_function()

        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_error_log);

template class ThriftClient, class Protocol, class Transport>

CThriftClientHelperThriftClient, Protocol, Transport>::CThriftClientHelper(

        const std::string &host, uint16_t port,

        int connect_timeout_milliseconds, int receive_timeout_milliseconds, int send_timeout_milliseconds)

        : _connect_timeout_milliseconds(connect_timeout_milliseconds),

          _receive_timeout_milliseconds(receive_timeout_milliseconds),

          _send_timeout_milliseconds(send_timeout_milliseconds)

    set_thrift_debug_log_function();

    _socket.reset(new apache::thrift::transport::TSocket(host, port));

    init();

        const std::vectorstd::pairstd::string, int> >& servers,

        int connect_timeout_milliseconds,

        int receive_timeout_milliseconds,

        int send_timeout_milliseconds,

        int num_retries, int retry_interval,

        int max_consecutive_failures,

        bool randomize, bool always_try_last)

    apache::thrift::transport::TSocketPool* socket_pool = new apache::thrift::transport::TSocketPool(servers);

    socket_pool->setNumRetries(num_retries);

    socket_pool->setRetryInterval(retry_interval);

    socket_pool->setMaxConsecutiveFailures(max_consecutive_failures);

    socket_pool->setRandomize(randomize);

    socket_pool->setAlwaysTryLast(always_try_last);

    _socket.reset(socket_pool);

void CThriftClientHelperThriftClient, Protocol, Transport>::init()

    _socket->setConnTimeout(_connect_timeout_milliseconds);

    _socket->setRecvTimeout(_receive_timeout_milliseconds);

    _socket->setSendTimeout(_send_timeout_milliseconds);

    // Transport默认为apache::thrift::transport::TFramedTransport

    _transport.reset(new Transport(_socket));

    // Protocol默认为apache::thrift::protocol::TBinaryProtocol

    _protocol.reset(new Protocol(_transport));

    // 服务端的Client

    _client.reset(new ThriftClient(_protocol));

CThriftClientHelperThriftClient, Protocol, Transport>::~CThriftClientHelper()

    close();

void CThriftClientHelperThriftClient, Protocol, Transport>::connect()

    if (!_transport->isOpen())

        // 如果Transport为TFramedTransport,则实际调用:TFramedTransport::open -> TSocketPool::open

        _transport->open();

        // 当"TSocketPool::open: all connections failed"时,

        // TSocketPool::open就抛出异常TTransportException,异常类型为TTransportException::NOT_OPEN

bool CThriftClientHelperThriftClient, Protocol, Transport>::is_connected() const

    return _transport->isOpen();

void CThriftClientHelperThriftClient, Protocol, Transport>::close()

    if (_transport->isOpen())

        _transport->close();

const std::string& CThriftClientHelperThriftClient, Protocol, Transport>::get_host() const

    return _socket->getHost();

uint16_t CThriftClientHelperThriftClient, Protocol, Transport>::get_port() const

    return static_castuint16_t>(_socket->getPort());

template class ThriftHandler, class ServiceProcessor, class ProtocolFactory>

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)

    serve("0.0.0.0", port, num_worker_threads, num_io_threads);

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)

    init("0.0.0.0", port, num_worker_threads, num_io_threads);

    // 这里也可直接调用serve(),但推荐run()

    // !!!注意调用run()的进程或线程会被阻塞

    _server->run();

    _thread_manager->join();

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached)

    init(ip, port, num_worker_threads, num_io_threads);

    // 关联

    if (attached != NULL)

        _handler->attach(attached);

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, void* attached, uint8_t num_worker_threads, uint8_t num_io_threads)

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::stop()

    _server->stop();

    _thread_manager->stop();

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)

    _handler.reset(new ThriftHandler);

    _processor.reset(new ServiceProcessor(_handler));

    // ProtocolFactory默认为apache::thrift::protocol::TBinaryProtocolFactory

    _protocol_factory.reset(new ProtocolFactory());

    _thread_manager = apache::thrift::server::ThreadManager::newSimpleThreadManager(num_worker_threads);

    _thread_factory.reset(new apache::thrift::concurrency::PosixThreadFactory());

    _thread_manager->threadFactory(_thread_factory);

    _thread_manager->start();

    apache::thrift::server::TNonblockingServer* server = new apache::thrift::server::TNonblockingServer(_processor, _protocol_factory, port, _thread_manager);

    server->setNumIOThreads(num_io_threads);

    _server.reset(server);

    // 不要调用_server->run(),交给serve()来调用,

    // 因为一旦调用了run()后,调用线程或进程就被阻塞了。

NET_NAMESPACE_END

#endif // MOOON_NET_THRIFT_HELPER_H