天天看點

Thrift輔助類,用于簡化Thrift程式設計

CThriftServerHelper用于服務端,CThriftClientHelper用于用戶端。

IDL定義:

service PackageManagerService

{

}

服務端使用示例:

CThriftServerHelper _thrift_server_helper;

return _thrift_server_helper.serve(FLAGS_package_port, rpc_threads);

用戶端使用示例:

CThriftClientHelper thrift_client_helper(FLAGS_package_ip, FLAGS_package_port);

thrift_client_helper.connect(); // 注意需要處理異常TTransportException/TApplicationException/TException

#ifndef MOOON_NET_THRIFT_HELPER_H

#define MOOON_NET_THRIFT_HELPER_H

#include mooon/net/config.h>

#include mooon/sys/log.h>

#include mooon/utils/string_utils.h>

#include mooon/utils/scoped_ptr.h>

#include arpa/inet.h>

#include boost/scoped_ptr.hpp>

#include thrift/concurrency/PosixThreadFactory.h>

#include thrift/concurrency/ThreadManager.h>

#include thrift/protocol/TBinaryProtocol.h>

#include thrift/server/TNonblockingServer.h>

#include thrift/transport/TSocketPool.h>

#include thrift/transport/TTransportException.h>

#include vector>

NET_NAMESPACE_BEGIN

// 用來判斷thrift是否已經連接配接,包括兩種情況:

// 1.從未連接配接過,也就是還未打開過連接配接

// 2.連接配接被對端關閉了

inline bool thrift_not_connected(

        apache::thrift::transport::TTransportException::TTransportExceptionType type)

    return (apache::thrift::transport::TTransportException::NOT_OPEN == type)

        || (apache::thrift::transport::TTransportException::END_OF_FILE == type);

        apache::thrift::transport::TTransportException& ex)

    apache::thrift::transport::TTransportException::TTransportExceptionType type = ex.getType();

    return thrift_not_connected(type);

// thrift用戶端輔助類

//

// 使用示例:

// mooon::net::CThriftClientHelperExampleServiceClient> client(rpc_server_ip, rpc_server_port);

// try

// {

// client.connect();

// client->foo();

// }

// catch (apache::thrift::transport::TTransportException& ex)

// MYLOG_ERROR("thrift exception: (%d)%s\n", ex.getType(), ex.what());

// catch (apache::thrift::transport::TApplicationException& ex)

// MYLOG_ERROR("thrift exception: %s\n", ex.what());

// catch (apache::thrift::TException& ex)

// Transport除預設的TFramedTransport (TBufferTransports.h),還可選擇:

// TBufferedTransport (TBufferTransports.h)

// THttpTransport

// TZlibTransport

// TFDTransport (TSimpleFileTransport)

// Protocol除預設的apache::thrift::protocol::TBinaryProtocol,還可選擇:

// TCompactProtocol

// TJSONProtocol

// TDebugProtocol

template class ThriftClient,

          class Protocol=apache::thrift::protocol::TBinaryProtocol,

          class Transport=apache::thrift::transport::TFramedTransport>

class CThriftClientHelper

public:

    // host thrift服務端的IP位址

    // port thrift服務端的端口号

    // connect_timeout_milliseconds 連接配接thrift服務端的逾時毫秒數

    // receive_timeout_milliseconds 接收thrift服務端發過來的資料的逾時毫秒數

    // send_timeout_milliseconds 向thrift服務端發送資料時的逾時毫秒數

    CThriftClientHelper(const std::string &host, uint16_t port,

                        int connect_timeout_milliseconds=2000,

                        int receive_timeout_milliseconds=2000,

                        int send_timeout_milliseconds=2000);

    // 支援指定多個servers,運作時随機選擇一個,當一個異常時自動選擇其它

    // num_retries 重試次數

    // retry_interval 重試間隔,機關為秒

    // max_consecutive_failures 單個Server最大連續失敗次數

    // randomize_ 是否随機選擇一個Server

    // always_try_last 是否總是重試最後一個Server

    CThriftClientHelper(const std::vectorstd::pairstd::string, int> >& servers,

                        int send_timeout_milliseconds=2000,

                        int num_retries=1, int retry_interval=60,

                        int max_consecutive_failures=1,

                        bool randomize=true, bool always_try_last=true

                        );

    ~CThriftClientHelper();

    // 連接配接thrift服務端

    //

    // 出錯時,可抛出以下幾個thrift異常:

    // apache::thrift::transport::TTransportException

    // apache::thrift::TApplicationException

    // apache::thrift::TException

    void connect();

    bool is_connected() const;

    // 斷開與thrift服務端的連接配接

    void close();

    apache::thrift::transport::TSocket* get_socket() { return _socket.get(); }

    const apache::thrift::transport::TSocket get_socket() const { return _socket.get(); }

    ThriftClient* get() { return _client.get(); }

    ThriftClient* get() const { return _client.get(); }

    ThriftClient* operator ->() { return get(); }

    ThriftClient* operator ->() const { return get(); }

    // 取thrift服務端的IP位址

    const std::string& get_host() const;

    // 取thrift服務端的端口号

    uint16_t get_port() const;

    // 傳回可讀的辨別,常用于記錄日志

    std::string str() const

    {

        return utils::CStringUtils::format_string("thrift://%s:%u", get_host().c_str(), get_port());

    }

