ApiVersions请求作用
当Broker接收到ApiVersionsRequest,它会返回Broker当前支持的请求类型列表,包括请求类型名称、支持的最早版本号和最新版本号。查看Kafka的bin目录,能找到kafka-broker-api-versions.sh脚本工具。它就是,构造ApiVersionsRequest对象,然后发送给对应的Broker。
若是ApiVersions类型请求,代码中为什么要判断一下它的版本呢?
和处理其他类型请求不同,Kafka必须保证版本号比最新支持版本还要高的ApiVersions请求也能被处理。这主要是考虑客户端和服务器端版本兼容。客户端发请求给Broker,可能不知道Broker到底支持哪些版本请求,它需使用ApiVersionsRequest去获取完整请求版本支持列表。若不做该判断,Broker可能无法处理客户端发送的ApiVersionsRequest。
metrics
metrics是Request相关的各种监控指标的一个管理类。它构建了一个Map,封装了所有请求JMX指标。
响应(Response)
定义了与Request对应的各类响应。
类设计
-
Response
定义Response的抽象父类。每个Response对象都包含对应Request对象。该类核心方法onComplete,用来实现每类Response被处理后需要执行的回调逻辑。
-
SendResponse
大多数Request处理完成后都需执行一段回调,SendResponse即保存返回结果的Response子类。核心字段onCompletionCallback,即指定处理完成之后的回调逻辑。
正常需要发送Response。
-
NoResponse
有些Request处理完成后无需单独执行额外的回调逻辑。NoResponse就是为这类Response准备的。
无需发送Response。
-
CloseConnectionResponse
出错后需要关闭TCP连接的场景,此时返回CloseConnectionResponse给Request发送方,显式地通知它关闭连接。
标识关闭连接通道的Response。
StartThrottlingResponse
通知Broker的Socket Server组件(后面几节课我会讲到它)某个TCP连接通信通道开始被限流(throttling)。
EndThrottlingResponse
与StartThrottlingResponse对应,通知Broker的SocketServer组件某个TCP连接通信通道的限流已结束。
后两个Response类不常用,仅在对Socket连接进行限流时,才会使用。
Response代码
abstract class Response(val request: Request) {
locally {
val nowNs = Time.SYSTEM.nanoseconds
request.responseCompleteTimeNanos = nowNs
if (request.apiLocalCompleteTimeNanos == -1L)
request.apiLocalCompleteTimeNanos = nowNs
}
def processor: Int = request.processor
def responseString: Option[String] = Some("")
def onComplete: Option[Send => Unit] = None
override def toString: String
}
该抽象类只有一个属性字段:request。即每个Response对象都要保存它对应的Request对象。
onComplete方法是调用指定回调逻辑的地方。
SendResponse类就重写了该方法:
class SendResponse(request: Request,
val responseSend: Send,
val responseAsString: Option[String],
val onCompleteCallback: Option[Send => Unit])
extends Response(request) {
......
// 指定输入参数onCompleteCallback
override def onComplete: Option[Send => Unit] = onCompleteCallback
}
onComplete方法把函数赋值给另一个函数,并作为结果返回。好处在于可以灵活变更onCompleteCallback实现不同回调逻辑。