天天看点

thrift中TNonblockingServer的简单用法

    最近在项目中需要把客户端的一些信息发送到服务器上,听起来是个很简单的需求,但是实际考虑下,觉得如果自己手工实现,工作量也不小,而且尽是些繁琐且无聊的事情,遂考虑用现成的库来实现。对比了protocol buffer与thrift后,本着偷懒到底的原则,选择了thrift,因为thrift本身提供了RPC框架,而protocol buffer仅是个序列化的库而已。

   首先是编译thrift,这里参考官方说明,需要先装boost库,但是如果要使用nonblocking server的话,还要再把libevent库也装上。

    thrift提供了三种服务模型,分别是TSimpleServer, TThreadPoolServer和TNonblockingServer,除去第一个一般仅做测试用,后两个都可以在实际生产中拿来用。在客户端不多的情况下,可以选用TThreadPoolServer,但是要注意TThreadPoolServer的客户端只要不从服务器上断开连接,就会一直占据服务器的一个线程,当服务器线程池所有线程都在被使用时,新到来的客户端将排在队列里等待,直到有客户端断开连接,使服务器端线程池出现空闲线程方可继续被提供服务,所以使用这种模型时,一定要注意客户端不使用时不要长时间连接服务器,如果确实有这种需求,请使用TNonblockingServer。

    说实话,单纯从代码量上来讲,使用Nonblocking server并不比ThreadPool server多了多少,谁让代码都是由thrift程序生成的,用户只需填上实际处理的代码即可。

    下面用一个简单的例子说明

    clientInfo.thrift

namespace cpp vnmp

enum ClientType {
    DOM_MANAGER,
    DOM_SERVICE
}

enum RegistResult {
    SUCCESS,
    NAME_EXISTED,
    INVALIE_PARA,
}

struct ClientInfo {
    1: string name,
    2: string realIP,
    3: string vpnIP,
    4: ClientType type,
    5: optional string description,
}


service Regist {
    RegistResult registClient(1:ClientInfo clientInfo),
    bool heartbeat(1:string name, 2:ClientType type)
}
           

对这个文件做一个简单的说明,client需要把自己的信息ClientInfo发送到server上注册,调用registClient方法,heartbeat方法是用来做心跳的。

执行 thrift -r --gen cpp clientInfo.thrift

如果没有语法错误的话,在gen-cpp目录下会生成  

clientInfo_constants.h clientInfo_constants.cpp 

clientInfo_types.h clientInfo_types.cpp

Regist_server.skeleton.cpp

其中的skeleton文件包含了一个简单的TSimpleServer实现,是可以直接编译使用的,这个文件也就是我们要修改的文件,建议另外建一个main文件,并将其中内容拷过来,其他几个强烈建议不要做修改,一来没需要,二来如果做了修改,下次执行thrift文件时,也会被新生成的文件覆盖,这也是我前面建议另外建一个main文件还不是直接修改skeketon文件的原因。

下面是main文件的主要内容,略去了头文件的包含和命名空间的使用等等,这里假定读者已有了一定的boost基础。

class RegistHandler : virtual public RegistIf {
public:

    RegistHandler() {
        // Your initialization goes here
    }

    RegistResult::type registClient(const ClientInfo& clientInfo) {
        // Your implementation goes here
        printf("registClient\n");
    }

    void heartbeat(const std::string& name, const ClientType::type type) {
        // Your implementation goes here
        printf("heartbeat\n");
    }

};

int main(int argc, char **argv) {
    int port = 9090;
    shared_ptr<RegistHandler> handler(new RegistHandler());
    shared_ptr<TProcessor> processor(new RegistProcessor(handler));
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(15);
    shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory > (new PosixThreadFactory());
    threadManager->threadFactory(threadFactory);
    threadManager->start();
    TNonblockingServer server(processor, protocolFactory, port, threadManager);
    server.serve();
    return 0;
}
           

regist service的实际处理方法写在registHandler对应的方法里。主要是main方法做个简单说明:

这里使用了thrift库自带的ThreadManager,建立了一个拥有15个线程的线程池,也就是说这个NonblockingServer拥有15个工作线程。

除了shared_ptr来自boost,其他均是thrift自带,命名空间是apache::thrift 

client端

int main(int argc, char** argv) {
    boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090));
    boost::shared_ptr<TTransport> transport(new TFramedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

    RegistClient client(protocol);

    transport->open();
    //insert your code here
    //      ....
    transport->close();
    return 0;
}
           

注意对于nonblocking server,client端的TTransport只能选用TFramedTransport;如果通信过程中出现异常,会抛出异常,可以用try catch捕获并做处理。

继续阅读