天天看點

圖解Kafka的服務端的網絡通信模型

作者:石臻臻的雜貨鋪

作者:石臻臻, CSDN部落格之星Top5、Kafka Contributor 、nacos Contributor、華為雲 MVP ,騰訊雲TVP, 滴滴Kafka技術專家 、 KnowStreaming。

KnowStreaming 是滴滴開源的Kafka運維管控平台, 有興趣一起參與參與開發的同學,但是怕自己能力不夠的同學,可以聯系我,當你導師帶你參與開源! 。

  1. Kafka網絡模型使用的是什麼線程模型?
  2. 什麼是ControllerPlane(控制器面闆),什麼是DataPlane(資料面闆)?
  3. Kafka整個請求流程是什麼樣子的
  4. 與Kafka網絡通信相關的配置。

為更好的閱讀體驗,和及時的勘誤

請通路原文連結:圖解Kafka服務端網絡通信模型

1Kafka的網絡模型

Kafka中的網絡模型就是基于 主從Reactor多線程進行設計的, 在整體講述Kafka網絡模型之前,我們現在按照源碼中的相關類來講解一下他們分别都是用來做什麼的.

關鍵類解析

SocketServer

這個類是網絡通信的核心類,它持有這Acceptor和 Processor對象。

ConnectionQuotas

這個是控制連接配接數配額的類,

涉及到的Broker配置有:

屬性 描述 預設 max.connections.per.ip 來自每個IP位址的最大連接配接數。如果達到的限制,則來自該IP位址的新連結将被丢棄。 2147483647 max.connections.per.ip.overrides 針對指定IP或者主機設定最大連接配接數, 可以配置多個,用逗号隔開。例如:“主機名:100,127.0.0.1:200” , 這個配置會覆寫 max.connections.per.ip配置 ”“ max.connections Broker中的最大連接配接數, 當然也可以配置監聽器級别的限制,使用方法是在配置前面加上字首;例如:listener.name.具體的監聽器名稱.max.connections=xxx 。特别需要注意的是,就算Broker已經達到了最大連接配接數的限制了, 也應該允許 broker之間監聽器上的連接配接, 這種情況下,将會關閉另外一個監聽器上最近最少使用的連接配接。broker之間的監聽器是配置 inter.broker.listener.name 決定的 2147483647

AbstractServerThread

AbstractServerThread 類:這是 Acceptor 線程和 Processor 線程的抽象基類,它定義了一個抽象方法wakeup() ,主要是用來喚醒Acceptor 線程和 Processor 對應的Selector的, 當然還有一些共用方法

Acceptor 和 Processor

Acceptor 線程類:繼承自AbstractServerThread, 這是接收和建立外部 TCP 連接配接的線程。每個 SocketServer 執行個體一般會建立一個 Acceptor 線程(如果listeners配置了多個就會建立多個Acceptor)。它的唯一目的就是建立連接配接,并将接收到的 SocketChannel(SocketChannel通道用于傳輸資料) 傳遞給下遊的 Processor 線程處理,Processor主要是處理連接配接之後的事情,例如讀寫I/O。

涉及到的Broker配置有:

屬性 描述 預設 listeners 監聽器配置,可以配置多個,配置了幾個就會建立幾個Acceptor listeners = PLAINTEXT://:9092 socket.send.buffer.bytes SocketServer的 SO_SNDBUF 緩沖區。如果值為 -1,将使用作業系統預設值。 102400(100 kibibytes) socket.receive.buffer.bytes SocketServer sockets 的SO_RCVBUF 緩沖區,如果值為 -1,将使用作業系統預設值 102400 (100 kibibytes) num.network.threads 單個Acceptor建立Processor處理器的線程個數 3