private:

    void init();

    int _connect_timeout_milliseconds;

    int _receive_timeout_milliseconds;

    int _send_timeout_milliseconds;

    // TSocket隻支援一個server,而TSocketPool是TSocket的子類支援指定多個server,運作時随機選擇一個

    boost::shared_ptrapache::thrift::transport::TSocket> _socket;

    boost::shared_ptrapache::thrift::transport::TTransport> _transport;

    boost::shared_ptrapache::thrift::protocol::TProtocol> _protocol;

    boost::shared_ptrThriftClient> _client;

};

////////////////////////////////////////////////////////////////////////////////

// thrift服務端輔助類

// mooon::net::CThriftServerHelperCExampleHandler, ExampleServiceProcessor> _thrift_server;

// _thrift_server.serve(listen_port);

// ProtocolFactory除了預設的TBinaryProtocolFactory,還可選擇:

// TCompactProtocolFactory

// TJSONProtocolFactory

// TDebugProtocolFactory

// 隻支援TNonblockingServer一種Server

template class ThriftHandler,

          class ServiceProcessor,

          class ProtocolFactory=apache::thrift::protocol::TBinaryProtocolFactory>

class CThriftServerHelper

    // 啟動rpc服務,請注意該調用是同步阻塞的,是以需放最後調用

    // port thrift服務端的監聽端口号

    // num_threads thrift服務端開啟的線程數

    // 參數num_io_threads,隻有當Server為TNonblockingServer才有效

    void serve(uint16_t port, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);

    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads=1);

    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached);

    // 要求ThriftHandler類有方法attach(void*)

    void serve(uint16_t port, void* attached, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);

    void stop();

    ThriftHandler* get()

        return _handler.get();

    ThriftHandler* get() const

    void init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads);

    boost::shared_ptrThriftHandler> _handler;

    boost::shared_ptrapache::thrift::TProcessor> _processor;

    boost::shared_ptrapache::thrift::protocol::TProtocolFactory> _protocol_factory;

    boost::shared_ptrapache::thrift::server::ThreadManager> _thread_manager;

    boost::shared_ptrapache::thrift::concurrency::PosixThreadFactory> _thread_factory;

    boost::shared_ptrapache::thrift::server::TServer> _server;

// 被thrift回調的寫日志函數,由set_thrift_debug_log_function()調用它

inline void write_thrift_debug_log(const char* log)

    MYLOG_DEBUG("%s", log);

inline void write_thrift_info_log(const char* log)

    MYLOG_INFO("%s", log);

inline void write_thrift_error_log(const char* log)

    MYLOG_ERROR("%s", log);

// 将thrift輸出寫入到日志檔案中

inline void set_thrift_debug_log_function()

    if (::mooon::sys::g_logger != NULL)

        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_debug_log);

inline void set_thrift_info_log_function()

        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_info_log);

inline void set_thrift_error_log_function()

        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_error_log);

template class ThriftClient, class Protocol, class Transport>

