Kafka如何實作請求隊列
- 核心類
-
Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)
Kafka服務端,即Broker,負責消息的持久化,是個不斷接收外部請求、處理請求,然後發送處理結果的Java程序。
Broker的高處理性能在于高效儲存排隊中的請求。
Broker底層請求對象的模組化
請求隊列的實作原理
Broker請求處理方面的核心監控名額。
Broker與Clients主要基于Request/Response機制互動,是以看看如何模組化或定義Request和Response。
請求(Request)
定義了Kafka Broker支援的各類請求。
- RequestChannel#Request
-
Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request) - trait關鍵字類似于Java的interface。從代碼中,我們可以知道,
ShutdownRequest隻做标志位。當Broker程序關閉時,RequestHandler會發送ShutdownRequest到專屬請求處理線程。該線程接收到此請求後,會主動觸發Broker關閉流程。
Request才是真正的定義各類Clients端或Broker端請求的實作類。
屬性
processor
Processor線程的序号,即該請求由哪個Processor線程接收處理。
- Broker端參數
控制Broker每個監聽器上建立的Processor線程數num.network.threads
-
Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request) - 假設listeners配置為PLAINTEXT://localhost:9092,SSL://localhost:9093,則預設情況下Broker啟動時會建立6個Processor線程,每3個為一組,分别給listeners參數中設定的兩個監聽器使用,每組的序号分别是0、1、2。
為什麼儲存Processor線程式号?
當Request被後面的I/O線程處理完成後,還要依靠Processor線程發送Response給請求方,是以,Request必須記錄它之前被哪個Processor線程接收。
Processor線程隻是網絡接收線程,并不會執行真正的I/O線程才負責的Request請求處理邏輯。
context
- 用于辨別請求上下文資訊,RequestContext類維護Request的所有上下文資訊。
-
Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)
RequestContext類
startTimeNanos
維護Request對象被建立的時間,用于計算各種時間統計名額。
請求對象中的很多JMX(Java Management Extensions)名額,特别是時間類統計名額,都需要startTimeNanos字段,納秒機關的時間戳資訊,可實作細粒度時間統計精度。
memoryPool
一個非阻塞式記憶體緩沖區,用于避免Request對象無限使用記憶體。
記憶體緩沖區的接口類MemoryPool,實作類SimpleMemoryPool。可重點關注下SimpleMemoryPool#tryAllocate,怎麼為Request對象配置設定記憶體。
@Override
public ByteBuffer tryAllocate(int sizeBytes) {
if (sizeBytes < 1)
throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
if (sizeBytes > maxSingleAllocationSize)
throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);
long available;
boolean success = false;
//in strict mode we will only allocate memory if we have at least the size required.
//in non-strict mode we will allocate memory if we have _any_ memory available (so available memory
//can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize)
long threshold = strict ? sizeBytes : 1;
while ((available = availableMemory.get()) >= threshold) {
success = availableMemory.compareAndSet(available, available - sizeBytes);
if (success)
break;
}
if (success) {
maybeRecordEndOfDrySpell();
} else {
if (oomTimeSensor != null) {
startOfNoMemPeriod.compareAndSet(0, System.nanoTime());
}
log.trace("refused to allocate buffer of size {}", sizeBytes);
return null;
}
ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);
bufferToBeReturned(allocated);
return allocated;
}
buffer
真正儲存Request對象内容的位元組緩沖區。Request發送方須按Kafka RPC協定規定格式向該緩沖區寫入位元組,否則抛InvalidRequestException。
- 該邏輯由RequestContext#parseRequest實作。
-
Kafka請求隊列源碼實作-RequestChannel請求通道(上)Kafka如何實作請求隊列請求(Request)