天天看點

vc使用hiredis的幾個填坑動作

1)hiredis使用了Win32_Interop一個庫,這個庫寫的賊亂,導緻與ws2_32.lib中的函數沖突,沒有辦法隻能重新封裝一個DLL,在其他的項目中使用DLL來處理;

2)hiredis本身非線程安全,是以多個線程需要使用連接配接池來操作;

3)hiredis預設使用MTD庫,應該在c++---->代碼生成的位置改為MDD庫;

4)  我也不知道redis用戶端多久會被被逾時替出來,是以需要使用ping指令檢測一下是否重連;

首先我們随便封裝一下基礎代碼:RedisClient

RedisClient.h

#pragma once

#include <stdio.h>  
#include <hiredis/hiredis.h> 
#include <iostream>
#include <string>
//#include <WinSock2.h>

//#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "hiredis.lib")
#pragma comment(lib, "Win32_Interop.lib")
#include <sstream>
#include <vector>
#include <map>
#include "CharVector.h"

namespace robin
{

	class RedisClient
	{
	public:
		RedisClient() : getReply(nullptr)
		{

		}
		~RedisClient() 
		{
			Close();
			printf("~RedisClient() \n");
		}

		bool isConnOk();
		bool Reconnect();
		bool Conn(const char * ip, int port = 6379);
		void Close();
		int  checkError();

		size_t SetHValue(std::string & table, std::string & field, const char * value, size_t len);
		size_t SetHValue(std::string & table, std::string & field, std::string & value);
		int64_t INCR(std::string & table, std::string & field, int count);
		size_t QueryKeys(std::string & table, std::vector<std::string>& vec);
		size_t QueryKeys(std::string & table, std::map<uint32_t, string>& map);
		size_t Query(std::string & table, std::string & field, std::string & value);
		size_t Query(std::string & table, std::string & field, CharVector & value);
		size_t QueryCount(string & table);

		//size_t SetHValue(const char * table, const char *  field, const char * value, size_t len);
		size_t Query(const char * table, const char * field, char ** pValue);
		size_t QueryCount(const char * table);
		int64_t INCR(const char * table, const char * field, int count);
		//CharVector buffer;

	private:
		redisContext* conn = nullptr;
		std::string _ip;
		int _port;

		redisReply * getReply;  // used for get
		
	};

}

           

RedisClient.cpp

#include "RedisClient.h"
//#include <winsock.h>
//#include <winsock2.h>
using namespace robin;

#ifndef _WIN32
#include <sys/time.h> /* for struct timeval */
#else
//struct timeval {
//	long    tv_sec;         /* seconds */
//	long    tv_usec;        /* and microseconds */
//};
#endif
#define MAX_CONN_TIMES 10

/* return  type
	REDIS_REPLY_STRING  : 1
  REDIS_REPLY_ARRAY : 2
  REDIS_REPLY_INTEGER :3
  REDIS_REPLY_NIL  : 4
  REDIS_REPLY_STATUS : 5
  REDIS_REPLY_ERROR : 6
*/
bool RedisClient::isConnOk()
{
	if (conn == nullptr)
		return false;

	if (checkError() > 0)
		return false;

	if ( !(conn->flags & REDIS_CONNECTED) )
	{
		return false;
	}

	bool ret = false;
	redisReply *reply = static_cast<redisReply*>(redisCommand(conn, "PING"));
	if (reply == nullptr)
		return false;

	if (reply->len > 0)
		ret = true;

	freeReplyObject(reply);

	return ret;
}

bool RedisClient::Reconnect()
{
	this->Close();
	int n = 0;
	bool ret = Conn(_ip.c_str(), _port);
	while (ret == false && n < MAX_CONN_TIMES)
	{
		ret = Conn(_ip.c_str(), _port);

		n++;
	}

	return ret;
}

bool RedisClient::Conn(const char * ip, int port)
{
	this->_ip = ip;
	this->_port = port;
	if (conn != nullptr)
		return 0;

	conn = redisConnect(ip, port);
	

	//struct timeval tm;
	//tm.tv_sec = 2;
	//tm.tv_sec = 0;
	//conn = redisConnectWithTimeout(ip, port, tm);
	if (conn->err)
	{
		printf("redis connection error:%s\n", conn->errstr);
		return false;
	}

	return true;
}

