天天看點

Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)

Kafka如何實作請求隊列

  • 核心類
  • Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)

Kafka服務端,即Broker,負責消息的持久化,是個不斷接收外部請求、處理請求,然後發送處理結果的Java程序。

Broker的高處理性能在于高效儲存排隊中的請求。

Broker底層請求對象的模組化

請求隊列的實作原理

Broker請求處理方面的核心監控名額。

Broker與Clients主要基于Request/Response機制互動,是以看看如何模組化或定義Request和Response。

請求(Request)

定義了Kafka Broker支援的各類請求。

  • RequestChannel#Request
  • Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)
  • trait關鍵字類似于Java的interface。從代碼中,我們可以知道,

ShutdownRequest隻做标志位。當Broker程序關閉時,RequestHandler會發送ShutdownRequest到專屬請求處理線程。該線程接收到此請求後,會主動觸發Broker關閉流程。

Request才是真正的定義各類Clients端或Broker端請求的實作類。

屬性

processor

Processor線程的序号,即該請求由哪個Processor線程接收處理。

  • Broker端參數

    num.network.threads

    控制Broker每個監聽器上建立的Processor線程數
  • Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)
  • 假設listeners配置為PLAINTEXT://localhost:9092,SSL://localhost:9093,則預設情況下Broker啟動時會建立6個Processor線程,每3個為一組,分别給listeners參數中設定的兩個監聽器使用,每組的序号分别是0、1、2。

為什麼儲存Processor線程式号?

當Request被後面的I/O線程處理完成後,還要依靠Processor線程發送Response給請求方,是以,Request必須記錄它之前被哪個Processor線程接收。

Processor線程隻是網絡接收線程,并不會執行真正的I/O線程才負責的Request請求處理邏輯。

context

  • 用于辨別請求上下文資訊,RequestContext類維護Request的所有上下文資訊。
  • Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)

RequestContext類

Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)

startTimeNanos

維護Request對象被建立的時間,用于計算各種時間統計名額。

請求對象中的很多JMX(Java Management Extensions)名額,特别是時間類統計名額,都需要startTimeNanos字段,納秒機關的時間戳資訊,可實作細粒度時間統計精度。

memoryPool

一個非阻塞式記憶體緩沖區,用于避免Request對象無限使用記憶體。

記憶體緩沖區的接口類MemoryPool,實作類SimpleMemoryPool。可重點關注下SimpleMemoryPool#tryAllocate,怎麼為Request對象配置設定記憶體。

@Override
    public ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes < 1)
            throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
        if (sizeBytes > maxSingleAllocationSize)
            throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);

        long available;
        boolean success = false;
        //in strict mode we will only allocate memory if we have at least the size required.
        //in non-strict mode we will allocate memory if we have _any_ memory available (so available memory
        //can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize)
        long threshold = strict ? sizeBytes : 1;
        while ((available = availableMemory.get()) >= threshold) {
            success = availableMemory.compareAndSet(available, available - sizeBytes);
            if (success)
                break;
        }

        if (success) {
            maybeRecordEndOfDrySpell();
        } else {
            if (oomTimeSensor != null) {
                startOfNoMemPeriod.compareAndSet(0, System.nanoTime());
            }
            log.trace("refused to allocate buffer of size {}", sizeBytes);
            return null;
        }

        ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);
        bufferToBeReturned(allocated);
        return allocated;
    }      

buffer

真正儲存Request對象内容的位元組緩沖區。Request發送方須按Kafka RPC協定規定格式向該緩沖區寫入位元組,否則抛InvalidRequestException。

  • 該邏輯由RequestContext#parseRequest實作。
  • Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)