天天看點

ZooKeeper到底為Kafka的做了什麼犧牲?(下)3 Broker如何處理用戶端的更新中繼資料請求?4 最佳實踐

3 Broker如何處理用戶端的更新中繼資料請求?

Broker處理所有RPC請求的入口方法

KafkaApis#handleTopicMetadataRequest

  • 處理更新中繼資料的方法
  • ZooKeeper到底為Kafka的做了什麼犧牲?(下)3 Broker如何處理用戶端的更新中繼資料請求?4 最佳實踐
  • handleTopicMetadataRequest(RequestChannel.Request):
def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
    ...
    // 需要擷取哪些topic的中繼資料
    val topics = if (metadataRequest.isAllTopics)
      metadataCache.getAllTopics()
    // 不會披露未經Describe授權的主題的存在,是以甚至都沒有檢查它們是否存在
    val unauthorizedForDescribeTopicMetadata =
      // 對于所有主題,請勿包括未經授權的主題
      // 在舊版本的協定中,每次都擷取所有主題的中繼資料
      if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
        Set.empty[MetadataResponseTopic]
      else
        unauthorizedForDescribeTopics.map(topic =>
          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))
        // 從中繼資料緩存過濾出相關主題的中繼資料
        getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
          errorUnavailableEndpoints, errorUnavailableListeners)

    var clusterAuthorizedOperations = Int.MinValue
    if (request.header.apiVersion >= 8) {
      // 擷取叢集授權的操作
      if (metadataRequest.data.includeClusterAuthorizedOperations) {
        ...
      // 擷取主題授權操作
      if (metadataRequest.data.includeTopicAuthorizedOperations) {
        ...
    val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
    // 擷取所有Broker清單
    val brokers = metadataCache.getAliveBrokers
    // 建構Response并發送
    sendResponseMaybeThrottle(request, requestThrottleMs =>
    ...
  }      

先根據請求中的topic清單

去本地中繼資料緩存MetadataCache中過濾出相應主題的中繼資料,即 topics 子樹的子集

然後再去本地中繼資料緩存中擷取所有Broker的集合, 即 ids 子樹

最後把這兩部分合在一起,作為響應傳回給用戶端。

Kafka在每個Broker中都維護了一份和zk中一樣的中繼資料緩存,并非每次client請求中繼資料就去讀一次zk。由于zk的Watcher機制,Kafka可感覺到zk中的中繼資料變化,進而及時更新Broker的中繼資料緩存。

4 最佳實踐

目前Kafka叢集的可用性高度耦合zk,若zk叢集不能提供服務,整個Kafka叢集就無法服務了,Kafka的開發者也意識到了這個問題,目前正在讨論開發一個中繼資料服務來替代 zk。

若需部署大規模Kafka叢集,推薦拆分成多個互相獨立的小叢集部署,每個小叢集都使用一組獨立的zk提供服務。這樣,每個zk中存儲的資料相對較少,且若某zk叢集異常,隻會影響一個小Kafka叢集,盡量減小了影響範圍。

參考

https://www.bcoder.top/2019/12/14/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E4%B9%8BKafka%E5%8D%8F%E8%B0%83%E6%9C%8D%E5%8A%A1ZooKeeper/