
A few notes on wrapping the librdkafka consumer

For notes on the librdkafka producer, see here.

Compared with the producer, the consumer has far fewer pitfalls to watch out for.

First, initialization:

string errstr = "";
unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
/* Set the broker list */
if (conf->set("metadata.broker.list", config_.brokers, errstr) != RdKafka::Conf::CONF_OK) {
	//std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
}

/* Set the consumer group id */
if (conf->set("group.id", config_.group_id, errstr) != RdKafka::Conf::CONF_OK) {
	//std::cerr << "RdKafka conf set group.id failed :" << errstr.c_str() << endl;
}

// To commit offsets manually instead of automatically, disable the
// automatic offset store (note: enable.auto.commit=false is the more
// direct way to turn off auto-commit altogether):
/*
if (conf->set("enable.auto.offset.store", "false", errstr) != RdKafka::Conf::CONF_OK) {
	fprintf(stderr, "Failed to set enable.auto.offset.store: %s\n", errstr.c_str());
}
*/
/* Register callbacks and the default topic conf */
conf->set("event_cb", &event_cb_, errstr);
conf->set("rebalance_cb", &rebalance_cb_, errstr);
conf->set("default_topic_conf", tconf.get(), errstr);
consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
if (!consumer_) {
	printf_s("KafkaConsumer create failed: tag[%s], error[%s]\n", config_.tag, errstr.c_str());
	return 0;
}
vector<string> topics = { config_.topic };
RdKafka::ErrorCode err = consumer_->subscribe(topics);
if (err) {
	printf_s("KafkaConsumer subscribe failed: tag[%s], error[%s]\n", config_.tag, RdKafka::err2str(err).c_str());
	consumer_.reset();
	return 0;
}

To subscribe to data from every partition of a topic, I use RdKafka::KafkaConsumer, librdkafka's high-level consumer; configuring the brokers, group_id, and topics as shown above is all it takes.

If you configure manual commits, you need to call commitAsync or commitSync after consuming, to tell the broker to persist the offset.
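
For concreteness, a minimal consume-and-commit loop might look like the sketch below (the running_ flag and handle() function are hypothetical placeholders; consumer_ is the member created above):

while (running_) {
	// Block for up to 1000 ms waiting for the next message.
	unique_ptr<RdKafka::Message> msg(consumer_->consume(1000));
	switch (msg->err()) {
	case RdKafka::ERR_NO_ERROR:
		// Process the payload first, then commit its offset.
		handle(static_cast<const char*>(msg->payload()), msg->len());
		consumer_->commitAsync(msg.get());
		break;
	case RdKafka::ERR__TIMED_OUT:
		// No message within the timeout; keep polling.
		break;
	default:
		// Log other errors and keep consuming.
		printf_s("consume failed: %s\n", msg->errstr().c_str());
		break;
	}
}

commitSync(msg.get()) does the same thing but blocks until the broker acknowledges the offset, trading throughput for a smaller window of duplicate consumption after a crash.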

In practice, calling consume too frequently caused our broker connections to drop; if offsets had not been committed when that happened, the consumer re-read duplicate data after restarting, which is worth watching out for.

Implementing load balancing

class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
	ExampleRebalanceCb(const KafkaReaderConf& config) : config_(config) {}

private:
	// Render the partition list as "topic[partition]," pairs for logging.
	static string part_list_print(const std::vector<RdKafka::TopicPartition*>& partitions) {
		string str;
		char tmp[128];
		for (unsigned int i = 0; i < partitions.size(); i++) {
			sprintf_s(tmp, sizeof(tmp), "%s[%d],", partitions[i]->topic().c_str(), partitions[i]->partition());
			str.append(tmp);
		}
		return str;
	}

public:
	void rebalance_cb(RdKafka::KafkaConsumer *consumer,
		RdKafka::ErrorCode err,
		std::vector<RdKafka::TopicPartition*> &partitions) {
		string part_list = part_list_print(partitions);
		LogLocalText(LOG_LOCAL_ERROR_DEBUG, "RdKafka RebalanceCb [%s]:%s, Partitions %s", config_.tag, RdKafka::err2str(err).c_str(), part_list.c_str());
		if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
			// New assignment: start fetching from the assigned partitions.
			consumer->assign(partitions);
		}
		else {
			// ERR__REVOKE_PARTITIONS (or an error): drop the current assignment.
			consumer->unassign();
		}
	}

private:
	KafkaReaderConf config_;
};

The RdKafka::RebalanceCb must be registered when the consumer's conf is initialized, with the implementation shown above. On each callback the consumer's assignment, and with it the offsets, is reset, so when multiple consumers are started in the same group, partitions and their corresponding offsets are distributed among them automatically. Note that the partition count should ideally be an integer multiple of the consumer count; any consumers beyond the number of partitions will receive no data.
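
One detail the snippets above leave out is shutdown. Closing the high-level consumer makes it leave the group cleanly, so the remaining consumers are reassigned its partitions without waiting for a session timeout. A minimal sketch, assuming the same consumer_ member as above:

if (consumer_) {
	consumer_->commitSync();  // flush any offsets still pending in manual-commit mode
	consumer_->close();       // leave the consumer group cleanly
	consumer_.reset();
}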