
Some notes on wrapping the librdkafka consumer

For the producer side of librdkafka, see here.

Compared with the producer, the consumer has far fewer pitfalls to watch out for.

First, initialization:

string errstr = "";
unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));

/* Set the broker list */
if (conf->set("metadata.broker.list", config_.brokers, errstr) != RdKafka::Conf::CONF_OK) {
	printf_s("RdKafka conf set metadata.broker.list failed: %s\n", errstr.c_str());
}

/* Set the group id */
if (conf->set("group.id", config_.group_id, errstr) != RdKafka::Conf::CONF_OK) {
	printf_s("RdKafka conf set group.id failed: %s\n", errstr.c_str());
}

/* To handle offsets manually, disable automatic offset storing here and
 * call commitAsync/commitSync after consuming; to also turn off the
 * auto-commit timer entirely, set enable.auto.commit=false. */
/*
if (conf->set("enable.auto.offset.store", "false", errstr) != RdKafka::Conf::CONF_OK) {
	printf_s("Failed to set enable.auto.offset.store: %s\n", errstr.c_str());
}
*/

/* Register callbacks */
conf->set("event_cb", &event_cb_, errstr);
conf->set("rebalance_cb", &rebalance_cb_, errstr);
conf->set("default_topic_conf", tconf.get(), errstr);

consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
if (!consumer_) {
	printf_s("KafkaConsumer create failed: tag[%s], error[%s]\n", config_.tag, errstr.c_str());
	return 0;
}

vector<string> topics = { config_.topic };
RdKafka::ErrorCode err = consumer_->subscribe(topics);
if (err) {
	printf_s("KafkaConsumer subscribe failed: tag[%s], error[%s]\n", config_.tag, RdKafka::err2str(err).c_str());
	consumer_.reset();
	return 0;
}

To subscribe to data from every partition of a topic, I use RdKafka::KafkaConsumer, librdkafka's high-level consumer. As shown above, it only needs the brokers, group_id, and topics configured. (KafkaConsumer::create does not take ownership of the Conf objects, so releasing them via unique_ptr afterwards is safe.)
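
The initialization registers an event_cb_ whose class isn't shown above. As a rough sketch (the class name and the fprintf logging are my own choices; RdKafka::EventCb is the callback interface librdkafka expects), it can be as simple as:

class ExampleEventCb : public RdKafka::EventCb {
public:
	void event_cb(RdKafka::Event &event) override {
		switch (event.type()) {
		case RdKafka::Event::EVENT_ERROR:
			/* Broker/transport errors (e.g. all brokers down) land here. */
			fprintf(stderr, "RdKafka error (%s): %s\n",
				RdKafka::err2str(event.err()).c_str(), event.str().c_str());
			break;
		case RdKafka::Event::EVENT_LOG:
			fprintf(stderr, "RdKafka log-%d %s: %s\n",
				(int)event.severity(), event.fac().c_str(), event.str().c_str());
			break;
		default:
			break;
		}
	}
};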

If you configure manual commits, you need to call commitAsync or commitSync after consuming, to tell the broker to persist the offset.

One thing to note from practice: calling consume too frequently can cause the broker connection to be dropped, and if the offset has not been committed by then, you will consume duplicate data after restarting.
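
Putting both points together, a minimal consume loop might look like the sketch below (running_ and handle_message are hypothetical placeholders; consume, commitAsync and the Message accessors are librdkafka API). The generous 1000 ms timeout keeps the loop from hammering the broker:

while (running_) {
	/* consume() blocks for up to timeout_ms waiting for a message. */
	RdKafka::Message *msg = consumer_->consume(1000 /* timeout_ms */);
	switch (msg->err()) {
	case RdKafka::ERR_NO_ERROR:
		handle_message(msg->payload(), msg->len());  /* hypothetical handler */
		/* Persist this message's offset only after it was processed. */
		consumer_->commitAsync(msg);
		break;
	case RdKafka::ERR__TIMED_OUT:
		/* No message within the timeout; not an error. */
		break;
	default:
		printf_s("Consume failed: %s\n", msg->errstr().c_str());
		break;
	}
	delete msg;
}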

Implementing load balancing

class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
	ExampleRebalanceCb(const KafkaReaderConf& config) : config_(config) {}

private:
	/* Format the partition list as "topic[partition]," for logging. */
	static string part_list_print(const std::vector<RdKafka::TopicPartition*>& partitions) {
		string str;
		char tmp[128];
		for (unsigned int i = 0; i < partitions.size(); i++) {
			sprintf_s(tmp, sizeof(tmp), "%s[%d],", partitions[i]->topic().c_str(), partitions[i]->partition());
			str.append(tmp);
		}
		return str;
	}

public:
	void rebalance_cb(RdKafka::KafkaConsumer *consumer,
		RdKafka::ErrorCode err,
		std::vector<RdKafka::TopicPartition*> &partitions) {
		string part_list = part_list_print(partitions);
		LogLocalText(LOG_LOCAL_ERROR_DEBUG, "RdKafka RebalanceCb [%s]:%s, Partitions %s",
			config_.tag, RdKafka::err2str(err).c_str(), part_list.c_str());
		if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
			/* Newly assigned partitions: start fetching from them. */
			consumer->assign(partitions);
		}
		else {
			/* ERR__REVOKE_PARTITIONS (or an error): drop the assignment. */
			consumer->unassign();
		}
	}

private:
	KafkaReaderConf config_;
};

The RdKafka::RebalanceCb must be registered when initializing the consumer's config, implemented as shown above; each time the callback fires, the consumer's partition assignment (and the offsets that go with it) is re-established. This way, when several consumers are started in the same group, partitions and their offsets are distributed among them automatically. Note that the partition count should ideally be an integer multiple of the consumer count so the load splits evenly, and any consumers beyond the partition count will receive no data. For example, with 6 partitions: 3 consumers get 2 partitions each, 4 consumers get an uneven 2/2/1/1 split, and a 7th consumer would sit idle.
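
One caveat on the callback itself: if you build against a newer librdkafka (1.6+) and enable the cooperative-sticky assignor, the callback receives incremental deltas rather than the full assignment, and assign/unassign must be replaced by incremental_assign/incremental_unassign. A sketch of how the body changes under that assumption:

void rebalance_cb(RdKafka::KafkaConsumer *consumer,
	RdKafka::ErrorCode err,
	std::vector<RdKafka::TopicPartition*> &partitions) {
	if (consumer->rebalance_protocol() == "COOPERATIVE") {
		/* The vector is an incremental delta, not the full assignment. */
		RdKafka::Error *error = (err == RdKafka::ERR__ASSIGN_PARTITIONS)
			? consumer->incremental_assign(partitions)
			: consumer->incremental_unassign(partitions);
		if (error) {
			printf_s("incremental rebalance failed: %s\n", error->str().c_str());
			delete error;
		}
	} else {
		/* Classic (eager) protocol, as in the class above. */
		if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
			consumer->assign(partitions);
		else
			consumer->unassign();
	}
}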