本文开发环境:windows 10,c# ,rdkafka开发库
kafka主题的消费者对象数量是个重要指标,消费者数量与主题分区一一对应,如果消费者个数多了,多余的消费者对象不能发挥作用,浪费了系统资源;如果消费者个数少了,就达不到发挥kafka最大的性能。因此,消费者对象个数的确定,也就是主题分区个数的确定,是一个重要的工作。至于设置多少个主题分区,答案是:根据需要设置。意思是根据项目规模,系统运行环境等进行综合测试得到,每个系统都不同。需要自行测试后确定最佳主题分区数量。本文说明的内容,是在确定主题分区数量后,进行设置消费者对象数量的方法。
消费者对象消费者对象个数的获取可以有2种方式:
(1)kafka每个主题的分区个数,用kafkamanager等工具来可视化设置管理。项目系统中,可以在系统的配置文件中指定相同的消费者个数,但这样未免不够自动化,而且对维护人员的要求也比较高。因此,项目系统启动时,可以通过查询kafka的每个主题的当前分区数量,来决定创建系统创建的消费者个数。这种方式的优势是充分利用了kafka的可视化管理工具对kafka进行维护,项目系统中没有实现对主题分区的管理功能。本文描述的是这种场景的应用。
(2)在配置文件中指定了主题的分区数量,那么项目系统启动时,要依据配置文件配置的分区数量,自动配置主题的分区数量,然后再创建与分区个数相同的消费者对象个数。这种方式需要项目系统自行开发编写分区创建管理功能。这种场景今后再讨论。
对于第一个方式,项目系统自动根据kafka主题的分区个数,创建消费者对象个数,最关键的就是获取主题分区的数量。通过Producer或EventConsumer对象的Metadata接口获取主题及分区信息。代码如下:
using RdKafka;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace KafkaTest
{
class Program
{
static void Main(string[] args)
{
string strKafkaConsumeTopic = "MYTPOPIC";
Producer p = CreateKafkaProducer(strKafkaConsumeTopic);
Task<Metadata> metadata = p.Metadata();
Console.WriteLine("共有{0}个主题,各主题及分区如下:", metadata.Result.Topics.Count);
for(int ii=0;ii< metadata.Result.Topics.Count;ii++)
{
Console.WriteLine("第{0}个主题名称:{1},分区数:{2}", ii,metadata.Result.Topics[ii].Topic, metadata.Result.Topics[ii].Partitions.Count);
}
}
public static Producer CreateKafkaProducer(string strTopic)
{
string strKafkaBrboker = "127.0.0.1";
int intKafkaPort = 9092;
Producer p = null;
try
{
p = new Producer(strKafkaBrboker + ":" + intKafkaPort.ToString());
}
catch (Exception e)
{
System.Console.WriteLine(string.Format("创建kafka Producer对象错误:{0}", e.Message));
return null;
}
return p;
}
}
}
Producer对象的Metadata接口数据定义在Rdkafka::Handle类中,Handle类中还有其他相关数据,如客户端分组数据。Handle定义如下:
namespace RdKafka
{
public class Handle : IDisposable
{
public Handle();
public string Name { get; }
public string MemberId { get; }
public long OutQueueLength { get; }
public int LogLevel { set; }
public event EventHandler<ErrorArgs> OnError;
public event EventHandler<string> OnStatistics;
public virtual void Dispose();
public Task<GroupInfo> ListGroup(string group, TimeSpan timeout);
public Task<List<GroupInfo>> ListGroups(TimeSpan timeout);
public Task<Metadata> Metadata(bool allTopics = true, Topic onlyForTopic = null, bool includeInternal = false, TimeSpan timeout = default(TimeSpan));
public Task<Offsets> QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout = default(TimeSpan));
public struct ErrorArgs
{
public ErrorCode ErrorCode { get; set; }
public string Reason { get; set; }
}
}
}