關於librdkafka producer可以看這裡
consumer相較於producer需要注意的問題就少得多了
首先是初始化
// Initialize a librdkafka high-level (KafkaConsumer) consumer and subscribe
// to the configured topic. This is the interior of a member function:
// config_, event_cb_, rebalance_cb_ and consumer_ are members of the
// enclosing class (not visible here). Returns 0 to the caller on failure.
string errstr = "";
// Global and topic-level configuration objects; owned locally, since
// KafkaConsumer::create copies what it needs.
unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
/* Set the broker list */
if (conf->set("metadata.broker.list", config_.brokers, errstr) != RdKafka::Conf::CONF_OK) {
// NOTE(review): configuration errors are silently ignored; consider logging errstr.
//std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
}
/* Set the consumer group id */
if (conf->set("group.id", config_.group_id, errstr) != RdKafka::Conf::CONF_OK) {
//std::cerr << "RdKafka conf set group.id failed :" << errstr.c_str() << endl;
}
// Disable automatic offset store so offsets are committed manually
// (currently commented out, i.e. auto offset store remains enabled).
/*
if (conf->set("enable.auto.offset.store", "false", errstr) != RdKafka::Conf::CONF_OK) {
fprintf(stderr, "Failed to set enable.auto.offset.store: %s\n", errstr.c_str());
}
*/
/* Register callbacks */
// NOTE(review): return codes of these three set() calls are not checked.
conf->set("event_cb", &event_cb_, errstr);
conf->set("rebalance_cb", &rebalance_cb_, errstr);
conf->set("default_topic_conf", tconf.get(), errstr);
// Create the high-level consumer; on failure errstr holds the reason.
consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
if (!consumer_) {
printf_s("KafkaConsumer create failed: tag[%s], error[%s]\n", config_.tag, errstr.c_str());
return 0;
}
// Subscribe to the single configured topic (all of its partitions).
vector<string> topics = { config_.topic };
RdKafka::ErrorCode err = consumer_->subscribe(topics);
if (err) {
//std::cerr << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl;
printf_s("KafkaConsumer subscribe failed: tag[%s], error[%s]\n", config_.tag, RdKafka::err2str(err).c_str());
// Drop the half-initialized consumer so the object is left in a clean state.
consumer_.reset();
return 0;
}
為了訂閱topic所有分區的資料,這裡我用的是RdKafka::KafkaConsumer,librdkafka的進階消費者,如上所示配置好kafka對應的brokers、group_id和topics即可。
如果配置為手動提交,需要在消費之後調用commitAsync或者commitSync,通知伺服器儲存offset
實踐中調用consume過於頻繁會導致伺服器連接中斷,如果此時offset未儲存,重新開啟的時候消費到重複資料,這點需要注意一下。
負載均衡的實作
/// Rebalance callback for the high-level KafkaConsumer: logs every
/// (re)assignment and applies it to the consumer so that partitions and
/// their offsets are distributed automatically across group members.
class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
	ExampleRebalanceCb(const KafkaReaderConf& config) :config_(config) {};
private:
	/// Render the partition list as "topic[partition]," pairs for logging,
	/// e.g. "events[0],events[1],".
	static string part_list_print(const std::vector<RdKafka::TopicPartition*>& partitions) {
		string str;
		for (size_t i = 0; i < partitions.size(); i++)
		{
			// Build with std::string instead of a fixed 128-byte sprintf_s
			// buffer: on MSVC, sprintf_s overflow triggers the
			// invalid-parameter handler (process abort) for long topic names.
			str.append(partitions[i]->topic());
			str.push_back('[');
			str.append(std::to_string(partitions[i]->partition()));
			str.append("],");
		}
		return str;
	}
public:
	/// Called by librdkafka on every group rebalance.
	/// @param consumer   the consumer being rebalanced
	/// @param err        ERR__ASSIGN_PARTITIONS or ERR__REVOKE_PARTITIONS
	/// @param partitions the partitions being assigned or revoked
	void rebalance_cb(RdKafka::KafkaConsumer *consumer,
		RdKafka::ErrorCode err,
		std::vector<RdKafka::TopicPartition*> &partitions) override {
		string part_list = part_list_print(partitions);
		LogLocalText(LOG_LOCAL_ERROR_DEBUG,"RdKafka RebalanceCb [%s]:%s, Partitions %s", config_.tag, RdKafka::err2str(err).c_str(), part_list.c_str());
		if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
			// Accept the new assignment; fetching starts at the stored offsets.
			consumer->assign(partitions);
		}
		else {
			// Revocation (or error): drop the current assignment.
			// NOTE(review): assign/unassign return codes are not checked.
			consumer->unassign();
		}
	}
private:
	KafkaReaderConf config_;	// copied reader configuration (tag used for logging)
};
需要在consumer的config初始化時,註冊RdKafka::RebalanceCb,實作方法如上,在每次回調時,重新設定consumer的offset。這樣當啟用多個consumer的時候,consumer會自動分配分區和對應的offset。注意分區個數最好是consumer個數的整數倍,consumer超過分區個數時多餘的consumer會消費不到資料。