size_t RedisClient::SetHValue(std::string & table, std::string & field, const char * value, size_t len)
{
	redisReply* reply = static_cast<redisReply*>(
		redisCommand(conn, "HSET  %b %b %b", table.c_str(), table.size(),
			field.c_str(),
			field.size(),
			value, len)
		);
	if (reply == nullptr)
		return 0;

	size_t ret = 0;
	if (reply->type == REDIS_REPLY_INTEGER)
	{
		ret = reply->integer;
	}
	freeReplyObject(reply);
	return ret;

}

size_t RedisClient::SetHValue(std::string & table, std::string & field, std::string & value)
{
	if (conn == nullptr)
		return -1;

	std::string cmd;

	std::stringstream stream;
	stream << "HSET " << table << " "<< field << " " << value;
	cmd = stream.str();
	redisReply* reply = static_cast<redisReply*>(redisCommand(conn, cmd.c_str()));
	if (reply == nullptr)
		return 0;

	size_t ret = 0;
	if (reply->type == REDIS_REPLY_INTEGER)
	{
		ret = reply->integer;
	}
	freeReplyObject(reply);
	return ret;
}

int64_t RedisClient::INCR(const char * table, const char * field, int count)
{
	if (conn == nullptr)
		return -1;

	std::string cmd = "";
	std::stringstream stream;


	stream << "hincrby " << table << " " << field << " " << count;


	cmd = stream.str();
	redisReply* reply = static_cast<redisReply*>(redisCommand(conn, cmd.c_str()));
	if (reply == nullptr)
		return 0;

	int64_t ret = reply->integer;
	freeReplyObject(reply);

	return ret;
}

int64_t RedisClient::INCR(std::string & table, std::string & field, int count)
{
	return INCR(table.c_str(), field.c_str(), count);
}
size_t RedisClient::QueryKeys(std::string & table, std::map<uint32_t, string>& map)
{
	std::string cmd = "";
	std::stringstream stream;
	stream << " HKEYS " << table;
	cmd = stream.str();

	redisReply *reply = static_cast<redisReply*>(redisCommand(conn, cmd.c_str()));
	size_t len = 0;
	if (reply == nullptr)
		return 0;

	for (size_t i = 0; i < reply->elements; i++)
	{
		redisReply * item = *(reply->element + i);
		uint32_t id = atoi(item->str);
		map.insert(std::pair<uint32_t, string>(id, item->str));
		if (id > len)
			len = id;
	}

	freeReplyObject(reply);
	return len;

}


size_t RedisClient::QueryKeys(std::string & table, std::vector<std::string>& vec)
{
	std::string cmd = "";
	std::stringstream stream;
	stream << " HKEYS " << table;
	cmd = stream.str();

	redisReply *reply = static_cast<redisReply*>(redisCommand(conn, cmd.c_str()));
	if (reply == nullptr)
		return 0;

	size_t len = reply->elements;
	for (size_t i = 0; i < reply->elements; i++)
	{
		redisReply * item = *(reply->element + i);
		vec.push_back(item->str);
		//item->str;

	}

	freeReplyObject(reply);
	return len;
}

size_t RedisClient::QueryCount(const char * table)
{
	std::string cmd = "";
	std::stringstream stream;
	stream << " HLEN " << table;
	cmd = stream.str();

	redisReply *reply = static_cast<redisReply*>(redisCommand(conn, cmd.c_str()));

	if (reply == nullptr)
		return 0;

	//_atoi64(char *)
	//char *end;
	//strtoll(reply->str, &end, 10);
	size_t len = reply->integer;
		

	freeReplyObject(reply);
	return len;
}
size_t RedisClient::QueryCount(string & table)
{
	return QueryCount(table.c_str());
}

size_t RedisClient::Query(std::string & table, std::string & field, std::string & value)
{
	std::string cmd = "";
	std::stringstream stream;
	stream << " HGET " << table << " " << field;
	cmd = stream.str();
	redisReply *reply = static_cast<redisReply*>(redisCommand(conn, cmd.c_str()));

	if (reply == nullptr)
		return 0;

	size_t len = reply->len;
	value = reply->str;

	freeReplyObject(reply);
	return len;
}
size_t RedisClient::Query(std::string & table, std::string & field, CharVector & value)
{
	std::string cmd = "";
	std::stringstream stream;
	stream << " HGET " << table << " " << field;
	cmd = stream.str();
	redisReply *reply = static_cast<redisReply*>(redisCommand(conn, cmd.c_str()));
	if (reply == nullptr)
		return 0;

	size_t len = reply->len;
	value.reserve(len);
	value.append(reply->str, len);

	freeReplyObject(reply);
	return len;
}

size_t RedisClient::Query(const char * table, const char * field, char ** pValue)
{
	std::string cmd = "";
	std::stringstream stream;
	stream << " HGET " << table << " " << field;
	cmd = stream.str();

	if (getReply)
		freeReplyObject(getReply);

	getReply = static_cast<redisReply*>(redisCommand(conn, cmd.c_str()));
	if (getReply == nullptr)
		return 0;

	*pValue = getReply->str;
	size_t len = getReply->len;
	return len;
}

void RedisClient::Close()
{
	if (getReply != nullptr)
	{
		freeReplyObject(getReply);
		getReply = nullptr;
	}
	if (conn != nullptr)
	{
		redisFree(conn);
		conn = nullptr;
	}
}

int RedisClient::checkError()
{
	if (conn->err == REDIS_ERR_IO || conn->err == REDIS_ERR_EOF)
	{
		return 1;
	}

	return 0;
}

           

之後随便封裝一個管理的接口:

myRedis.h

#pragma once
#include <stdint.h>

//#define DLLIMPORT extern "C" __declspec(dllimport)
#define DLLIMPORT extern "C" 

DLLIMPORT void      myRedisPut(uint64_t idx);
DLLIMPORT uint64_t  myRedisGet(const char *ip, int port);
DLLIMPORT size_t    myRedisClear();

DLLIMPORT size_t hashSetBytes(uint64_t idx, const char * table, const char * field, const char * value, size_t len);
DLLIMPORT size_t hashGetBytes(uint64_t idx, const char * table, const char * field, char ** pValue);
DLLIMPORT size_t hashQueryLen(uint64_t idx, const char * table);
DLLIMPORT int64_t hashIncBy(uint64_t idx, const char * table, const char * field, int count);
           

在dll中定義如下:myRedis.cpp

#include "../testRedis/RedisClient.h"
#include <mutex>
#include <deque>

#define DLLEXPORT extern "C" __declspec(dllexport) 

using namespace robin;
using RedisClient_Ptr = shared_ptr<RedisClient>;
static std::mutex poolMutex;

static std::map<uint64_t, RedisClient_Ptr>  usedMap;
static std::deque<RedisClient_Ptr> idleQue;

// find out an unused id
// if 0, then close and free
uint64_t findId()
{
	for (uint64_t i = 1; i < 1000; i++)
	{
		auto it = usedMap.find(i);
		if (it == usedMap.end())
		{
			return i;
		}
	}

	return 0;
}

// read write to get pointer
RedisClient_Ptr findClient(uint64_t idx)
{
	std::lock_guard < std::mutex > guard(poolMutex);
	auto it = usedMap.find(idx);
	if (it == usedMap.end())
		return nullptr;

	 return usedMap[idx];
}

///
DLLEXPORT size_t myRedisClear()
{
	std::lock_guard < std::mutex > guard(poolMutex);
	size_t n = idleQue.size();
	printf("idleQueSize =%zu , using=%zu \n", idleQue.size(), usedMap.size());
	idleQue.clear();
	usedMap.clear();
	return n;
}


DLLEXPORT void myRedisPut(uint64_t idx)
{

	std::lock_guard < std::mutex > guard(poolMutex);
	auto it = usedMap.find(idx);
	if (it == usedMap.end())
		return;

	RedisClient_Ptr client = usedMap[idx];
	idleQue.push_back(client);
	usedMap.erase(idx);
	//printf("put back %ju, using=%ju, idle=%ju \n", idx, usedMap.size(), idleQue.size());

}

// return id
DLLEXPORT uint64_t  myRedisGet(const char *ip, int port)
{
	RedisClient_Ptr client;
	uint64_t idx;
	// mutex
	{
		std::lock_guard < std::mutex > guard(poolMutex);
		idx = findId();
		if (idx == 0)
			return idx;

		if (idleQue.size() > 0)
		{
			client = idleQue[0];
			idleQue.pop_front();
			usedMap.insert(std::make_pair(idx, client));
		}
		else
		{
			client = make_shared<RedisClient>();
			usedMap.insert(std::make_pair(idx, client));
		}
	}

	bool ret = client->isConnOk();
	if (ret == false)
	{
		client->Close();
		ret = client->Conn(ip, port);
	}

	if (ret == false)
	{
		myRedisPut(idx);
		return 0;
	}
	
	return idx;
}