Processor 線程類:這是處理單個 TCP 連接配接上所有請求的處理線程。每個 Acceptor 執行個體建立若幹個(num.network.threads)Processor 線程。Processor 線程負責将接收到的 SocketChannel(SocketChannel通道用于傳輸資料。), 注冊讀寫事件,當資料傳送過來的時候,會立即讀取Request資料,通過解析之後, 然後将其添加到 RequestChannel 的 requestQueue 隊列上,同時還負責将 Response 返還給 Request 發送方。

涉及到的Broker配置有:

屬性 描述 預設 socket.request.max.bytes Socket請求中的最大位元組數。 104857600(100 mebibytes) connections.max.idle.ms processor線程關閉空閑時間超過此值的連接配接 600000 (10 minutes) connection.failed.authentication.delay.ms 這是身份驗證失敗時連接配接關閉延遲的時間(以毫秒為機關)。這必須配置為小于 connections.max.idle.ms 以防止連接配接逾時。 100

簡單畫了一張兩個類之間的關系圖

圖解Kafka的服務端的網絡通信模型

在這裡插入圖檔描述

  1. 這兩個類都是 AbstractServerThead的實作類,超類是Runnable 可運作的。
  2. 每個Acceptor持有num.network.threads個 Processor 線程, 假如配置了多個listeners,那麼總共Processor線程數是 listeners*num.network.threads.
  3. Acceptor 建立的是ServerSocketChannel通道,這個通道是用來監聽新進來的TCP連結的通道,通過serverSocketChannel.accept()方法可以拿到SocketChannel通道用于傳輸資料。
  4. 每個Processor 線程都有一個唯一的id,并且通過Acceptor拿到的SocketChannel會被暫時放入到newConnections隊列中
  5. 每個Processor 都建立了自己的Selector
  6. Processor會不斷的從自身的newConnections隊列裡面擷取新SocketChannel,并注冊讀寫事件,如果有資料傳輸過來,則會讀取資料,并解析成Request請求。

既然兩個都是可執行線程,那我們看看兩個線程的run方法都做了哪些事情

Acceptor.run

