RequestChannel
實作了Kafka Request隊列。傳輸Request/Response的通道。有了Request和Response的基礎,下面我們可以學習RequestChannel類的實作了。
定義

RequestChannel類實作KafkaMetricsGroup trait,後者封裝許多實用名額監控方法:
-
newGauge
建立數值型監控名額
-
newHistogram
建立直方圖型監控名額
屬性
每個RequestChannel對象執行個體建立時,會定義隊列儲存Broker接收到的各類請求,這個隊列被稱為請求隊列或Request隊列。
Kafka使用Java提供的阻塞隊列ArrayBlockingQueue實作請求隊列,并利用它天然提供的線程安全保證多個線程能夠并發安全高效地通路請求隊列。
代碼中該隊列由變量requestQueue定義。
queueSize
Request隊列的最大長度。當Broker啟動時,SocketServer元件會建立RequestChannel對象
并把Broker端參數
queued.max.requests
指派給queueSize。預設情況每個RequestChannel上的隊列長度500。
processors
封裝RequestChannel的Processor線程池。每個Processor線程負責具體的請求處理邏輯。
Processor管理
剛才的processors即是被建立的Processor線程池,使用Java#ConcurrentHashMap儲存:
Key:processor序号
Value:具體的Processor線程對象
是以目前Kafka Broker端所有網絡線程都是在RequestChannel中維護的。
管理線程池
- RequestChannel中的addProcessor和removeProcessor。
-
Kafka請求隊列源碼實作-RequestChannel請求通道(下)RequestChannel監控名額 - 分别實作增加和移除線程。每當Broker啟動,都會調用addProcessor方法,向RequestChannel對象添加num.network.threads個Processor線程。
num.network.threads這個參數的更新模式(Update Mode)是Cluster-wide,即Kafka允許你動态修改此參數值。比如,Broker啟動時指定num.network.threads為8,之後你通過kafka-configs指令将其修改為3。顯然該操作會減少Processor線程池中的線程數量。在這個場景下,removeProcessor方法會被調用。
處理Request和Response
即收發Request和發送Response。
sendRequest和receiveRequest:
- 發送Request僅是将Request對象置于Request隊列
-
Kafka請求隊列源碼實作-RequestChannel請求通道(下)RequestChannel監控名額 - 接收Request則是從隊列中取出Request
-
Kafka請求隊列源碼實作-RequestChannel請求通道(下)RequestChannel監控名額 - 整個流程其實就是“生産者-消費者”模式,依靠ArrayBlockingQueue的線程安全確定整個過程的線程安全
-
Kafka請求隊列源碼實作-RequestChannel請求通道(下)RequestChannel監控名額 - 沒有所謂的接收Response,隻有發送Response,即sendResponse方法。sendResponse是啥意思呢?其實就是把Response對象發送出去,也就是将Response添加到Response隊列的過程。
-
Kafka請求隊列源碼實作-RequestChannel請求通道(下)RequestChannel監控名額 - 當Processor處理完某個Request後,會把自己的序号封裝進對應的Response對象。
一旦找出之前是由哪個Processor線程處理,代碼直接調用該Processor的enqueueResponse方法,将Response放入Response隊列中,等待後續發送。
監控名額
RequestChannel類定義封裝了與Request隊列相關的重要監控名額,以實時動态地監測Request和Response的性能表現。
具體名額項
object RequestMetrics {
val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
val RequestsPerSec = "RequestsPerSec"
val RequestQueueTimeMs = "RequestQueueTimeMs"
val LocalTimeMs = "LocalTimeMs"
val RemoteTimeMs = "RemoteTimeMs"
val ThrottleTimeMs = "ThrottleTimeMs"
val ResponseQueueTimeMs = "ResponseQueueTimeMs"
val ResponseSendTimeMs = "ResponseSendTimeMs"
val TotalTimeMs = "TotalTimeMs"
val RequestBytes = "RequestBytes"
val MessageConversionsTimeMs = "MessageConversionsTimeMs"
val TemporaryMemoryBytes = "TemporaryMemoryBytes"
val ErrorsPerSec = "ErrorsPerSec"
}
RequestsPerSec
每秒處理的Request數,用來評估Broker的繁忙狀态。
RequestQueueTimeMs
計算Request在Request隊列中的平均等候時間,機關是毫秒。倘若Request在隊列的等待時間過長,你通常需要增加後端I/O線程的數量,來加快隊列中Request的拿取速度。
LocalTimeMs
計算Request實際被處理的時間,機關是毫秒。一旦定位到這個監控項的值很大,你就需要進一步研究Request被處理的邏輯了,具體分析到底是哪一步消耗了過多的時間。
RemoteTimeMs
Kafka的讀寫請求(PRODUCE請求和FETCH請求)邏輯涉及等待其他Broker操作的步驟。RemoteTimeMs計算的,就是等待其他Broker完成指定邏輯的時間。因為等待的是其他Broker,是以被稱為Remote Time。這個監控項非常重要!Kafka生産環境中設定acks=all的Producer程式發送消息延時高的主要原因,往往就是Remote Time高。是以,如果你也碰到了這樣的問題,不妨先定位一下Remote Time是不是瓶頸。
TotalTimeMs
計算Request被處理的完整流程時間。這是最實用的監控名額,沒有之一!畢竟,我們通常都是根據TotalTimeMs來判斷系統是否出現問題的。一旦發現了問題,我們才會利用前面的幾個監控項進一步定位問題的原因。
RequestChannel定義了updateMetrics方法,用于實作監控項的更新