librdkafka: Building and Basic Usage

1. Build Versions
2. Building OpenSSL
3. Building librdkafka
4. Using librdkafka
5. Testing with Kafka

librdkafka uses C++11 and cannot be built with VS2010; a newer version of Visual Studio is required. I use VS2017 here.

1. Build Versions

Build environment: Windows, VS2017

OpenSSL version: openssl-1.0.2t (if you would rather not build it yourself, you can install Win32OpenSSL-1_0_2t.exe instead; in that case use …\lib\VC as the library path rather than …\lib\VC\static)

librdkafka version: librdkafka-1.2.1 (I downloaded a tagged release, which is more stable than arbitrary commits; downloading the latest development snapshot is not recommended)

2. Building OpenSSL

1) Install ActivePerl; the Perl interpreter is needed to run OpenSSL's configuration script.

Download the installer and click through with the defaults; no special handling is needed.

2) Open the VS2017 Developer Command Prompt and change into the unpacked OpenSSL directory.

3) Run the Configure script

Run one of the following at the prompt:

#  32-bit release build:
perl Configure VC-WIN32 no-asm --prefix=D:\openssl_win32

#  64-bit release build:
perl Configure VC-WIN64A

#  32-bit debug build:
perl Configure debug-VC-WIN32

#  64-bit debug build:
perl Configure debug-VC-WIN64A
           

I have only used VC-WIN64A and VC-WIN32. --prefix sets the install path for the generated headers and libraries; no-asm is needed because without it the build fails with:

tmp32dll\sha1-586.asm(1432) : error A2070:invalid instruction operands
tmp32dll\sha1-586.asm(1576) : error A2070:invalid instruction operands
NMAKE : fatal error U1077: “"E:\Visuol Studio 2012\VC\BIN\cl.EXE"”: return code “0x1”
           

There is another error that requires disabling IPv6. I did not run into it myself, but I'm noting it here for reference:

#  Workaround
perl Configure VC-WIN32 -DOPENSSL_USE_IPV6=0
#  Error
tmp32dll\sha1-586.asm(1432) : error A2070:invalid instruction operands
tmp32dll\sha1-586.asm(1576) : error A2070:invalid instruction operands
NMAKE : fatal error U1077: “"E:\Visuol Studio 2012\VC\BIN\cl.EXE"”: return code “0x2”
           

4) Generate the makefile

Run the appropriate command at the prompt:

#  Generate the 32-bit makefile
ms\do_ms.bat
#  Generate the 64-bit makefile
ms\do_win64a.bat
           

5) Run the build

During the build, both the 32-bit and 64-bit DLL builds failed for me with (LIBEAY32.def : error LNK2001: unresolved external symbol OPENSSL_rdtsc); the static-library build succeeded.

(1) Build the dynamic libraries:

Run the following at the prompt:

#  Build the DLLs
nmake -f ms\ntdll.mak
#  Test the DLLs
nmake -f ms\ntdll.mak test
           

(2) Build the static libraries:

Run the following at the prompt:

#  Build the static libs
nmake -f ms\nt.mak
#  Test the static libs
nmake -f ms\nt.mak test
           

6) Install the libraries

#  Install the static libraries
nmake -f ms\nt.mak install
#  Install the DLLs
nmake -f ms\ntdll.mak install
           

This produces the following files under the specified install path:

[Screenshot: files generated in the install directory]

7) Clean the build

nmake -f ms\nt.mak clean
nmake -f ms\ntdll.mak clean
           

To rebuild from scratch, run the clean commands and then repeat steps 1) to 6).

3. Building librdkafka

1) Fixing build errors

1. If you build the project as-is, it fails with a NuGet error stating that packages referenced by the project are missing from this computer.

Fix:

Locate the project file (.csproj), open it, and remove the following block:

<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
  <PropertyGroup>
    <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them.  For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
  </PropertyGroup>
  <Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
</Target>
           

2. zlib.h not found

This seems to be a NuGet package-management issue; I'm not very familiar with NuGet. The workaround I found was to locate the zlib library and header files under win32/packages/ and add them to the project's additional include and library directories.

2) Adding library and header files

[Screenshots: adding the include and library directories in the VS project settings]

3) Building librdkafkacpp

If you build only the librdkafka project, linking your own code against it later fails with:

librdkafka.dll : fatal error LNK1107: invalid or corrupt file: cannot read at 0x3A8
           

Your own code should therefore link against the librdkafka.dll and librdkafkacpp.dll dynamic libraries produced by building librdkafkacpp.
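
To sanity-check that the headers and both DLLs work together, a minimal version-check program helps (a sketch; the <kafka/rdkafkacpp.h> include path follows the sample projects below and should be adjusted to your own layout):

#include <iostream>
#include <kafka/rdkafkacpp.h>  // include path as used by the sample projects below

int main()
{
    // If this prints a version and exits cleanly, librdkafka.dll and
    // librdkafkacpp.dll were found and loaded successfully.
    std::cout << "librdkafka version: " << RdKafka::version_str()
              << " (0x" << std::hex << RdKafka::version() << ")" << std::endl;
    return 0;
}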

4. Using librdkafka

Even after a successful build, the librdkafka libraries still cannot be used from VS2010: the code compiles and links, but fails at runtime.

1) Create a project and add the library files

Headers: \src-cpp\rdkafkacpp.h and \src\rdkafka.h;

Libraries: configure the librdkafka.dll and librdkafkacpp.dll dynamic libraries (the Kafka library files in the debug and release directories additionally require libzstd.dll and zlib.dll);
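
Instead of listing the import libraries only in the project settings, MSVC also lets you reference them from code; a sketch, assuming the librdkafka.lib and librdkafkacpp.lib import libraries generated next to the DLLs are on the project's library search path:

// MSVC-specific: pull in the import libraries generated alongside the DLLs.
// Assumes both .lib files sit in one of the project's additional library directories.
#pragma comment(lib, "librdkafka.lib")
#pragma comment(lib, "librdkafkacpp.lib")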

2) Producer code

KafkaProducerClient.h

#ifndef KAFKAPRODUCERCLIENT_H
#define KAFKAPRODUCERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <list>
#include <kafka/rdkafkacpp.h>
#include <vector>
#include <fstream>

using namespace std;
using std::string;
using std::list;
using std::vector;
using std::fstream;

class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
public:
	void dr_cb(RdKafka::Message &message) {
		std::cout << "Message delivery for (" << message.len() << " bytes): " <<
			message.errstr() << std::endl;
		if (message.key())
			std::cout << "Key: " << *(message.key()) << ";" << std::endl;
	}
};
class KafkaProducerEventCallBack : public RdKafka::EventCb {
public:
	void event_cb(RdKafka::Event &event) {
		switch (event.type())
		{
		case RdKafka::Event::EVENT_ERROR:
			std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
				event.str() << std::endl;
			break;
		case RdKafka::Event::EVENT_STATS:
			std::cerr << "\"STATS\": " << event.str() << std::endl;
			break;
		case RdKafka::Event::EVENT_LOG:
			fprintf(stderr, "LOG-%i-%s: %s\n",
				event.severity(), event.fac().c_str(), event.str().c_str());
			break;
		default:
			std::cerr << "EVENT " << event.type() <<
				" (" << RdKafka::err2str(event.err()) << "): " <<
				event.str() << std::endl;
			break;
		}
	}
};
class KafkaProducerClient
{
public:
	KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0);
	virtual ~KafkaProducerClient();
	bool Init();
	void Send(const string &msg);
	void Stop();
private:
	RdKafka::Producer *m_pProducer;
	RdKafka::Topic *m_pTopic;
	KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
	KafkaProducerEventCallBack m_producerEventCallBack;
	std::string m_strTopics;
	std::string m_strBroker;
	bool m_bRun;
	int m_nPpartition;
};
#endif // KAFKAPRODUCERCLIENT_H
           

KafkaProducerClient.cpp

#include "stdafx.h"
#include "KafkaProducerClient.h"

KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics, int nPpartition /*= 0*/)
    : m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
{
	m_pTopic = NULL;
	m_pProducer = NULL;
}

KafkaProducerClient::~KafkaProducerClient()
{
    Stop();
}

bool KafkaProducerClient::Init()
{
    string errstr = "";

    /*
     * Create configuration objects
     */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    /* Set configuration properties: the broker list */
    if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){
        std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
    }
    /* Set delivery report callback */
    conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
    conf->set("event_cb", &m_producerEventCallBack, errstr);

    /*
     * Create producer using accumulated global configuration.
    */
    m_pProducer = RdKafka::Producer::create(conf, errstr);
    if (!m_pProducer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return false;
    }
    std::cout << "% Created producer " << m_pProducer->name() << std::endl;

    /*
     * Create topic handle.
    */
    m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics,
                                      tconf, errstr);
    if (!m_pTopic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        delete conf;
        delete tconf;
        return false;
    }
    /* create() copies the conf objects, so they can be freed here. */
    delete conf;
    delete tconf;
    return true;
}
void KafkaProducerClient::Send(const string &msg)
{
    if (!m_bRun)
        return;
    /*
     * Produce message
    */
    RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
                                                   RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                   const_cast<char *>(msg.c_str()), msg.size(),
                                                   NULL, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
    else
        std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;

    m_pProducer->poll(0);

    /* Wait for messages to be delivered */  //firecat add
    while (m_bRun && m_pProducer->outq_len() > 0) {
        std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
        m_pProducer->poll(100);
    }
}

void KafkaProducerClient::Stop()
{
    delete m_pTopic;
    m_pTopic = NULL;
    delete m_pProducer;
    m_pProducer = NULL;
}
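
As a side note, librdkafka 1.x also provides RdKafka::Producer::flush(), which can replace the manual poll()/outq_len() loop at the end of Send(); a minimal sketch:

// Block until all queued messages are delivered, or give up after 10 s.
// Equivalent in intent to the poll()/outq_len() loop in Send() above.
RdKafka::ErrorCode err = m_pProducer->flush(10 * 1000);
if (err != RdKafka::ERR_NO_ERROR)
    std::cerr << "flush failed: " << RdKafka::err2str(err) << std::endl;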
           

KafkaProducer.cpp

#include "stdafx.h"
#include <iostream>
#include "KafkaProducerClient.h"

int _tmain(int argc, _TCHAR* argv[])
{
	KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("10.10.10.182:9092", "test", 0);
	if (!KafkaprClient_->Init())
	{
		fprintf(stderr, "kafka producer initialize error\n");
		return -1;
	}

	char str_msg[1024] = "Hello Kafka!";

	while (fgets(str_msg, sizeof(str_msg), stdin))
	{
		size_t len = strlen(str_msg);
		if (len > 0 && str_msg[len - 1] == '\n')
		{
			str_msg[--len] = '\0';
		}

		if (strcmp(str_msg, "end") == 0)
		{
			break;
		}

		KafkaprClient_->Send(str_msg);
	}

	delete KafkaprClient_;
	return 0;
}
           

3) Consumer code

KafkaConsumerClient.h

#ifndef KAFKACONSUMERCLIENT_H
#define KAFKACONSUMERCLIENT_H

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <kafka/rdkafkacpp.h>
#include <vector>
#include <fstream>

using namespace std;

class KafkaConsumerClient {
public:
	KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
	virtual ~KafkaConsumerClient();
	// Initialize
	bool Init();
	// Start fetching messages
	void Start(int timeout_ms);
	// Stop
	void Stop();

private:
	void Msg_consume(RdKafka::Message* message, void* opaque);

private:
	std::string m_strBrokers;
	std::string m_strTopics;
	std::string m_strGroupid;
	int64_t m_nLastOffset;
	RdKafka::Consumer *m_pKafkaConsumer;
	RdKafka::Topic    *m_pTopic;
	int64_t           m_nCurrentOffset;
	int32_t           m_nPartition;
	bool m_bRun;
};
#endif // KAFKACONSUMERCLIENT_H
           

KafkaConsumerClient.cpp

#include "stdafx.h"
#include "KafkaConsumerClient.h"


KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/)
    :m_strBrokers(brokers),
      m_strTopics(topics),
      m_strGroupid(groupid),
      m_nPartition(nPartition),
      m_nCurrentOffset(offset)
{
	m_nLastOffset = 0;
	m_pKafkaConsumer = NULL;
	m_pTopic         = NULL;
	m_bRun = false;
}

KafkaConsumerClient::~KafkaConsumerClient()
{
    Stop();
}

bool KafkaConsumerClient::Init() {
    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!conf) {
        std::cerr << "RdKafka create global conf failed" << endl;
        return false;
    }
    /* Set the broker list */
    if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
    }
    /* Set the consumer group */
    if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set group.id failed :" << errstr.c_str() << endl;
    }
    std::string strfetch_num = "10240000";
    /* Maximum amount of data fetched from a single partition per request */
    if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
        std::cerr << "RdKafka conf set max.partition failed :" << errstr.c_str() << endl;
    }
    /* Create the kafka consumer instance using the accumulated global configuration. */
    m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr);
    if (!m_pKafkaConsumer) {
        std::cerr << "failed to create consumer" << endl;
        delete conf;
        return false;
    }
    std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl;
    delete conf;
    /* Create the kafka topic configuration */
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!tconf) {
        std::cerr << "RdKafka create topic conf failed" << endl;
        return false;
    }
    if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set auto.offset.reset failed:" << errstr.c_str() << endl;
    }
    /*
     * Create topic handle.
     */
    m_pTopic = RdKafka::Topic::create(m_pKafkaConsumer, m_strTopics, tconf, errstr);
    if (!m_pTopic) {
        std::cerr << "RdKafka create topic failed :" << errstr.c_str() << endl;
        delete tconf;
        return false;
    }
    delete tconf;
    /*
     * Start consumer for topic+partition at start offset
     */
    RdKafka::ErrorCode resp = m_pKafkaConsumer->start(m_pTopic, m_nPartition, m_nCurrentOffset);
    if (resp != RdKafka::ERR_NO_ERROR) {
        std::cerr << "failed to start consumer : " << RdKafka::err2str(resp) << endl;
        return false;
    }
    return true;
}
void KafkaConsumerClient::Msg_consume(RdKafka::Message* message, void* opaque) {
  switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
      break;

    case RdKafka::ERR_NO_ERROR:
      /* Real message */
      std::cout << "Read msg at offset " << message->offset() << std::endl;
      if (message->key()) {
        std::cout << "Key: " << *message->key() << std::endl;
      }
      printf("%.*s\n",
        static_cast<int>(message->len()),
        static_cast<const char *>(message->payload()));

      m_nLastOffset = message->offset();
      break;

    case RdKafka::ERR__PARTITION_EOF:
      /* Last message */
      cout << "Reached the end of the queue, offset: " << m_nLastOffset << endl;
      //Stop();
      break;
    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
      std::cerr << "Consume failed: " << message->errstr() << std::endl;
      Stop();
      break;

    default:
      /* Errors */
      std::cerr << "Consume failed: " << message->errstr() << std::endl;
      Stop();
      break;
  }
}
void KafkaConsumerClient::Start(int timeout_ms){
    RdKafka::Message *msg = NULL;
    m_bRun = true;
    while (m_bRun) {
        msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);
        Msg_consume(msg, NULL);
        delete msg;
        m_pKafkaConsumer->poll(0);
    }

    m_pKafkaConsumer->stop(m_pTopic, m_nPartition);
    m_pKafkaConsumer->poll(1000);

    if (m_pTopic) {
        delete m_pTopic;
        m_pTopic = NULL;
    }

    if (m_pKafkaConsumer) {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = NULL;
    }

    /* Destroy the kafka instance */ // Wait for RdKafka to decommission.
    RdKafka::wait_destroyed(5000);
}

void KafkaConsumerClient::Stop()
{
    m_bRun = false;
}
           

KafkaConsumer.cpp

#include "stdafx.h"
#include <iostream>
#include "KafkaConsumerClient.h"

int _tmain(int argc, _TCHAR* argv[])
{
	KafkaConsumerClient *KafkaConsumerClient_ = new KafkaConsumerClient("10.10.10.182:9092", "test", "0", 0, RdKafka::Topic::OFFSET_BEGINNING);//OFFSET_BEGINNING,OFFSET_END

	if (!KafkaConsumerClient_->Init())
	{
		fprintf(stderr, "kafka server initialize error\n");
		return -1;
	}

	KafkaConsumerClient_->Start(1000);

	return 0;
}
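
To consume only messages produced after startup instead of replaying the whole topic, pass RdKafka::Topic::OFFSET_END as the starting offset, as the comment in the code above suggests:

// Start from the current end of the partition: only new messages are delivered.
KafkaConsumerClient consumer("10.10.10.182:9092", "test", "0", 0, RdKafka::Topic::OFFSET_END);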
           

5. Testing with Kafka

1. Download Kafka

Download Kafka and unpack it.

2. Start ZooKeeper

A production deployment needs a proper ZooKeeper installation; for testing, just run the following from the unpacked Kafka directory:

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
           

zookeeper.properties is the ZooKeeper configuration file; by default ZooKeeper listens on local port 2181.

3. Start Kafka by running:

.\bin\windows\kafka-server-start.bat config\server.properties

# The Kafka distribution also ships console tools for test production and consumption
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

# Consume messages (pre-0.9 consumer, connecting via ZooKeeper)
.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
# Consumer startup on 0.9.0 and later:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
           

4. Run the Kafka programs built above to verify that the libraries work correctly.

Test results:

[Screenshot: producer and consumer test output]