//
DLLEXPORT int hashSetBytes(uint64_t idx,  const char * table, const char * field, const char * value, size_t len)
{
	RedisClient_Ptr  client = findClient(idx);
	if (!client)
		return -1;

	std::string table1(table);
	std::string field1(field);

	return client->SetHValue(table1, field1, value, len);
}

DLLEXPORT size_t hashGetBytes(uint64_t idx, const char * table, const char * field, char ** pValue)
{
	RedisClient_Ptr  client = findClient(idx);
	if (!client)
		return -1;

	return client->Query(table, field, pValue);
}

DLLEXPORT size_t hashQueryLen(uint64_t idx, const char * table)
{
	RedisClient_Ptr  client = findClient(idx);
	if (!client)
		return -1;

	return client->QueryCount(table);
}


DLLEXPORT int64_t hashIncBy(uint64_t idx, const char * table, const char * field, int count)
{
	RedisClient_Ptr  client = findClient(idx);
	if (!client)
		return -1;

	std::string table1(table);
	std::string field1(field);
	return client->INCR(table1, field1, count);
}



           

注意:在dllmain中需要記得清理資源:

#include "pch.h"
#include "../testMyRedis/myRedis.h"

BOOL APIENTRY DllMain( HMODULE hModule,
                       DWORD  ul_reason_for_call,
                       LPVOID lpReserved
                     )
{
    switch (ul_reason_for_call)
    {
    case DLL_PROCESS_ATTACH:
    case DLL_THREAD_ATTACH:
		break;
    case DLL_THREAD_DETACH:
		break;
    case DLL_PROCESS_DETACH:
		myRedisClear();
        break;
    }
    return TRUE;
}
           

編譯好了dll檔案,就可以簡單的測試一下了:

// testMyRedis.cpp : 此檔案包含 "main" 函數。程式執行将在此處開始并結束。
//

#include <iostream>
#include <stdint.h>
#include <sstream>
#include <thread>

#include "myRedis.h"
#pragma comment(lib, "RedisDll.lib")
using namespace std;

void testFunc(int id)
{
	stringstream table;
	table << "x-node";
	table << id;

	for (int n = 0; n < 1000; n++)
	{
		uint64_t idx = myRedisGet("127.0.0.1", 6379);
		if (idx == 0)
		{
			std::cout << "error : get id = 0\n";
			return;
		}
		//std::cout << "get id = " << idx << endl;
		

		stringstream field;
		field << "num" << n;

		stringstream str;
		str << n;
		size_t ret = hashSetBytes(idx, table.str().c_str(), field.str().c_str(),
			str.str().c_str(), str.str().size());

		//cout << ret << endl;

		// get test
		char * buf = nullptr;
		ret = hashGetBytes(idx, table.str().c_str(), field.str().c_str(), &buf);
		if (ret < 1)
		{

		}
		else
		{
			//string str(buf, ret);
			//std::cout << "get str = " << str.c_str() << endl;
		}
		// inc

		ret = hashIncBy(idx, table.str().c_str(), "count", 1);

		myRedisPut(idx);
	}
	
	// test count == 100000
	uint64_t idx = myRedisGet("127.0.0.1", 6379);
	char * buf = nullptr;
	size_t ret = hashGetBytes(idx, table.str().c_str(), "count", &buf);
	if (ret < 1)
	{
		std::cout << table.str().c_str() << " count=error" << endl;
	}
	else
	{
		string str(buf, ret);
		std::cout << table.str().c_str() <<" count= " << str.c_str() << endl;
	}

	ret = hashQueryLen(idx, table.str().c_str());
	std::cout << table.str().c_str() << " elements= " << ret << endl;


}

int main()
{
	std::thread thread1 = thread(testFunc, 1);
	std::thread thread2 = thread(testFunc, 2);
	std::thread thread3 = thread(testFunc, 3);
	std::thread thread4 = thread(testFunc, 4);
	
	thread1.join();
	thread2.join();
	thread3.join();
	thread4.join();


	char c;
	//cin >> c;
	return 1;
}

           

各個寫入動作雖然沒有測試,但是結果好像還湊合:

x-node4 count= 1000
x-node4 elements= 1001
x-node2 count= 1000
x-node2 elements= 1001
x-node1 count= 1000
x-node1 elements= 1001
x-node3 count= 1000
x-node3 elements= 1001
idleQueSize =0 , using=4
~RedisClient()
~RedisClient()
~RedisClient()
~RedisClient()
           

繼續閱讀