作為一個c++程式,我相信很多人都會使用c++的librdkafkacpp的接口,這些接口通過C++封裝,使用起來友善,不需要去了解底層的c接口的封裝及實作,是最理想的使用方式。
那為啥還要使用C接口呢?
這就涉及到C++的ABI二進制相容問題。
當我們編譯一個C++動态庫,受限于不同的編譯器版本,無論是gcc還是VC,都存在着相容問題。對于一個生産版本,如果在統一了編譯工具鍊的情景下,是可以使用C++版本,但如果考慮到更好的ABI相容,是以就選擇了C庫。其無論在VC還是gcc,都表現得很好。
C庫kafka的消費模式
三種消費模式的異同
librdKafka 提供了三種不同的消費模式,下面将講解這三種模式的差異及如何使用。
// 一次讀取一條資料
rd_kafka_consumer_poll();
rd_kafka_consume();
// 批量讀取,一次可以讀取多條資料
// 通過回調函數的批量讀取,将廢棄
rd_kafka_consume_callback();
rd_kafka_consume_batch();
// 通過Queue路由道特定的處理函數
rd_kafka_consume_callback_queue
rd_kafka_consume_batch_queue
1、這種模式,直接使用rd_kafka_consumer_poll()接口,其可以實作1次讀取一條,這種場景對于資料量不大,可以很好的使用,而且其用戶端API也實作了相關的消息确認和送出,大多數采用這種方式,這種方式的例程,我在前面的博文中有講到,有興趣的可以去看看
2、這種模式是批量讀取的模式,這種模式下有兩種接口,一種基于api的回調
rd_kafka_consume_callback,這個接口根據官方文檔是即将被廢棄的接口,是以不建議使用該接口,建議使用rd_kafka_consume_batch(),後面會給出這種模式下的使用例子。
3、使用Queue的方式,該方式也提供了兩種模式,一種回調,一種直接消費,回調的方式應該在性能上是最高的。這種方式與第二種方式的不同是,他能讓不同topic,不同patition的資料路由到一個隊列,也就是對于一個需要處理多種資料的消費者。
使用批量讀取的注意:
1、消息最好程式中自己做确認送出,這樣在kafka的服務,才不會看到滞後的消息消費
2、注意消息的銷毀,kafka在銷毀對象時會持有引用技術,消費完的消息,要destoy
3、對于topic+partition的消費模式,最好不要在運作熱新增分區或主題,否則用戶端處理起來比較麻煩。如果必要這麼做,要做好測試。保證資料不重複不丢消息。
代碼實作
KafkaConsumer.h
#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H
#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>
#include "rdkafka.h"
using namespace std;
struct TKafkaCfgInfo
{
string m_strBrokers;
string m_strGroupID;
string m_strTopics;
int m_nPartition;
int m_nBatchReadSize;
int m_nReadTimeout;
bool m_bIsLogKafka;
TKafkaCfgInfo()
: m_nPartition(-1)
, m_nBatchReadSize(-1)
, m_nReadTimeout(-1)
, m_bIsLogKafka(false)
{}
};
class CKafkaConsumer
{
public:
CKafkaConsumer();
virtual ~CKafkaConsumer();
// 初?始º?化¡¥Kafka配?置?
int InitCfg(const TKafkaCfgInfo& tCfg);
const TKafkaCfgInfo& GetCfg() { return m_tCfg; }
// 拉¤-取¨?Kafka消?息¡é
void pullMessage(rd_kafka_message_t** p, ssize_t& nCnt);
void SetConsumerOk(rd_kafka_message_t* pMsg);
protected:
rd_kafka_t* m_pkafka;
rd_kafka_topic_t* m_pKafkaTopic;
TKafkaCfgInfo m_tCfg;
};
#endif // KAFKACONSUMER_H
// KafkaConsumer.cpp
#include "KafkaConsumer.h"
#include <iostream>
using namespace std;
static void RebalanceCb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque)
{
測a試º?代䨲碼?
#if 0
if (partitions)
{
for (int i = 0; i < partitions->cnt; i++)
{
partitions->elems[i].offset = 0;
}
}
printf("RebalanceCb \r\n");
#endif
printf("RebalanceCb \n");
rd_kafka_assign(rk, partitions);
}
static void EventErrorCb(rd_kafka_t* rk, int err,
const char* reason,
void* opaque)
{
printf("Kafka EventErrorCb(%d): %s\n", err, reason);
}
CKafkaConsumer::CKafkaConsumer()
: m_pkafka(nullptr)
, m_pKafkaTopic(nullptr)
{
}
int CKafkaConsumer::InitCfg(const TKafkaCfgInfo& tCfg)
{
m_tCfg = tCfg;
rd_kafka_conf_t* pConf = rd_kafka_conf_new();
if (!pConf)
{
return -1;
}
char szErr[512] = { 0 };
if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_tCfg.m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(pConf);
return -1;
}
if (rd_kafka_conf_set(pConf, "group.id", m_tCfg.m_strGroupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(pConf);
return -1;
}
// rd_kafka_conf_set_rebalance_cb(pConf, &RebalanceCb);
rd_kafka_conf_set_error_cb(pConf, EventErrorCb);
if (rd_kafka_conf_set(pConf, "enable.auto.commit", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(pConf);
return -1;
}
if (rd_kafka_conf_set(pConf, "enable.auto.offset.store", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(pConf);
return -1;
}
// topic配?置?
/*
rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
/*if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "latest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_topic_conf_destroy(pTopicConf);
return -1;
}*/
// 創ä¡ä建¡§kafka實º¦Ì例¤y
m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
if (!m_pkafka)
{
return -1;
}
m_pKafkaTopic = rd_kafka_topic_new(m_pkafka, m_tCfg.m_strTopics.c_str(), nullptr);
if (!m_pKafkaTopic)
{
return -1;
}
rd_kafka_consume_start(m_pKafkaTopic, m_tCfg.m_nPartition, RD_KAFKA_OFFSET_STORED);
return 0;
}
void CKafkaConsumer::pullMessage(rd_kafka_message_t** pRet, ssize_t& nCnt)
{
nCnt = rd_kafka_consume_batch(m_pKafkaTopic, m_tCfg.m_nPartition, m_tCfg.m_nReadTimeout, pRet, m_tCfg.m_nBatchReadSize);
// rd_kafka_poll(m_pkafka, 0);
}
void CKafkaConsumer::SetConsumerOk(rd_kafka_message_t* pMsg)
{
rd_kafka_resp_err_t errCode = rd_kafka_offset_store(m_pKafkaTopic, m_tCfg.m_nPartition, pMsg->offset);
printf("rd_kafka_offset_store offset = %d\n", pMsg->offset);
if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
{
return;
}
errCode = rd_kafka_commit_message(m_pkafka, pMsg, 0);
if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
{
return;
}
}
CKafkaConsumer::~CKafkaConsumer()
{
if (m_pKafkaTopic)
{
rd_kafka_consume_stop(m_pKafkaTopic, m_tCfg.m_nPartition);
rd_kafka_topic_destroy(m_pKafkaTopic);
m_pKafkaTopic = nullptr;
}
if (m_pkafka)
{
// rd_kafka_consumer_close(m_pkafka); // rd_kafka_destroy調Ì¡Â用®?會¨¢調Ì¡Â用®?rd_kafka_consumer_close
rd_kafka_destroy(m_pkafka);
m_pkafka = nullptr;
}
}
// Test.cpp
#include "stdafx.h"
#include "KafkaConsumer.h"
#include <assert.h>
int _tmain(int argc, _TCHAR* argv[])
{
string brokers = "192.168.254.129:9092";
string strTopic = "test";
string group = "test1";
int nPartition = 0;
CKafkaConsumer consumer;
TKafkaCfgInfo t;
t.m_strBrokers = brokers;
t.m_strGroupID = group;
t.m_strTopics = strTopic;
t.m_nPartition = nPartition;
int nRet = consumer.InitCfg(t);
if (nRet != 0)
{
assert(0);
return 0;
}
int nBatchSize = 10;
rd_kafka_message_t** pRet = new rd_kafka_message_t * [nBatchSize];
while (1)
{
int nTimeout = 1000;
ssize_t nCnt = 0;
consumer.pullMessage(pRet, nCnt);
if (nCnt < 0)
{
fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
continue;
}
if (nCnt == 0)
{
// donothing
continue;
}
for (int nIndex = 0; nIndex < nCnt; nIndex++)
{
rd_kafka_message_t* pMsg = pRet[nIndex];
printf("%s\n", pMsg->payload);
consumer.SetConsumerOk(pMsg);
rd_kafka_message_destroy(pMsg);
}
}
delete[] pRet;
return 0;
}