CThriftServerHelper用于服務端,CThriftClientHelper用于用戶端。
IDL定義:
service PackageManagerService
{
}
服務端使用示例:
CThriftServerHelper _thrift_server_helper;
return _thrift_server_helper.serve(FLAGS_package_port, rpc_threads);
用戶端使用示例:
CThriftClientHelper thrift_client_helper(FLAGS_package_ip, FLAGS_package_port);
thrift_client_helper.connect(); // 注意需要處理異常TTransportException/TApplicationException/TException
#ifndef MOOON_NET_THRIFT_HELPER_H
#define MOOON_NET_THRIFT_HELPER_H
#include mooon/net/config.h>
#include mooon/sys/log.h>
#include mooon/utils/string_utils.h>
#include mooon/utils/scoped_ptr.h>
#include arpa/inet.h>
#include boost/scoped_ptr.hpp>
#include thrift/concurrency/PosixThreadFactory.h>
#include thrift/concurrency/ThreadManager.h>
#include thrift/protocol/TBinaryProtocol.h>
#include thrift/server/TNonblockingServer.h>
#include thrift/transport/TSocketPool.h>
#include thrift/transport/TTransportException.h>
#include vector>
NET_NAMESPACE_BEGIN
// 用來判斷thrift是否已經連接配接,包括兩種情況:
// 1.從未連接配接過,也就是還未打開過連接配接
// 2.連接配接被對端關閉了
inline bool thrift_not_connected(
apache::thrift::transport::TTransportException::TTransportExceptionType type)
return (apache::thrift::transport::TTransportException::NOT_OPEN == type)
|| (apache::thrift::transport::TTransportException::END_OF_FILE == type);
apache::thrift::transport::TTransportException& ex)
apache::thrift::transport::TTransportException::TTransportExceptionType type = ex.getType();
return thrift_not_connected(type);
// thrift用戶端輔助類
//
// 使用示例:
// mooon::net::CThriftClientHelperExampleServiceClient> client(rpc_server_ip, rpc_server_port);
// try
// {
// client.connect();
// client->foo();
// }
// catch (apache::thrift::transport::TTransportException& ex)
// MYLOG_ERROR("thrift exception: (%d)%s\n", ex.getType(), ex.what());
// catch (apache::thrift::transport::TApplicationException& ex)
// MYLOG_ERROR("thrift exception: %s\n", ex.what());
// catch (apache::thrift::TException& ex)
// Transport除預設的TFramedTransport (TBufferTransports.h),還可選擇:
// TBufferedTransport (TBufferTransports.h)
// THttpTransport
// TZlibTransport
// TFDTransport (TSimpleFileTransport)
// Protocol除預設的apache::thrift::protocol::TBinaryProtocol,還可選擇:
// TCompactProtocol
// TJSONProtocol
// TDebugProtocol
template class ThriftClient,
class Protocol=apache::thrift::protocol::TBinaryProtocol,
class Transport=apache::thrift::transport::TFramedTransport>
class CThriftClientHelper
public:
// host thrift服務端的IP位址
// port thrift服務端的端口号
// connect_timeout_milliseconds 連接配接thrift服務端的逾時毫秒數
// receive_timeout_milliseconds 接收thrift服務端發過來的資料的逾時毫秒數
// send_timeout_milliseconds 向thrift服務端發送資料時的逾時毫秒數
CThriftClientHelper(const std::string &host, uint16_t port,
int connect_timeout_milliseconds=2000,
int receive_timeout_milliseconds=2000,
int send_timeout_milliseconds=2000);
// 支援指定多個servers,運作時随機選擇一個,當一個異常時自動選擇其它
// num_retries 重試次數
// retry_interval 重試間隔,機關為秒
// max_consecutive_failures 單個Server最大連續失敗次數
// randomize_ 是否随機選擇一個Server
// always_try_last 是否總是重試最後一個Server
CThriftClientHelper(const std::vectorstd::pairstd::string, int> >& servers,
int send_timeout_milliseconds=2000,
int num_retries=1, int retry_interval=60,
int max_consecutive_failures=1,
bool randomize=true, bool always_try_last=true
);
~CThriftClientHelper();
// 連接配接thrift服務端
//
// 出錯時,可抛出以下幾個thrift異常:
// apache::thrift::transport::TTransportException
// apache::thrift::TApplicationException
// apache::thrift::TException
void connect();
bool is_connected() const;
// 斷開與thrift服務端的連接配接
void close();
apache::thrift::transport::TSocket* get_socket() { return _socket.get(); }
const apache::thrift::transport::TSocket get_socket() const { return _socket.get(); }
ThriftClient* get() { return _client.get(); }
ThriftClient* get() const { return _client.get(); }
ThriftClient* operator ->() { return get(); }
ThriftClient* operator ->() const { return get(); }
// 取thrift服務端的IP位址
const std::string& get_host() const;
// 取thrift服務端的端口号
uint16_t get_port() const;
// 傳回可讀的辨別,常用于記錄日志
std::string str() const
{
return utils::CStringUtils::format_string("thrift://%s:%u", get_host().c_str(), get_port());
}
private:
void init();
int _connect_timeout_milliseconds;
int _receive_timeout_milliseconds;
int _send_timeout_milliseconds;
// TSocket隻支援一個server,而TSocketPool是TSocket的子類支援指定多個server,運作時随機選擇一個
boost::shared_ptrapache::thrift::transport::TSocket> _socket;
boost::shared_ptrapache::thrift::transport::TTransport> _transport;
boost::shared_ptrapache::thrift::protocol::TProtocol> _protocol;
boost::shared_ptrThriftClient> _client;
};
////////////////////////////////////////////////////////////////////////////////
// thrift服務端輔助類
// mooon::net::CThriftServerHelperCExampleHandler, ExampleServiceProcessor> _thrift_server;
// _thrift_server.serve(listen_port);
// ProtocolFactory除了預設的TBinaryProtocolFactory,還可選擇:
// TCompactProtocolFactory
// TJSONProtocolFactory
// TDebugProtocolFactory
// 隻支援TNonblockingServer一種Server
template class ThriftHandler,
class ServiceProcessor,
class ProtocolFactory=apache::thrift::protocol::TBinaryProtocolFactory>
class CThriftServerHelper
// 啟動rpc服務,請注意該調用是同步阻塞的,是以需放最後調用
// port thrift服務端的監聽端口号
// num_threads thrift服務端開啟的線程數
// 參數num_io_threads,隻有當Server為TNonblockingServer才有效
void serve(uint16_t port, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads=1);
void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached);
// 要求ThriftHandler類有方法attach(void*)
void serve(uint16_t port, void* attached, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
void stop();
ThriftHandler* get()
return _handler.get();
ThriftHandler* get() const
void init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads);
boost::shared_ptrThriftHandler> _handler;
boost::shared_ptrapache::thrift::TProcessor> _processor;
boost::shared_ptrapache::thrift::protocol::TProtocolFactory> _protocol_factory;
boost::shared_ptrapache::thrift::server::ThreadManager> _thread_manager;
boost::shared_ptrapache::thrift::concurrency::PosixThreadFactory> _thread_factory;
boost::shared_ptrapache::thrift::server::TServer> _server;
// 被thrift回調的寫日志函數,由set_thrift_debug_log_function()調用它
inline void write_thrift_debug_log(const char* log)
MYLOG_DEBUG("%s", log);
inline void write_thrift_info_log(const char* log)
MYLOG_INFO("%s", log);
inline void write_thrift_error_log(const char* log)
MYLOG_ERROR("%s", log);
// 将thrift輸出寫入到日志檔案中
inline void set_thrift_debug_log_function()
if (::mooon::sys::g_logger != NULL)
apache::thrift::GlobalOutput.setOutputFunction(write_thrift_debug_log);
inline void set_thrift_info_log_function()
apache::thrift::GlobalOutput.setOutputFunction(write_thrift_info_log);
inline void set_thrift_error_log_function()
apache::thrift::GlobalOutput.setOutputFunction(write_thrift_error_log);
template class ThriftClient, class Protocol, class Transport>
CThriftClientHelperThriftClient, Protocol, Transport>::CThriftClientHelper(
const std::string &host, uint16_t port,
int connect_timeout_milliseconds, int receive_timeout_milliseconds, int send_timeout_milliseconds)
: _connect_timeout_milliseconds(connect_timeout_milliseconds),
_receive_timeout_milliseconds(receive_timeout_milliseconds),
_send_timeout_milliseconds(send_timeout_milliseconds)
set_thrift_debug_log_function();
_socket.reset(new apache::thrift::transport::TSocket(host, port));
init();
const std::vectorstd::pairstd::string, int> >& servers,
int connect_timeout_milliseconds,
int receive_timeout_milliseconds,
int send_timeout_milliseconds,
int num_retries, int retry_interval,
int max_consecutive_failures,
bool randomize, bool always_try_last)
apache::thrift::transport::TSocketPool* socket_pool = new apache::thrift::transport::TSocketPool(servers);
socket_pool->setNumRetries(num_retries);
socket_pool->setRetryInterval(retry_interval);
socket_pool->setMaxConsecutiveFailures(max_consecutive_failures);
socket_pool->setRandomize(randomize);
socket_pool->setAlwaysTryLast(always_try_last);
_socket.reset(socket_pool);
void CThriftClientHelperThriftClient, Protocol, Transport>::init()
_socket->setConnTimeout(_connect_timeout_milliseconds);
_socket->setRecvTimeout(_receive_timeout_milliseconds);
_socket->setSendTimeout(_send_timeout_milliseconds);
// Transport預設為apache::thrift::transport::TFramedTransport
_transport.reset(new Transport(_socket));
// Protocol預設為apache::thrift::protocol::TBinaryProtocol
_protocol.reset(new Protocol(_transport));
// 服務端的Client
_client.reset(new ThriftClient(_protocol));
CThriftClientHelperThriftClient, Protocol, Transport>::~CThriftClientHelper()
close();
void CThriftClientHelperThriftClient, Protocol, Transport>::connect()
if (!_transport->isOpen())
// 如果Transport為TFramedTransport,則實際調用:TFramedTransport::open -> TSocketPool::open
_transport->open();
// 當"TSocketPool::open: all connections failed"時,
// TSocketPool::open就抛出異常TTransportException,異常類型為TTransportException::NOT_OPEN
bool CThriftClientHelperThriftClient, Protocol, Transport>::is_connected() const
return _transport->isOpen();
void CThriftClientHelperThriftClient, Protocol, Transport>::close()
if (_transport->isOpen())
_transport->close();
const std::string& CThriftClientHelperThriftClient, Protocol, Transport>::get_host() const
return _socket->getHost();
uint16_t CThriftClientHelperThriftClient, Protocol, Transport>::get_port() const
return static_castuint16_t>(_socket->getPort());
template class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
serve("0.0.0.0", port, num_worker_threads, num_io_threads);
void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
init("0.0.0.0", port, num_worker_threads, num_io_threads);
// 這裡也可直接調用serve(),但推薦run()
// !!!注意調用run()的程序或線程會被阻塞
_server->run();
_thread_manager->join();
void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached)
init(ip, port, num_worker_threads, num_io_threads);
// 關聯
if (attached != NULL)
_handler->attach(attached);
void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, void* attached, uint8_t num_worker_threads, uint8_t num_io_threads)
void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::stop()
_server->stop();
_thread_manager->stop();
void CThriftServerHelperThriftHandler, ServiceProcessor, ProtocolFactory>::init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
_handler.reset(new ThriftHandler);
_processor.reset(new ServiceProcessor(_handler));
// ProtocolFactory預設為apache::thrift::protocol::TBinaryProtocolFactory
_protocol_factory.reset(new ProtocolFactory());
_thread_manager = apache::thrift::server::ThreadManager::newSimpleThreadManager(num_worker_threads);
_thread_factory.reset(new apache::thrift::concurrency::PosixThreadFactory());
_thread_manager->threadFactory(_thread_factory);
_thread_manager->start();
apache::thrift::server::TNonblockingServer* server = new apache::thrift::server::TNonblockingServer(_processor, _protocol_factory, port, _thread_manager);
server->setNumIOThreads(num_io_threads);
_server.reset(server);
// 不要調用_server->run(),交給serve()來調用,
// 因為一旦調用了run()後,調用線程或程序就被阻塞了。
NET_NAMESPACE_END
#endif // MOOON_NET_THRIFT_HELPER_H