CThriftClientHelperThriftClient, Protocol, Transport>::CThriftClientHelper(

        const std::string &host, uint16_t port,

        int connect_timeout_milliseconds, int receive_timeout_milliseconds, int send_timeout_milliseconds)

        : _connect_timeout_milliseconds(connect_timeout_milliseconds),

          _receive_timeout_milliseconds(receive_timeout_milliseconds),

          _send_timeout_milliseconds(send_timeout_milliseconds)

    set_thrift_debug_log_function();

    _socket.reset(new apache::thrift::transport::TSocket(host, port));

    init();

        const std::vectorstd::pairstd::string, int> >& servers,

        int connect_timeout_milliseconds,

        int receive_timeout_milliseconds,

        int send_timeout_milliseconds,

        int num_retries, int retry_interval,

        int max_consecutive_failures,

        bool randomize, bool always_try_last)

    apache::thrift::transport::TSocketPool* socket_pool = new apache::thrift::transport::TSocketPool(servers);

    socket_pool->setNumRetries(num_retries);

    socket_pool->setRetryInterval(retry_interval);

    socket_pool->setMaxConsecutiveFailures(max_consecutive_failures);

    socket_pool->setRandomize(randomize);

    socket_pool->setAlwaysTryLast(always_try_last);

    _socket.reset(socket_pool);

void CThriftClientHelperThriftClient, Protocol, Transport>::init()

    _socket->setConnTimeout(_connect_timeout_milliseconds);

    _socket->setRecvTimeout(_receive_timeout_milliseconds);

    _socket->setSendTimeout(_send_timeout_milliseconds);

    // Transport預設為apache::thrift::transport::TFramedTransport

    _transport.reset(new Transport(_socket));

    // Protocol預設為apache::thrift::protocol::TBinaryProtocol

    _protocol.reset(new Protocol(_transport));

    // 服務端的Client

    _client.reset(new ThriftClient(_protocol));

CThriftClientHelperThriftClient, Protocol, Transport>::~CThriftClientHelper()

    close();

void CThriftClientHelperThriftClient, Protocol, Transport>::connect()

    if (!_transport->isOpen())

        // 如果Transport為TFramedTransport,則實際調用:TFramedTransport::open -> TSocketPool::open

        _transport->open();

        // 當"TSocketPool::open: all connections failed"時,

        // TSocketPool::open就抛出異常TTransportException,異常類型為TTransportException::NOT_OPEN

bool CThriftClientHelperThriftClient, Protocol, Transport>::is_connected() const

    return _transport->isOpen();

void CThriftClientHelperThriftClient, Protocol, Transport>::close()

    if (_transport->isOpen())

        _transport->close();

const std::string& CThriftClientHelperThriftClient, Protocol, Transport>::get_host() const

    return _socket->getHost();

uint16_t CThriftClientHelperThriftClient, Protocol, Transport>::get_port() const

    return static_castuint16_t>(_socket->getPort());

template class ThriftHandler, class ServiceProcessor, class ProtocolFactory>

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)

    serve("0.0.0.0", port, num_worker_threads, num_io_threads);

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)

    init("0.0.0.0", port, num_worker_threads, num_io_threads);

    // 這裡也可直接調用serve(),但推薦run()

    // !!!注意調用run()的程序或線程會被阻塞

    _server->run();

    _thread_manager->join();

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached)

    init(ip, port, num_worker_threads, num_io_threads);

    // 關聯

    if (attached != NULL)

        _handler->attach(attached);

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, void* attached, uint8_t num_worker_threads, uint8_t num_io_threads)

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::stop()

    _server->stop();

    _thread_manager->stop();

void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)

    _handler.reset(new ThriftHandler);

    _processor.reset(new ServiceProcessor(_handler));

    // ProtocolFactory預設為apache::thrift::protocol::TBinaryProtocolFactory

    _protocol_factory.reset(new ProtocolFactory());

    _thread_manager = apache::thrift::server::ThreadManager::newSimpleThreadManager(num_worker_threads);

    _thread_factory.reset(new apache::thrift::concurrency::PosixThreadFactory());

    _thread_manager->threadFactory(_thread_factory);

    _thread_manager->start();

    apache::thrift::server::TNonblockingServer* server = new apache::thrift::server::TNonblockingServer(_processor, _protocol_factory, port, _thread_manager);

    server->setNumIOThreads(num_io_threads);

    _server.reset(server);

    // 不要調用_server->run(),交給serve()來調用,

    // 因為一旦調用了run()後,調用線程或程序就被阻塞了。

NET_NAMESPACE_END

#endif // MOOON_NET_THRIFT_HELPER_H