天天看点

Thrift 双向通信实现(c++版)

注意:thrift的client是非线程安全的!!!

1. thrift文件

thrift IDL文件源码:

service HelloWorldBidirectionService{
	oneway void SayHello(1:string msg);
}
           

2. 开发环境设置

Thrift 双向通信实现(c++版)
Thrift 双向通信实现(c++版)

3. thrift服务端编写

thrift服务端修改,将HelloWorldBidirectionService_server.skeleton.cpp文件修改为:

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "HelloWorldBidirectionService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/server/TThreadPoolServer.h>

#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/PosixThreadFactory.h>  
#include <thrift/concurrency/StdThreadFactory.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::apache::thrift::concurrency;

using boost::shared_ptr;

static boost::shared_ptr<transport::TTransport> g_CurrentTransport = nullptr;

class HelloWorldBidirectionServiceHandler : virtual public HelloWorldBidirectionServiceIf {
public:
	HelloWorldBidirectionServiceHandler() {
		// Your initialization goes here
	}

	void SayHello(const std::string& msg) {
		// Your implementation goes here
		printf("%s\n", msg);

		SayToClient(msg);
	}
	void SayToClient(const std::string& msg)
	{
		shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(g_CurrentTransport));
		HelloWorldBidirectionServiceClient client(protocol);
		//Thread.Sleep(1000);
		client.SayHello(msg);
	}
};

class HelloWorldBidirectionProcessor : public TProcessorFactory
{
public:
	boost::shared_ptr<TProcessor> getProcessor(const TConnectionInfo& connInfo)
	{
		g_CurrentTransport = connInfo.transport;

		printf("接收到连接信号\n");

		shared_ptr<HelloWorldBidirectionServiceHandler> handler(new HelloWorldBidirectionServiceHandler());
		shared_ptr<TProcessor> processor(new HelloWorldBidirectionServiceProcessor(handler));
		return processor;
	}
};
static boost::shared_ptr<TProcessorFactory> getProcessorFactory()
{
	shared_ptr<TProcessorFactory> processorFactory(new HelloWorldBidirectionProcessor());
	return processorFactory;
}



int main_tt(int argc, char **argv) {
	int port = 9090;
	shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
	shared_ptr<TTransportFactory> transportFactory(new TTransportFactory());
	shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

	shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(10);//指定10个线程数  

	shared_ptr<StdThreadFactory> threadFactory = shared_ptr<StdThreadFactory>(new StdThreadFactory());
	threadManager->threadFactory(threadFactory);
	threadManager->start();

	shared_ptr<TThreadPoolServer> server(new TThreadPoolServer(getProcessorFactory(), serverTransport, transportFactory, protocolFactory, threadManager));

	/* shared_ptr<HelloWorldBidirectionServiceHandler> handler(new HelloWorldBidirectionServiceHandler());
	 shared_ptr<TProcessor> processor(new HelloWorldBidirectionServiceProcessor(handler));
	 shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
	 shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
	 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

	 TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);*/
	server->serve();
	return 0;
}
           

4. thrift 客户端编写

thrift 客户端修改,新建文件 HelloWorldBidirectionService_client.cpp,编码:

#include <iostream>

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>

#include "HelloWorldBidirectionService.h"

#include <boost/thread.hpp>


using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

//using namespace tutorial;
using boost::shared_ptr;
//using namespace boost;
using boost::thread;

#define BOOST_DATE_TIME_SOURCE
#define BOOST_THREAD_NO_LIB


class HelloWorldBidirectionFace :public HelloWorldBidirectionServiceIf
{
public:
	void SayHello(const std::string& msg) {
		// Your implementation goes here
		/*printf("%s\n", msg);
		printf("%s\n", &msg);*/

		cout << msg << endl;
		//cout << &msg << endl;
	}
};

static void Run(boost::shared_ptr<TTransport> sock)
{
	boost::shared_ptr<HelloWorldBidirectionFace> handler(new HelloWorldBidirectionFace());
	boost::shared_ptr<TProcessor> processor(new HelloWorldBidirectionServiceProcessor(handler));

	//boost::shared_ptr<HelloWorldBidirectionServiceProcessor> processor(new HelloWorldBidirectionFace());
	try
	{
		boost::shared_ptr<TProtocol> inProtocol(new TBinaryProtocol(sock));
		boost::shared_ptr<TProtocol> outProtocol(new TBinaryProtocol(sock));
		while (processor->process(inProtocol, outProtocol, "proc"))
		{
			printf("wait next msg\n");
		}
	}
	catch (TException& tx)
	{
		printf("connect close\n");
		cout << "ERROR: " << tx.what() << endl;
	}
}

static void RecFromConsole(boost::shared_ptr<HelloWorldBidirectionServiceClient> client)
{
	string str;
	cout << "input exit stop" << endl;
	cin >> str;
	while (str.compare("exit"))
	{
		client->SayHello(str);

		cin >> str;
	}
}


int main()
{
	boost::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
	boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
	boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
	boost::shared_ptr<HelloWorldBidirectionServiceClient> client(new HelloWorldBidirectionServiceClient(protocol));

	try {
		transport->open();

		boost::thread mythread(Run, socket);

		boost::thread mythread2(RecFromConsole, client);
		mythread2.join();

		transport->close();
	}
	catch (TException& tx) {
		cout << "ERROR: " << tx.what() << endl;
	}
}
           

6. 源码地址:

Thrift_HelloWorld_Cpp_Bidirection