天天看点

Kafka请求队列源码实现-RequestChannel请求通道(中)响应(Response)

ApiVersions请求作用

当Broker接收到ApiVersionsRequest,它会返回Broker当前支持的请求类型列表,包括请求类型名称、支持的最早版本号和最新版本号。查看Kafka的bin目录,能找到kafka-broker-api-versions.sh脚本工具。它就是,构造ApiVersionsRequest对象,然后发送给对应的Broker。

若是ApiVersions类型请求,代码中为什么要判断一下它的版本呢?

和处理其他类型请求不同,Kafka必须保证版本号比最新支持版本还要高的ApiVersions请求也能被处理。这主要是考虑客户端和服务器端版本兼容。客户端发请求给Broker,可能不知道Broker到底支持哪些版本请求,它需使用ApiVersionsRequest去获取完整请求版本支持列表。若不做该判断,Broker可能无法处理客户端发送的ApiVersionsRequest。

metrics

metrics是Request相关的各种监控指标的一个管理类。它构建了一个Map,封装了所有请求JMX指标。

响应(Response)

定义了与Request对应的各类响应。

类设计

  • Response

    定义Response的抽象父类。每个Response对象都包含对应Request对象。该类核心方法onComplete,用来实现每类Response被处理后需要执行的回调逻辑。

  • SendResponse

    大多数Request处理完成后都需执行一段回调,SendResponse即保存返回结果的Response子类。核心字段onCompletionCallback,即指定处理完成之后的回调逻辑。

正常需要发送Response。
  • NoResponse

    有些Request处理完成后无需单独执行额外的回调逻辑。NoResponse就是为这类Response准备的。

无需发送Response。
  • CloseConnectionResponse

    出错后需要关闭TCP连接的场景,此时返回CloseConnectionResponse给Request发送方,显式地通知它关闭连接。

标识关闭连接通道的Response。

StartThrottlingResponse

通知Broker的Socket Server组件(后面几节课我会讲到它)某个TCP连接通信通道开始被限流(throttling)。

EndThrottlingResponse

与StartThrottlingResponse对应,通知Broker的SocketServer组件某个TCP连接通信通道的限流已结束。

后两个Response类不常用,仅在对Socket连接进行限流时,才会使用。

Response代码

abstract class Response(val request: Request) {
  locally {
    val nowNs = Time.SYSTEM.nanoseconds
    request.responseCompleteTimeNanos = nowNs
    if (request.apiLocalCompleteTimeNanos == -1L)
      request.apiLocalCompleteTimeNanos = nowNs
  }
  def processor: Int = request.processor
  def responseString: Option[String] = Some("")
  def onComplete: Option[Send => Unit] = None
  override def toString: String
}      

该抽象类只有一个属性字段:request。即每个Response对象都要保存它对应的Request对象。

onComplete方法是调用指定回调逻辑的地方。

SendResponse类就重写了该方法:

class SendResponse(request: Request,
                     val responseSend: Send,
                     val responseAsString: Option[String],
                     val onCompleteCallback: Option[Send => Unit]) 
  extends Response(request) {
    ......
    // 指定输入参数onCompleteCallback
    override def onComplete: Option[Send => Unit] = onCompleteCallback
}      

onComplete方法把函数赋值给另一个函数,并作为结果返回。好处在于可以灵活变更onCompleteCallback实现不同回调逻辑。