由于業務需要,需要在librdkafka開源庫的基礎上增加SASL認證。相關的資料又比較少,特此記錄,以便後用。本此librdkafka的使用時在windows平台,。
一、版本和準備工作
1、librdkafka
由于之前一直使用的是librdkafka0.9版本,編譯之後死活認證失敗,最後拉取最新版本,測試可用。點選此處 下載下傳最新librdkafka源碼
2、openssl
由于librdkafka編譯時是需要ssl的,是以需要OpenSSL的庫支援。可以根據自己的情況選擇自己編譯或者直接下載下傳已經在各平台編譯好的庫,此處下載下傳openssl源碼,我這裡選擇已經編譯好的庫及頭檔案,點選此處下載下傳openssl庫,提取碼:gf4h
3.sasl
sasl的庫在編譯librdkafka時不是必須的,但是我們要支援sasl認證,是以也是需要該庫檔案的。同OpenSSL一樣,sasl也是開源的,此處下載下傳cyrus-sasl源碼,大家根據自己的情況自己編譯或者下載下傳相應的檔案,我這裡同樣使用的是下載下傳好的檔案,點選此處下載下傳windows平台sasl安裝包,提取碼:0tnc
下載下傳 完cyrus-sasl 之後,根據自己需要安裝x86或者x64的版本,安裝時預設下一步,我這邊安裝結構如下:
根據自己需要使用dll或者lib庫,我選擇了lib庫檔案,留着待用,後續編譯時會使用到。
二、編譯
1、準備工作
解壓并打開已經下載下傳的librdkafka目錄,選擇win32目錄。打開.sln檔案,準備編譯,我這邊使用的VS2015社群版,其他的windows編譯器理論上是能夠編譯的(除vc6),如下結構:
工程結構,根據自己需要編譯,一般隻需要編譯librdkafka工程即可。由于項目中使用到了librdkafkacpp的dll,是以我編譯了紅色框内的所有工程。
2、工程設定
根據自己需要選擇dll或者lib檔案,後面我選擇的都是lib庫。添加openssl的頭檔案和lib檔案,添加cyrus-sasl 的庫檔案(前面安裝得到的lib檔案),如圖示:
1)添加openssl的頭檔案
2)添加openssl和sasl的lib庫檔案
3、編譯librdkafka之後,根據需要再編譯其他的即可
到此,windows平台下支援sasl認證的librdkafka庫編譯完成了。
三、題外
由于項目中使用到了librdkafka中自帶的dll再次封裝庫librdkafkacpp,檢視了一下封裝,對于使用的人來感覺說并不是很友好,需要拿到封裝後的細節。測試demo:
#include "rdkafkacpp.h"
#include <iostream>
#include <thread>
using namespace std;
int consumer()
{
std::string strErr;
std::string strBootstrp = "172.20.36.103:9092";
std::string strProtocol = "sasl_plaintext";
std::string strMechanisms = "PLAIN";
std::string strUsername = "alice";
std::string strPassword = "geting";
std::string strTopic = "test";
std::string strGrouId = "group2";
std::string strApiVersion = "true";
std::string strOffset = "earliest";
vector<string> vectTopics;
vectTopics.push_back(strTopic);
//建立一個配置
RdKafka::Conf* pConf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (RdKafka::Conf::CONF_OK != pConf->set("bootstrap.servers", strBootstrp, strErr))
{
cout << "bootstrap.servers 初始化失敗" << endl;
}
if (RdKafka::Conf::CONF_OK != pConf->set("security.protocol", strProtocol, strErr))
{
cout << "security.protocol 初始化失敗" << endl;
}
if (RdKafka::Conf::CONF_OK != pConf->set("sasl.mechanisms", strMechanisms, strErr))
{
cout << "sasl.mechanisms 初始化失敗" << endl;
}
if (RdKafka::Conf::CONF_OK != pConf->set("sasl.username", strUsername, strErr))
{
cout << "sasl.username 初始化失敗" << endl;
}
if (RdKafka::Conf::CONF_OK != pConf->set("sasl.password", strPassword, strErr))
{
cout << "sasl.password 初始化失敗" << endl;
}
if (RdKafka::Conf::CONF_OK != pConf->set("group.id", strGrouId, strErr))
{
cout << "group.id 初始化失敗" << endl;
}
if (RdKafka::Conf::CONF_OK != pConf->set("api.version.request", strApiVersion, strErr))
{
cout << "api.version.request 初始化失敗" << endl;
}
if (RdKafka::Conf::CONF_OK != pConf->set("auto.offset.reset", strOffset, strErr))
{
cout << "auto.offset.reset 初始化失敗" << endl;
}
RdKafka::KafkaConsumer* pConsumer = RdKafka::KafkaConsumer::create(pConf, strErr);
if (nullptr == pConsumer)
{
cout << "消費對象初始化失敗,錯誤資訊:" << strErr << endl;
return -1;
}
RdKafka::ErrorCode err = pConsumer->subscribe(vectTopics);
if (RdKafka::ErrorCode::ERR_NO_ERROR != err)
{
cout << "訂閱主題失敗,錯誤碼:" << err << endl;
return -1;
}
while (true)
{
RdKafka::Message *msg = pConsumer->consume(1000);
if (RdKafka::ErrorCode::ERR_NO_ERROR != msg->err())
{
if (RdKafka::ErrorCode::ERR__TIMED_OUT != msg->err())
{
cout << "消費逾時,錯誤碼:" << msg->err() << ", " << msg->errstr() << endl;
}
this_thread::sleep_for(chrono::milliseconds(5));
continue;
}
cout << "\nkey:" << msg->key()
<< "\nkey_len:" << msg->key_len()
<< "\nmsg_len:" << msg->len()
<< "\noffset:" << msg->offset()
<< "\npayload:" << msg->payload()
<< "\ntopic_name:" << msg->topic_name()
<< "\npartion::" << msg->partition() << endl;
this_thread::sleep_for(chrono::seconds(1));
}
}
int main()
{
//消費
consumer();
system("pause");
return 0;
}
推薦另一個同樣是開源的,并且在librdkafka開源例子中推薦的一個封裝,此處擷取源碼,從例子上看比較友好。