天天看点

【Kafka源码】Kafka启动过程

一般来说,我们是通过命令来启动kafka,但是命令的本质还是调用代码中的main方法,所以,我们重点看下启动类Kafka。源码下下来之后,我们也可以通过直接运行Kafka.scala中的main方法(需要指定启动参数,也就是server.properties的位置)来启动Kafka。因为kafka依赖zookeeper,所以我们需要提前启动zookeeper,然后在server.properties中指定zk地址后,启动。

下面我们首先看一下main()方法:

def main(args: Array[String]): Unit = {

}

我们慢慢来分析下,首先是getPropsFromArgs(args),这一行很明确,就是从配置文件中读取我们配置的内容,然后赋值给serverProps。第二步,KafkaServerStartable.fromProps(serverProps),

object KafkaServerStartable {

def fromProps(serverProps: Properties) = {

这块主要是启动了一个内部的监控服务(内部状态监控)。

下面是一个在java中常见的钩子函数,在关闭时会启动一些销毁程序,保证程序安全关闭。之后就是我们启动的重头戏了:kafkaServerStartable.startup。跟进去可以很清楚的看到,里面调用的方法是KafkaServer中的startup方法,下面我们重点看下这个方法(比较长):

def startup() {

首先判断是否目前正在关闭中或者已经启动了,这两种情况直接抛出异常。然后是一个CAS的操作isStartingUp,防止线程并发操作启动,判断是否可以启动。如果可以启动,就开始我们的启动过程。

构造Metrics类

定义broker状态为启动中starting

启动定时器kafkaScheduler.startup()

构造zkUtils:利用参数中的zk信息,启动一个zk客户端

启动文件管理器:读取zk中的配置信息,包含__consumer_offsets和__system.topic__。重点是启动一些定时任务,来删除符合条件的记录(cleanupLogs),清理脏记录(flushDirtyLogs),把所有记录写到一个文本文件中,防止在启动时重启所有的记录文件(checkpointRecoveryPointOffsets)。

/**

Start the background threads to flush logs and do log cleanup

*/

下一步,获取brokerId

启动一个NIO socket服务

启动复制管理器:启动ISR超时处理线程

启动kafka控制器:注册session过期监听器,同时启动控制器leader选举

启动协调器

权限认证

开启线程,开始处理请求

开启配置监听,主要是监听zk节点数据变化,然后广播到所有机器

开启健康检查:目前只是把broker节点注册到zk上,注册成功就是活的,否则就是dead

注册启动数据信息

启动成功

等待关闭countDownLatch,如果shutdownLatch变为0,则关闭Kafka