librdkafka uses C++11, so it cannot be built with VS2010; a newer Visual Studio is required. I use VS2017 here.
1. Build versions
Build environment: Windows, VS2017
openssl version: openssl-1.0.2t (if you do not want to build it yourself, you can install Win32OpenSSL-1_0_2t.exe instead; in that case use …\lib\VC as the library path rather than …\lib\VC\static)
librdkafka version: librdkafka-1.2.1 (a release tag, which is more stable than the latest commits; I do not recommend building the tip of master)
2. Building OpenSSL
1) Install ActivePerl; a Perl interpreter is needed to run the Configure script
Download the installer and click through the defaults; nothing special is required.
2) Open the VS2017 Developer Command Prompt and change into the extracted OpenSSL directory
3) Run the Configure script
Execute one of the following in the prompt:
# 32-bit release build:
perl Configure VC-WIN32 no-asm --prefix=D:\openssl_win32
# 64-bit release build:
perl Configure VC-WIN64A
# 32-bit debug build:
perl Configure debug-VC-WIN32
# 64-bit debug build:
perl Configure debug-VC-WIN64A
I have only used VC-WIN32 and VC-WIN64A. --prefix specifies where the generated headers and libraries are installed; no-asm is needed because without it the build fails with the following errors:
tmp32dll\sha1-586.asm(1432) : error A2070:invalid instruction operands
tmp32dll\sha1-586.asm(1576) : error A2070:invalid instruction operands
NMAKE : fatal error U1077: '"E:\Visual Studio 2012\VC\BIN\cl.EXE"' : return code '0x1'
There is another error that requires disabling IPv6. I did not run into it myself, but I record it here anyway:
# workaround
perl Configure VC-WIN32 -DOPENSSL_USE_IPV6=0
# error
tmp32dll\sha1-586.asm(1432) : error A2070:invalid instruction operands
tmp32dll\sha1-586.asm(1576) : error A2070:invalid instruction operands
NMAKE : fatal error U1077: '"E:\Visual Studio 2012\VC\BIN\cl.EXE"' : return code '0x2'
4) Create the makefile
Execute one of the following in the prompt:
# create the 32-bit makefile
ms\do_ms.bat
# create the 64-bit makefile
ms\do_win64a.bat
5) Run the build
Whether building 32-bit or 64-bit, the dynamic-library build always failed for me with (LIBEAY32.def : error LNK2001: unresolved external symbol OPENSSL_rdtsc), while the static-library build succeeded.
(1) Building the dynamic libraries:
Execute the following in the prompt:
# build the DLLs
nmake -f ms\ntdll.mak
# test the DLLs
nmake -f ms\ntdll.mak test
(2) Building the static libraries:
Execute the following in the prompt:
# build the static libraries
nmake -f ms\nt.mak
# test the static libraries
nmake -f ms\nt.mak test
6) Install the libraries
# install the static-library build
nmake -f ms\nt.mak install
# install the DLL build
nmake -f ms\ntdll.mak install
The header and library files are then generated under the path specified with --prefix.
7) Clean the build
nmake -f ms\nt.mak clean
nmake -f ms\ntdll.mak clean
To rebuild, simply run the clean commands and then repeat steps 1-6.
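As a quick reference, the full 32-bit release build described above can be run end to end as follows; the --prefix path is only an example, adjust it to your own install location:
# from a VS2017 Developer Command Prompt, inside the OpenSSL source directory
perl Configure VC-WIN32 no-asm --prefix=D:\openssl_win32
ms\do_ms.bat
nmake -f ms\nt.mak
nmake -f ms\nt.mak test
nmake -f ms\nt.mak install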
3. Building librdkafka
1) Fixing build errors
1. If you build the solution directly, it fails with a NuGet error about missing packages.
Fix:
Open the project file (.vcxproj) of the failing project and remove the following block:
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
</Target>
2. zlib.h not found
This is a NuGet package-management problem. I am not very familiar with NuGet; the workaround I found is to take the zlib header and library files from the win32/packages/ directory and add them to the project's additional include and library directories.
2) Adding the library and header files
3) Building librdkafkacpp
If you only build the librdkafka project, compiling your own code against it later fails with:
librdkafka.dll : fatal error LNK1107: invalid or corrupt file: cannot read at 0x3A8
Your own code should therefore use the librdkafka.dll and librdkafkacpp.dll dynamic libraries produced by building librdkafkacpp.
4. Using librdkafka
Even after a successful build, the librdkafka libraries did not work in my test project right away: the project compiled, but failed at runtime.
1) Create a project and add the library files
Headers: \src-cpp\rdkafkacpp.h and \src\rdkafka.h;
Libraries: use the librdkafka.dll and librdkafkacpp.dll dynamic libraries (the kafka debug and release output directories also contain libzstd.dll and zlib.dll, which are needed at runtime);
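Instead of (or in addition to) listing the import libraries in the project's linker settings, they can be pulled in from code. A minimal sketch, assuming the build produced import libraries named librdkafka.lib and librdkafkacpp.lib and that their directory is already on the library search path:
#include <kafka/rdkafkacpp.h>  // C++ API header copied from src-cpp/rdkafkacpp.h

// MSVC-specific: link the import libraries; at runtime librdkafka.dll,
// librdkafkacpp.dll, libzstd.dll and zlib.dll must sit next to the executable.
#pragma comment(lib, "librdkafka.lib")
#pragma comment(lib, "librdkafkacpp.lib")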
2) Producer code
KafkaProducerClient.h
#ifndef KAFKAPRODUCERCLIENT_H
#define KAFKAPRODUCERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <list>
#include <kafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
using namespace std;
using std::string;
using std::list;
using std::vector;
using std::fstream;
class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) {
std::cout << "Message delivery for (" << message.len() << " bytes): " <<
message.errstr() << std::endl;
if (message.key())
std::cout << "Key: " << *(message.key()) << ";" << std::endl;
}
};
class KafkaProducerEventCallBack : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event &event) {
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
std::cerr << "All brokers are down" << std::endl;
break; /* do not fall through into the STATS case */
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
class KafkaProducerClient
{
public:
KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0);
virtual ~KafkaProducerClient();
bool Init();
void Send(const string &msg);
void Stop();
private:
RdKafka::Producer *m_pProducer;
RdKafka::Topic *m_pTopic;
KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
KafkaProducerEventCallBack m_producerEventCallBack;
std::string m_strTopics;
std::string m_strBroker;
bool m_bRun;
int m_nPpartition;
};
#endif // KAFKAPRODUCERCLIENT_H
KafkaProducerClient.cpp
#include "stdafx.h"
#include "KafkaProducerClient.h"
KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics, int nPpartition /*= 0*/)
: m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
{
m_pTopic = NULL;
m_pProducer = NULL;
}
KafkaProducerClient::~KafkaProducerClient()
{
Stop();
}
bool KafkaProducerClient::Init()
{
string errstr = "";
/*
* Create configuration objects
*/
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
/* Set configuration properties: broker list */
if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){
std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
}
/* Set delivery report callback */
conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
conf->set("event_cb", &m_producerEventCallBack, errstr);
/*
* Create producer using accumulated global configuration.
*/
m_pProducer = RdKafka::Producer::create(conf, errstr);
if (!m_pProducer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return false;
}
std::cout << "% Created producer " << m_pProducer->name() << std::endl;
/*
* Create topic handle.
*/
m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics,
tconf, errstr);
if (!m_pTopic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
return false;
}
return true;
}
void KafkaProducerClient::Send(const string &msg)
{
if (!m_bRun)
return;
/*
* Produce message
*/
RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.c_str()), msg.size(),
NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR)
std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
else
std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;
m_pProducer->poll(0);
/* Wait for messages to be delivered */ //firecat add
while (m_bRun && m_pProducer->outq_len() > 0) {
std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
m_pProducer->poll(100);
}
}
void KafkaProducerClient::Stop()
{
m_bRun = false;
delete m_pTopic;
m_pTopic = NULL;
delete m_pProducer;
m_pProducer = NULL;
}
KafkaProducer.cpp
#include "stdafx.h"
#include <iostream>
#include "KafkaProducerClient.h"
int _tmain(int argc, _TCHAR* argv[])
{
KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("10.10.10.182:9092", "test", 0);
KafkaprClient_->Init();
char str_msg[1024] = "Hello Kafka!";
while (fgets(str_msg, sizeof(str_msg), stdin))
{
size_t len = strlen(str_msg);
if (str_msg[len - 1] == '\n')
{
str_msg[--len] = '\0';
}
if (strcmp(str_msg, "end") == 0)
{
break;
}
KafkaprClient_->Send(str_msg);
}
delete KafkaprClient_;
return 0;
}
3) Consumer code
KafkaConsumerClient.h
#ifndef KAFKACONSUMERCLIENT_H
#define KAFKACONSUMERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <kafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
using namespace std;
class KafkaConsumerClient {
public:
KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
virtual ~KafkaConsumerClient();
// initialize
bool Init();
// start fetching messages
void Start(int timeout_ms);
// stop
void Stop();
private:
void Msg_consume(RdKafka::Message* message, void* opaque);
private:
std::string m_strBrokers;
std::string m_strTopics;
std::string m_strGroupid;
int64_t m_nLastOffset;
RdKafka::Consumer *m_pKafkaConsumer;
RdKafka::Topic *m_pTopic;
int64_t m_nCurrentOffset;
int32_t m_nPartition;
bool m_bRun;
};
#endif // KAFKACONSUMERCLIENT_H
KafkaConsumerClient.cpp
#include "stdafx.h"
#include "KafkaConsumerClient.h"
KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/)
:m_strBrokers(brokers),
m_strTopics(topics),
m_strGroupid(groupid),
m_nPartition(nPartition),
m_nCurrentOffset(offset)
{
m_nLastOffset = 0;
m_pKafkaConsumer = NULL;
m_pTopic = NULL;
m_bRun = false;
}
KafkaConsumerClient::~KafkaConsumerClient()
{
Stop();
}
bool KafkaConsumerClient::Init() {
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (!conf) {
std::cerr << "RdKafka create global conf failed" << endl;
return false;
}
/* set the broker list */
if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK) {
std::cerr << "RdKafka conf set brokerlist failed ::" << errstr.c_str() << endl;
}
/* set the consumer group */
if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK) {
std::cerr << "RdKafka conf set group.id failed :" << errstr.c_str() << endl;
}
std::string strfetch_num = "10240000";
/* maximum amount of data fetched from a single partition per request */
if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
std::cerr << "RdKafka conf set max.partition failed :" << errstr.c_str() << endl;
}
/* Create consumer using accumulated global configuration */
m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr);
if (!m_pKafkaConsumer) {
std::cerr << "failed to create consumer: " << errstr.c_str() << endl;
delete conf;
return false;
}
std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl;
delete conf;
/* create the kafka topic configuration */
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (!tconf) {
std::cerr << "RdKafka create topic conf failed" << endl;
return false;
}
if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
std::cerr << "RdKafka conf set auto.offset.reset failed:" << errstr.c_str() << endl;
}
/*
* Create topic handle.
*/
m_pTopic = RdKafka::Topic::create(m_pKafkaConsumer, m_strTopics, tconf, errstr);
if (!m_pTopic) {
std::cerr << "RdKafka create topic failed :" << errstr.c_str() << endl;
delete tconf;
return false;
}
delete tconf;
/*
* Start consumer for topic+partition at start offset
*/
RdKafka::ErrorCode resp = m_pKafkaConsumer->start(m_pTopic, m_nPartition, m_nCurrentOffset);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "failed to start consumer : " << RdKafka::err2str(resp) << endl;
return false;
}
return true;
}
void KafkaConsumerClient::Msg_consume(RdKafka::Message* message, void* opaque) {
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
/* Real message */
std::cout << "Read msg at offset " << message->offset() << std::endl;
if (message->key()) {
std::cout << "Key: " << *message->key() << std::endl;
}
printf("%.*s\n",
static_cast<int>(message->len()),
static_cast<const char *>(message->payload()));
m_nLastOffset = message->offset();
break;
case RdKafka::ERR__PARTITION_EOF:
/* Last message */
cout << "Reached the end of the queue, offset: " << m_nLastOffset << endl;
//Stop();
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
std::cerr << "Consume failed: " << message->errstr() << std::endl;
Stop();
break;
default:
/* Errors */
std::cerr << "Consume failed: " << message->errstr() << std::endl;
Stop();
break;
}
}
void KafkaConsumerClient::Start(int timeout_ms){
RdKafka::Message *msg = NULL;
m_bRun = true;
while (m_bRun) {
msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);
Msg_consume(msg, NULL);
delete msg;
m_pKafkaConsumer->poll(0);
}
m_pKafkaConsumer->stop(m_pTopic, m_nPartition);
m_pKafkaConsumer->poll(1000);
if (m_pTopic) {
delete m_pTopic;
m_pTopic = NULL;
}
if (m_pKafkaConsumer) {
delete m_pKafkaConsumer;
m_pKafkaConsumer = NULL;
}
/* Wait for RdKafka to decommission (destroy the kafka instance) */
RdKafka::wait_destroyed(5000);
}
void KafkaConsumerClient::Stop()
{
m_bRun = false;
}
KafkaConsumer.cpp
#include "stdafx.h"
#include <iostream>
#include "KafkaConsumerClient.h"
int _tmain(int argc, _TCHAR* argv[])
{
KafkaConsumerClient *KafkaConsumerClient_ = new KafkaConsumerClient("10.10.10.182:9092", "test", "0", 0, RdKafka::Topic::OFFSET_BEGINNING);//OFFSET_BEGINNING,OFFSET_END
if (!KafkaConsumerClient_->Init())
{
fprintf(stderr, "kafka server initialize error\n");
return -1;
}
KafkaConsumerClient_->Start(1000);
delete KafkaConsumerClient_;
return 0;
}
5. Testing with Kafka
1. Download Kafka
Download Kafka and unpack it.
2. Start ZooKeeper
A production setup needs a proper ZooKeeper installation; for a quick test, simply run the following from the unpacked Kafka directory:
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
zookeeper.properties is the ZooKeeper configuration file; by default ZooKeeper listens on local port 2181.
3. Start Kafka by executing:
.\bin\windows\kafka-server-start.bat config\server.properties
# the Kafka distribution also ships console producer and consumer tools for testing
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
# console consumer
.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
# for versions after 0.9.0, start the consumer with:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
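If the test topic does not exist yet and automatic topic creation is disabled, create it first. A sketch, assuming a Kafka version whose kafka-topics tool still accepts --zookeeper (newer releases use --bootstrap-server here as well):
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test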
4. Run the producer and consumer programs written above to verify that the library files behave correctly.
Test result: