天天看点

【Kafka源码】处理请求一、KafkaRequestHandlerPool二、KafkaApis.handle三、Request数据结构

[TOC]

在KafkaServer中的入口在:

首先根据相关参数,实例化KafkaApis,然后实例化KafkaRequestHandlerPool。下面我们首先看下KafkaRequestHandlerPool。

主要是启动了numThreads个数的线程,然后线程中执行的内容是KafkaRequestHandler。

在run方法中,我们可以看到,主要处理消息的地方是api.handle(req)。下面我们主要看下这块的内容。

直接看代码:

这块比较简单,主要的是Request的数据结构,还有后续的处理方法。下面我们逐步来分析。

所有的请求,最终都会变成这个RequestChannel.Request。所以我们先看下这个Request。

主要有几个部分,

首先是requestId,是一个short类型的值。

然后是header,即消息头,是一个RequestHeader

最后是body,是消息的内容,类型为AbstractRequest

这个requestId表示的是api的类型,KafkaApis需要根据这个requestId,来判断调用哪个方法处理消息。

我们看下RequestHeader的结构。

主要是四个变量,apiKey,APIVersion,clientId,correlationId。

消息体,对应的类为AbstractRequest。主要的内容是根据版本号和apiKey来解析出消息的具体内容。

这块的请求类型很多,想要了解具体结构的,可以到每个类中具体看。

继续阅读