天天看点

zeromq 发布订阅 无数据丢失性能测试

sub接收端代码:

#include <zmq.hpp>
#include <assert.h>
#include <iostream>
#include <string>

using namespace std;
#define BAG_LARGE_NUM			14

// Returns the current wall-clock time from gettimeofday() expressed in
// microseconds (seconds * 1,000,000 + microseconds part).
long long int gettime(){
	struct timeval tv = {0, 0};
	gettimeofday(&tv, NULL);
	// Widen the seconds field before multiplying: on platforms where
	// time_t/long is 32 bits, tv_sec * 1000000 would overflow.
	return static_cast<long long>(tv.tv_sec) * 1000000LL + static_cast<long long>(tv.tv_usec);
}

int main(int vArgc, char** vArgv)
{
	char* ProtocoltConSub			= vArgv[1];
	char* ProtocoltConReq			= vArgv[2];
	cout<<"ProtocoltConSub: "<<ProtocoltConSub<<endl;
	cout<<"ProtocoltConReq: "<<ProtocoltConReq<<endl;
	cout<<"***********************Client Start********************"<<endl;
	zmq::context_t		tContext(1);
	zmq::socket_t		tSocketSub(tContext, ZMQ_SUB);
	zmq::socket_t		tSocketReq(tContext, ZMQ_REQ);
	tSocketSub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
	tSocketSub.connect(ProtocoltConSub);
	tSocketReq.connect(ProtocoltConReq);

	long long				tTimeStart, tTimeEnd;
	zmq::message_t		tMsgReq(32), tMsgSub(32);
	//同步pub sub
	tSocketReq.send(tMsgReq);
	tMsgReq.rebuild();
	tSocketReq.recv(&tMsgReq);
	tMsgSub.rebuild();
	tSocketSub.recv(&tMsgSub);  //等待pub端所有req
	for(int i=0; i<BAG_LARGE_NUM; i++)
	{
		long long tLoopTimes = atoi((char*)tMsgReq.data());
		tTimeStart = gettime();
		for (int k=0; tLoopTimes; k++)
		{
			if(0 == k%500)
			{//sub pub同步
				tMsgReq.rebuild(32);
				tSocketReq.send(tMsgReq);
				tMsgReq.rebuild();
				tSocketReq.recv(&tMsgReq);
			}
			tMsgSub.rebuild();
			tSocketSub.recv(&tMsgSub);
		}
		tTimeEnd	= gettime();
		cout<<"bag large: "<<tMsgSub.size()<<" take time: "<<tTimeEnd-tTimeStart<<" tps: "<<tLoopTimes/(tTimeEnd-tTimeStart) <<endl;
		//更新tLoopTimes
		tMsgReq.rebuild(32);
		tSocketReq.send(tMsgReq);
		tMsgReq.rebuild();
		tSocketReq.recv(&tMsgReq);
	}
	cout<<"finish"<<endl;
	return 0;
}
           

脚本命令:./Client tcp://localhost:5555 tcp://localhost:5556

pub发布端代码:

#include <assert.h>
#include <sys/time.h>

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>

#include <zmq.hpp>

using namespace std;
#define BAG_LARGE_NUM			14

// Returns the current wall-clock time from gettimeofday() expressed in
// microseconds (seconds * 1,000,000 + microseconds part).
long long int gettime(){
	struct timeval tv = {0, 0};
	gettimeofday(&tv, NULL);
	// Do the multiplication in long long: where time_t/long is 32 bits,
	// tv_sec * 1000000 would overflow before the result is widened.
	const long long tSeconds		= tv.tv_sec;
	const long long tMicroseconds	= tv.tv_usec;
	return tSeconds * 1000000LL + tMicroseconds;
}

int main(int vArgc, char** vArgv)
{
	char* tProtocolPub		= vArgv[1];
	char* tProtocolRep		= vArgv[2];
	int tClientNum			= atoi(vArgv[3]);
	long long tLoopL		= atoi(vArgv[4]);
	long long tLoopB		= atoi(vArgv[5]);

	cout<<"tProtocolPub: "<<tProtocolPub<<endl;
	cout<<"tProtocolRep: "<<tProtocolRep<<endl;
	cout<<"tLoopL: "<<tLoopL<<endl;
	cout<<"tLoopB: "<<tLoopB<<endl;
	cout<<"ClientNum: "<<tClientNum<<endl;
	cout<<"**********************Server Start*******************"<<endl;
	zmq::context_t		tContext(1);
	zmq::socket_t		tSocketPub(tContext, ZMQ_PUB);
	zmq::socket_t		tSocketRep(tContext, ZMQ_REP);
	tSocketPub.bind(tProtocolPub);
	tSocketRep.bind(tProtocolRep);

	std::string tData;
	zmq::message_t tMsgRecv(32), tMsgSend(32), tMsg(32);
	long long tBagLarge[BAG_LARGE_NUM] = {1, 4, 8, 32, 64, 128, 256, 512,1024,1024*8, 1024*64, 1024*128, 1024*512, 1024*1024};
	long long tChansportLoop = tLoopL;
	long long			tTimeStart, tTimeEnd;
	for (int i=0; i<tClientNum; i++)
	{//pub sub同步
		tMsgRecv.rebuild();
		tSocketRep.recv(&tMsgRecv, 0);
		tMsgSend.rebuild(sizeof(tBagLarge[0]));
		tSocketRep.send(tMsgSend);
	}
	
	for (int i = 0; i<BAG_LARGE_NUM; i++)
	{
		tTimeStart = gettime();
		for (long long t=0; t<tChansportLoop; t++)
		{
			if(0 == t%500)
			{
				for (int j=0; j<tClientNum; j++)
				{//pub sub同步
					tMsg.rebuild();
					tSocketRep.recv(&tMsg, 0);
					tMsg.rebuild(32);
					tSocketRep.send(tMsg, 0);
				}
			}
			tMsgSend.rebuild(tBagLarge[i]);
			tSocketPub.send(tMsgSend, 0);
		}
		tTimeEnd = gettime();
		cout<<"bag large: "<<tBagLarge[i]<<" take time: "<<tTimeEnd-tTimeStart<<" tps: "<<tChansportLoop/(tTimeEnd-tTimeStart) <<endl;
		//next chansport loop
		tMsg.rebuild();
		tSocketRep.recv(&tMsg);
		tMsg.rebuild(tBagLarge[i+1]);
		tSocketRep.send(tMsg);
	}

	cout<<"finish"<<endl;
	return 0;
}
           

脚本命令:./Server tcp://*:5555 tcp://*:5556 3 10000 100000
(参数依次为:PUB 绑定地址、REP 绑定地址、客户端数量 tClientNum、tLoopL、tLoopB)