天天看点

Kafka请求队列源码实现-RequestChannel请求通道(上)Kafka如何实现请求队列请求(Request)

Kafka如何实现请求队列

  • 核心类
  • Kafka请求队列源码实现-RequestChannel请求通道(上)Kafka如何实现请求队列请求(Request)

Kafka服务端,即Broker,负责消息的持久化,是个不断接收外部请求、处理请求,然后发送处理结果的Java进程。

Broker的高处理性能在于高效保存排队中的请求。

Broker底层请求对象的建模

请求队列的实现原理

Broker请求处理方面的核心监控指标。

Broker与Clients主要基于Request/Response机制交互,所以看看如何建模或定义Request和Response。

请求(Request)

定义了Kafka Broker支持的各类请求。

  • RequestChannel#Request
  • Kafka请求队列源码实现-RequestChannel请求通道(上)Kafka如何实现请求队列请求(Request)
  • trait关键字类似于Java的interface。从代码中,我们可以知道,

ShutdownRequest只做标志位。当Broker进程关闭时,RequestHandler会发送ShutdownRequest到专属请求处理线程。该线程接收到此请求后,会主动触发Broker关闭流程。

Request才是真正的定义各类Clients端或Broker端请求的实现类。

属性

processor

Processor线程的序号,即该请求由哪个Processor线程接收处理。

  • Broker端参数

    num.network.threads

    控制Broker每个监听器上创建的Processor线程数
  • Kafka请求队列源码实现-RequestChannel请求通道(上)Kafka如何实现请求队列请求(Request)
  • 假设listeners配置为PLAINTEXT://localhost:9092,SSL://localhost:9093,则默认情况下Broker启动时会创建6个Processor线程,每3个为一组,分别给listeners参数中设置的两个监听器使用,每组的序号分别是0、1、2。

为什么保存Processor线程序号?

当Request被后面的I/O线程处理完成后,还要依靠Processor线程发送Response给请求方,因此,Request必须记录它之前被哪个Processor线程接收。

Processor线程只是网络接收线程,并不会执行真正的I/O线程才负责的Request请求处理逻辑。

context

  • 用于标识请求上下文信息,RequestContext类维护Request的所有上下文信息。
  • Kafka请求队列源码实现-RequestChannel请求通道(上)Kafka如何实现请求队列请求(Request)

RequestContext类

Kafka请求队列源码实现-RequestChannel请求通道(上)Kafka如何实现请求队列请求(Request)

startTimeNanos

维护Request对象被创建的时间,用于计算各种时间统计指标。

请求对象中的很多JMX(Java Management Extensions)指标,特别是时间类统计指标,都需要startTimeNanos字段,纳秒单位的时间戳信息,可实现细粒度时间统计精度。

memoryPool

一个非阻塞式内存缓冲区,用于避免Request对象无限使用内存。

内存缓冲区的接口类MemoryPool,实现类SimpleMemoryPool。可重点关注下SimpleMemoryPool#tryAllocate,怎么为Request对象分配内存。

@Override
    public ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes < 1)
            throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
        if (sizeBytes > maxSingleAllocationSize)
            throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);

        long available;
        boolean success = false;
        //in strict mode we will only allocate memory if we have at least the size required.
        //in non-strict mode we will allocate memory if we have _any_ memory available (so available memory
        //can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize)
        long threshold = strict ? sizeBytes : 1;
        while ((available = availableMemory.get()) >= threshold) {
            success = availableMemory.compareAndSet(available, available - sizeBytes);
            if (success)
                break;
        }

        if (success) {
            maybeRecordEndOfDrySpell();
        } else {
            if (oomTimeSensor != null) {
                startOfNoMemPeriod.compareAndSet(0, System.nanoTime());
            }
            log.trace("refused to allocate buffer of size {}", sizeBytes);
            return null;
        }

        ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);
        bufferToBeReturned(allocated);
        return allocated;
    }      

buffer

真正保存Request对象内容的字节缓冲区。Request发送方须按Kafka RPC协议规定格式向该缓冲区写入字节,否则抛InvalidRequestException。

  • 该逻辑由RequestContext#parseRequest实现。
  • Kafka请求队列源码实现-RequestChannel请求通道(上)Kafka如何实现请求队列请求(Request)