天天看点

依据kafka已配置的分区数量来配置消费者对象数量

本文开发环境:windows 10,c# ,rdkafka开发库

kafka主题的消费者对象数量是个重要指标,消费者数量与主题分区一一对应,如果消费者个数多了,多余的消费者对象不能发挥作用,浪费了系统资源;如果消费者个数少了,就达不到发挥kafka最大的性能。因此,消费者对象个数的确定,也就是主题分区个数的确定,是一个重要的工作。至于设置多少个主题分区,答案是:根据需要设置。意思是根据项目规模,系统运行环境等进行综合测试得到,每个系统都不同。需要自行测试后确定最佳主题分区数量。本文说明的内容,是在确定主题分区数量后,进行设置消费者对象数量的方法。

消费者对象消费者对象个数的获取可以有2种方式:

(1)kafka每个主题的分区个数,用kafkamanager等工具来可视化设置管理。项目系统中,可以在系统的配置文件中指定相同的消费者个数,但这样未免不够自动化,而且对维护人员的要求也比较高。因此,项目系统启动时,可以通过查询kafka的每个主题的当前分区数量,来决定创建系统创建的消费者个数。这种方式的优势是充分利用了kafka的可视化管理工具对kafka进行维护,项目系统中没有实现对主题分区的管理功能。本文描述的是这种场景的应用。

(2)在配置文件中指定了主题的分区数量,那么项目系统启动时,要依据配置文件配置的分区数量,自动配置主题的分区数量,然后再创建与分区个数相同的消费者对象个数。这种方式需要项目系统自行开发编写分区创建管理功能。这种场景今后再讨论。

对于第一个方式,项目系统自动根据kafka主题的分区个数,创建消费者对象个数,最关键的就是获取主题分区的数量。通过Producer或EventConsumer对象的Metadata接口获取主题及分区信息。代码如下:

using RdKafka;

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

namespace KafkaTest

{

    class Program

    {

        static void Main(string[] args)

        {

            string strKafkaConsumeTopic = "MYTPOPIC";

            Producer p = CreateKafkaProducer(strKafkaConsumeTopic);

            Task<Metadata> metadata = p.Metadata();

            Console.WriteLine("共有{0}个主题,各主题及分区如下:",  metadata.Result.Topics.Count);

            for(int ii=0;ii< metadata.Result.Topics.Count;ii++)

            {

     Console.WriteLine("第{0}个主题名称:{1},分区数:{2}", ii,metadata.Result.Topics[ii].Topic, metadata.Result.Topics[ii].Partitions.Count);

            }

        }

        public static Producer CreateKafkaProducer(string strTopic)

        {

            string strKafkaBrboker = "127.0.0.1";

            int intKafkaPort = 9092;

            Producer p = null;

            try

            {

                p = new Producer(strKafkaBrboker + ":" + intKafkaPort.ToString());

            }

            catch (Exception e)

            {

                System.Console.WriteLine(string.Format("创建kafka Producer对象错误:{0}", e.Message));

                return null;

            }

            return p;

        }

    }

}

Producer对象的Metadata接口数据定义在Rdkafka::Handle类中,Handle类中还有其他相关数据,如客户端分组数据。Handle定义如下:

namespace RdKafka

{

    public class Handle : IDisposable

    {

        public Handle();

        public string Name { get; }

        public string MemberId { get; }

        public long OutQueueLength { get; }

        public int LogLevel { set; }

        public event EventHandler<ErrorArgs> OnError;

        public event EventHandler<string> OnStatistics;

        public virtual void Dispose();

        public Task<GroupInfo> ListGroup(string group, TimeSpan timeout);

        public Task<List<GroupInfo>> ListGroups(TimeSpan timeout);

        public Task<Metadata> Metadata(bool allTopics = true, Topic onlyForTopic = null, bool includeInternal = false, TimeSpan timeout = default(TimeSpan));

        public Task<Offsets> QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout = default(TimeSpan));

        public struct ErrorArgs

        {

            public ErrorCode ErrorCode { get; set; }

            public string Reason { get; set; }

        }

    }

}