def run(): Unit = {
    //将serverChannel 注冊到nioSelector上,并且對 Accept事件感興趣:表示伺服器監聽到了客戶連接配接,那麼伺服器可以接收這個連接配接了
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    try {
      var currentProcessorIndex = 0
      while (isRunning) {
        try {
          //傳回感興趣的事件數量  這裡是感興趣的是SelectionKey.OP_ACCEPT,監聽到新的連結
          val ready = nioSelector.select(500)
          if (ready > 0) {
            //擷取所有就緒通道
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            //周遊所有就緒通道
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                //隻處理   Accept事件,其他的事件則抛出異常,ServerSocketChannel是 監聽Tcp的連結通道
                if (key.isAcceptable) {
                  //根據Key 拿到SocketChannle = serverSocketChannel.accept(),然後再周遊
                  accept(key).foreach { socketChannel =>
                    
                    //将socketChannel配置設定給我們的 processor來處理,如果有多個socketChannel 則按照輪訓配置設定的原則
                    //如果一個processor 中能夠處理的newconnection 隊列滿了放不下了,則找下一個
                    // 如果所有的都放不下,則會一直循環直到有processor能夠處理。

                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      //輪訓每個processors來處理
                      processor = synchronized {
                        // adjust the index (if necessary) and retrieve the processor atomically for
                        // correct behaviour in case the number of processors is reduced dynamically
                        currentProcessorIndex = currentProcessorIndex % processors.length
                        processors(currentProcessorIndex)
                      }
                      currentProcessorIndex += 1
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          省略
        }
      }
    } finally {
     省略
    }
  }
           
  1. 将ServerSocketChannel通道注冊到nioSelector 上,并關注事件SelectionKey.OP_ACCEPT serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  2. while循環,持續阻塞監聽事件,逾時時間500ms // 阻塞查詢Selector是否有監聽到新的事件

    val ready = nioSelector.select(500)

    // 如果有事件,則查詢具體的事件和通道

    if(ready>0>{

    //擷取所有就緒事件準備處理

    val keys = nioSelector.selectedKeys()

    }

  3. 周遊剛剛監聽到的事件, 如果該SelectionKey不包含OP_ACCEPT(建立連接配接)事件,則抛出異常,通常不會出現這個異常。 Unrecognized key state for acceptor thread
  4. 如果SelectionKey包含OP_ACCEPT(建立連接配接)事件,則可以通過這個SelectionKey拿到serverSocketChannel,通過serverSocketChannel 拿到socketChannel,并且将SocketChannel設定為非阻塞模式。

    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]

    // 調用accept方法就可以拿到ScoketChannel了。

    val socketChannel = serverSocketChannel.accept()

    //設定為非阻塞模式 就可以在異步模式下調用connect(), read() 和write()了。

    socketChannel.configureBlocking(false)

  5. 接下來,把上面拿到的SocketChannel以周遊的形式給Acceptor下面的Procesor, 讓Processor來執行後面的處理。配置設定的展現形式是, 将拿到的SocketChannel儲存在Processor中的newConnections阻塞隊列中,這個newConnections上限是20,在代碼裡面寫死了的,也就是說一個Processor同時最多隻能處理20個連接配接, 那麼所有的Processor能處理的最大連接配接就是Processor數量 * 20;如果你的連接配接請求并發度很高,可以嘗試調大num.network.threads
  6. 最後,如果newConnections隊列放入了一個新的SocketChannel,則會調用一下對應Processor執行個體的wakeup()方法。

Procesor.run

override def run(): Unit = {
    startupComplete()
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          // 将之前監聽到的TCP連結(暫時儲存在newConnections中) 開始注冊監聽OP_READ事件到每個Processor的 KSelector選擇器中。
          configureNewConnections()
          // register any new responses for writing
          processNewResponses()
          //在不阻塞的情況下對每個連接配接執行任何 I/O 操作。這包括完成連接配接、完成斷開連接配接、啟動新發送或在進行中的發送或接收上取得進展。
          // 當此調用完成時,使用者可以使用completedSends() 、 completedReceives() 、 connected() 、 disconnected()檢查已完成的發送、接收、連接配接或斷開連接配接。
          poll()
          // 把請求解析後放到 requestChannels 隊列中,異步處理
          processCompletedReceives()
          //處理已經發送完成的請求
          processCompletedSends()
          processDisconnected()
          closeExcessConnections()
        } catch {
          // We catch all the throwables here to prevent the processor thread from exiting. We do this because
          // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
          // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
          // be either associated with a specific socket channel or a bad request. These exceptions are caught and
          // processed by the individual methods above which close the failing channel and continue processing other
          // channels. So this catch block should only ever see ControlThrowables.
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally {
      debug(s"Closing selector - processor $id")
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
      shutdownComplete()
    }
  }

           
  1. configureNewConnections(): 之前Acceptor監聽到的SocketChannel儲存在Procesor中的newConnections阻塞隊列中, 現在開始将newConnections阻塞隊列一個個取出來,向Procesor的Selector注冊SocketChannel通道,并且感興趣的事件為SelectionKey.OP_READ讀事件。
  2. processNewResponses() : 去Processor裡面的無邊界阻塞隊列responseQueue裡面擷取RequestChannel.Response資料, 如果有資料并且需要傳回Response的話, 則通過channel傳回資料. 具體的Channel是根據connectionId 擷取之前建構的KafkaChannel, KafkaChannel則會通過監聽SelectionKey.OP_WRITE。然後調用writeTo方法。 至于responseQueue這個隊列是什麼時候入隊的,我們後面再分析
  3. poll(): 這個方法裡面執行的就很多了, 這個方法底層調用的是selector.poll(); 将監聽到的事件批量處理,它才是執行I/O請求的最終地方, 它正對每個連接配接執行任何的I/O操作,這包括了 完成連接配接、完成斷開連接配接、啟動新發送等等。 像校驗身份資訊,還有handshake等等這些也都是在這裡執行的。
  4. processCompletedReceives(): 處理所有completedReceives(已完成接收的請求)進行接下來的處理, 處理的方式是解析一下收到的請求,最終調用了 requestChannel.sendRequest(req). 也就是說所有的請求最終通過解析放入到了RequestChannel中的requestQueue阻塞隊列中, 這個阻塞隊列的大小為queued.max.requests預設500;表示的是在阻塞網絡線程之前,資料平面允許的排隊請求數 PS: 這個completedReceives 是在 poll()方法中添加的元素。
  5. processCompletedSends(): 它負責處理 Response 的回調邏輯,通過周遊completedSends(已完成發送)集合 可以從inflightResponses中移除并拿到response對象,然後再調用回調邏輯。 PS: 這個completedSends 是在 poll()方法中添加的元素。
  6. processDisconnected(): 處理斷開連結的情況, connectionQuotas連接配接限流減掉這個連結,inflightResponses也移除對應連接配接。
  7. closeExcessConnections(): 關閉超限連接配接 ,當總連接配接數 >max.connections && (inter.broker.listener.name!=listener|| listeners 數量==1) 則需要關閉一些連接配接.

    簡單來說就是:就算Broker已經達到了最大連接配接數的限制了, 也應該允許 broker之間監聽器上的連接配接, 這種情況下,将會關閉另外一個監聽器上最近最少使用的連接配接。broker之間的監聽器是配置 inter.broker.listener.name 決定的 所謂優先關閉,是指在諸多 TCP 連接配接中找出最近未被使用的那個。這裡“未被使用”就是說,在最近一段時間内,沒有任何 Request 經由這個連接配接被發送到 Processor 線程。

RequestChannel

這個類儲存這所有的Processor,還有一個阻塞隊列儲存這待處理請求。這個隊列最大長度由queued.max.requests控制,當待處理請求超過這個數值的時候網絡就會阻塞

圖解Kafka的服務端的網絡通信模型

涉及到的Broker配置有:

屬性 描述 預設 queued.max.requests 在阻塞網絡線程之前,DataPlane允許的排隊請求數 500

KafkaApis

具體Request的處理類, 所有的請求方法處理邏輯都放在這個裡面。

KafkaRequestHandlerPool

KafkaRequestHandler的線程池,KafkaRequestHandler線程的數量由配置num.io.threads決定。

圖解Kafka的服務端的網絡通信模型

在這裡插入圖檔描述

涉及到的Broker配置有:

屬性 描述 預設 num.io.threads 伺服器用于處理請求的線程數,可能包括磁盤 I/O 8

KafkaRequestHandler

請求處理類, 每個Handler都會去 requestChannel的requestQueue隊列裡面poll請求, 然後去處理,最終調用的處理方法是KafkaApis.handle()

這幾個類之間的關系如下

圖解Kafka的服務端的網絡通信模型

在這裡插入圖檔描述

通信流程總結

圖解Kafka的服務端的網絡通信模型

在這裡插入圖檔描述

  1. KafkaServer啟動的時候,會根據listeners的配置來初始化對應的執行個體。
  2. 一個listeners對應一個Acceptor,一個Acceptor持有若幹個(num.network.threads)Processor執行個體。
  3. Acceptor 中的nioSelector注冊的是ServerSocketChannel通道,并監聽OP_ACCEPT事件,它隻負責 TCP 建立和連接配接,不包含讀寫資料。
  4. 當Acceptor監聽到新的連接配接之後,就會通過調用socketChannel = serverSocketChannel.accept()拿到SocketChannel,然後把SocketChannel儲存在Processor裡面的newConnection隊列中。 那麼具體儲存在哪個Processor中呢?當然是輪詢配置設定了,確定負載均衡嘛。當然每個Processor的newConnection隊列最大隻有20,并且是代碼寫死的。如果一個Processor滿了,則會尋找下一個存放,如果所有的都滿了,那麼就會阻塞。一個Acceptor的所有Processor最大能夠并發處理的請求是 20 * num.network.threads。
  5. Processor會持續的從自己的newConnection中poll資料,拿到SocketChannel之後,就把它注冊到自己的Selector中,并且監聽事件 OP_READ。 如果newConnection是空的話,poll的逾時時間是 300ms。
  6. 監聽到有新的事件,比較READ,則會讀取資料,并且解析成Request, 把Request放入到 RequestChannel中的requestQueue阻塞隊列中。所有的待處理請求都是臨時放在這裡面。這個隊列也有最大值queued.max.requests(預設500),超過這個大小就會阻塞。
  7. KafkaRequestHandlerPool中建立了很多(num.io.threads(預設8))的KafkaRequestHandler,用來處理Request, 他們都會一直從RequestChannel中的requestQueue隊列中poll新的Request,來進行處理。
  8. 處理Request的具體邏輯是KafkaApis裡面。當Request處理完畢之後,會調用requestChannel.sendResponse()傳回Response。
  9. 當然,請求Request和傳回Response必須是一一對應的, 你這個請求是哪個Processor監聽到的,則需要哪個Processor傳回, 他們通過id來辨別。
  10. Response也不是裡面傳回的,而是先放到Processor中的ResponseQueue隊列中,然後慢慢傳回給用戶端。

資料面闆(DataPlane)

資料面闆是用來處理 Broker與Broker/Client之間的網絡模型子產品, 與之相對的是控制器面闆

控制器面闆 是專門用于Controller與Broker之間的網絡通信子產品。

其實本質上他們都是一模一樣的, 但是為了将Controller的通信和普通通信隔離,才有這麼兩個概念。

上面的網絡通信模型就是以資料面闆來分析的,因為本質是一樣的, 隻是有一些配置不一樣。

那麼.資料面闆 就不詳細講了,我們主要講下控制器面闆的不一樣的地方

控制器面闆(ControllerPlane)

控制器面闆是用來專門處理 Controller相關請求的獨立通信子產品。

大家都知道,Controller是一個很重要的角色,基本上大部分協調整個叢集的相關請求都跟它有關系, 比如建立Topic、删除Topic、分區副本重配置設定、等等。他們都很重要

但是一般情況下資料面闆的請求很多,如果因為請求過多而導緻Controller相關請求被阻塞不能執行,那麼可能會造成一些影響, 是以我們可以讓Controller類的請求有一個單獨的通信子產品。

首先,要啟用控制器面闆,必須配置control.plane.listener.name. 并且這個監聽器名稱必須在listeners裡面有配置

否則的話,是不會專用的控制器連結的EndPoint的。

例如: Broker配置

## 所有的監聽器
isteners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094

## 監聽器對應的安全協定
listener.security.protocol.map = INTERNAL: PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL

## 控制器
control.plane.listener.name = CONTROLLER
           

在啟動時,代理将開始使用安全協定“SSL”監聽“192.1.1.8:9094”。 在控制器端,當它通過 zookeeper 發現代理釋出的端點時,它将使用 control.plane.listener.name 找到端點,它将用于建立與代理的連接配接。

  1. 必須配置control.plane.listener.name 才能使用獨立的控制器面闆
  2. 控制器面闆的RequestChannel中的requestQueue不是由queued.max.requests控制的,而是寫死的 20. 因為控制類請求不會有那麼大的并發
  3. 跟DataPlane相關隔離,互不影響。但是連接配接限流ConnectionQuotas是共享的,限流的時候,兩個是算在一起的
  4. 控制類面闆隻有一個Acceptor和一個Processor,這個跟資料面闆的差別是 DataPlane的Processor可以有多個。

涉及到的Broker配置有:

屬性 描述 預設 control.plane.listener.name 單獨控制器面闆的監聽器名稱,如果配置了,則Controller相關請求會有獨立的專用通信子產品 空

上面我們主要分析了一下, Kafka中的網絡通信模型, 那麼聰明的你應該肯定能夠看的出來,它是使用線程模型中的 Reactor模式來實作的。

2線程模型: Reactor模式

該子產品詳細請參考Reactor 模型

Reactor 模式,是指通過一個或多個輸入同時傳遞給服務處理器的服務請求的事件驅動處理模式。 服務端程式處理傳入多路請求,并将它們同步分派給請求對應的處理線程,Reactor 模式也叫 Dispatcher 模式。 即 I/O 多路複用統一監聽事件,收到事件後分發(Dispatch 給某程序),是編寫高性能網絡伺服器的必備技術之一。

根據 Reactor 的數量和處理資源池線程的數量不同,有 3 種典型的實作:

  1. 單 Reactor 單線程;
  2. 單 Reactor 多線程;
  3. 主從 Reactor 多線程。

我們主要了解一下 主從Reactor 多線程

圖解Kafka的服務端的網絡通信模型

針對單 Reactor 多線程模型中,Reactor 在單線程中運作,高并發場景下容易成為性能瓶頸,可以讓 Reactor 在多線程中運作。

方案說明:

  • Reactor 主線程 MainReactor 對象通過 Select 監控建立連接配接事件,收到事件後通過 Acceptor 接收,處理建立連接配接事件;
  • Acceptor 處理建立連接配接事件後,MainReactor 将連接配接配置設定 Reactor 子線程給 SubReactor 進行處理;
  • SubReactor 将連接配接加入連接配接隊列進行監聽,并建立一個 Handler 用于處理各種連接配接事件;
  • 當有新的事件發生時,SubReactor 會調用連接配接對應的 Handler 進行響應;
  • Handler 通過 Read 讀取資料後,會分發給後面的 Worker 線程池進行業務處理;
  • Worker 線程池會配置設定獨立的線程完成真正的業務處理,如何将響應結果發給 Handler 進行處理;
  • Handler 收到響應結果後通過 Send 将響應結果傳回給 Client。

更詳細的介紹可以看 Reactor 模型

3問答

Kafka的網絡模型使用了Reactor模式的哪種實作方式?
  1. 單 Reactor 單線程;
  2. 單 Reactor 多線程;
  3. 主從 Reactor 多線程。

答案: 3 。 使用了主從Reactor多線程的實作方式.

圖解Kafka的服務端的網絡通信模型

在這裡插入圖檔描述

MainReactor(Acceptor)隻負責監聽OP_ACCEPT事件, 監聽到之後把SocketChannel 傳遞給 SubReactor(Processor), 每個Processor都有自己的Selector。SubReactor會監聽并處理其他的事件,并最終把具體的請求傳遞給KafkaRequestHandlerPool。

很典型的主從Reactor多線程模式。

什麼是ControllerPlane(控制器面闆),什麼是DataPlane(資料面闆)?

控制器面闆: 主要處理控制器類的的請求 資料面闆: 主要處理資料類的請求。

讓他們隔離,互不影響,比如說普通的請求太多,導緻了阻塞, 那麼Controller相關的請求也可能被阻塞了,是以讓他們隔離,不會互相影響。

但是預設情況下, ControllerPlane是沒有設定的,也就是Controller相關的請求還是走的DataPlane。 想要隔離的話必須設定control.plane.listener.name .

  1. 必須配置control.plane.listener.name
  2. 控制器面闆的RequestChannel中的requestQueue不是由queued.max.requests控制的,而是寫死的 20. 因為控制類請求不會有那麼大的并發
  3. 跟DataPlane相關隔離,互不影響。但是連接配接限流ConnectionQuotas是共享的,限流的時候,兩個是算在一起的
  4. 控制類面闆隻有一個Acceptor和一個Processor,這個跟資料面闆的差別是 DataPlane的Processor可以有多個。
Kafka整個請求流程是什麼樣子的

請看上面網絡通信總結部分。