[TOC]
在KafkaServer中的入口在:
首先根据相关参数,实例化KafkaApis,然后实例化KafkaRequestHandlerPool。下面我们首先看下KafkaRequestHandlerPool。
主要是启动了numThreads个数的线程,然后线程中执行的内容是KafkaRequestHandler。
在run方法中,我们可以看到,主要处理消息的地方是api.handle(req)。下面我们主要看下这块的内容。
直接看代码:
这块比较简单,主要的是Request的数据结构,还有后续的处理方法。下面我们逐步来分析。
所有的请求,最终都会变成这个RequestChannel.Request。所以我们先看下这个Request。
主要有几个部分,
首先是requestId,是一个short类型的值。
然后是header,即消息头,是一个RequestHeader
最后是body,是消息的内容,类型为AbstractRequest
这个requestId表示的是api的类型,KafkaApis需要根据这个requestId,来判断调用哪个方法处理消息。
我们看下RequestHeader的结构。
主要是四个变量,apiKey,APIVersion,clientId,correlationId。
消息体,对应的类为AbstractRequest。主要的内容是根据版本号和apiKey来解析出消息的具体内容。
这块的请求类型很多,想要了解具体结构的,可以到每个类中具体看。