
[Kafka] Troubleshooting: abnormal __consumer_offsets partitions leave consumer groups unable to consume

A record of troubleshooting a Kafka consumption anomaly.

Contents

  • I. Problem Description
    • Problem description
  • II. Preliminary Analysis
    • 1. Describe the affected consumer group
    • 2. Searching for the problem
  • III. Deep Dive
    • 1. Enable client debug logging
    • 2. How the server handles the request
      • Entry point for the request
      • handleGroupCoordinatorRequest logic
      • partitionFor logic
      • offsetsTopicMetadata logic
  • IV. Back to Production
    • 1. Verifying the __consumer_offsets partition info
    • 2. Reproducing the problem
    • 3. Further thoughts
  • V. References

I. Problem Description

Problem description

Some consumer groups could not consume data normally through the brokers (new consumer); after the consumer group name was changed, consumption returned to normal.

Group names (they may contain business information, so the names below are not the real ones):

  • group1-redacted

  • group2-redacted

Kafka version:

0.9.0.1

II. Preliminary Analysis

1. Describe the affected consumer group

Running describe on the affected consumer group throws the following exception:

Error while executing consumer group command The group coordinator is not available.
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.

2. Searching for the problem

Similar issues have been reported in the community, but none of the reports clearly explain why the problem occurs or how to fix it for good (restarting does not count)!

  • http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Kafka-0-9-Client-td4975.html

III. Deep Dive

Logs are a programmer's first-hand analysis material. Because the Kafka brokers serve a large amount of production traffic, enabling debug logging on the server side is not an option, so we have to start from the client.

1. Enable client debug logging

After setting the client log level to debug, we see the following lines scrolling in a continuous loop:

19:52:41.785 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing group metadata request to broker 43
19:52:41.788 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group metadata response ClientResponse(receivedTimeMs=1587642761788, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]1b68ddbd, request=RequestSend(header={api_key=10,api_version=0,correlation_id=30,client_id=consumer-1}, body={group_id=30cab231-05ed-43ef-96aa-a3ca1564baa3}), createdTimeMs=1587642761785, sendTimeMs=1587642761785), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
19:52:41.875 TKD [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=31,client_id=consumer-1}, body={topics=[topic-redacted]}), isInitiatedByNetworkClient, createdTimeMs=1587642761875, sendTimeMs=0) to node 43

From these lines we can roughly see the loop doing a few things (the order may not be exact):

  • issuing a group metadata request to some broker

  • receiving the group metadata response

  • sending a metadata request

Focusing on the error in the group metadata response, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}, we can roughly conclude that the Kafka server is not returning any coordinator node information.
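For reference, the numeric error code in that response can be mapped back to its name with the client library itself. A minimal sketch, assuming the 0.9.x kafka-clients jar (the version in use here) is on the classpath:

// Minimal sketch: map the error_code seen in the debug log back to its named error.
import org.apache.kafka.common.protocol.Errors

object ErrorCodeLookup {
  def main(args: Array[String]): Unit = {
    val err = Errors.forCode(15.toShort)
    // On 0.9.x this prints GROUP_COORDINATOR_NOT_AVAILABLE, i.e. the broker could not
    // hand back a coordinator node for the group.
    println(s"error_code=15 -> ${err.name}")
  }
}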

2. How the server handles the request

Entry point for the request

First we need to look at the server-side source code that handles api_key=10 requests. The corresponding API handler is found in kafka.server.KafkaApis:

def handle(request: RequestChannel.Request) {
  ……
        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
  ……
    }

handleGroupCoordinatorRequest logic

def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
    val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
    val responseHeader = new ResponseHeader(request.header.correlationId)

    if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
      val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
    } else {
      val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)

      // get metadata (and create the topic if necessary)
      val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
      // Possible issue #1: offsetsTopicMetadata.errorCode is not Errors.NONE
      val responseBody = if (offsetsTopicMetadata.errorCode != Errors.NONE.code) {
        new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
      } else {
        val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata
          .find(_.partitionId == partition)
          .flatMap {
            partitionMetadata => partitionMetadata.leader
          }
        // Possible issue #2: coordinatorEndpoint is None
        coordinatorEndpoint match {
          case Some(endpoint) =>
            new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
          case _ =>
            new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
        }
      }

      trace("Sending consumer metadata %s for correlation id %d to client %s."
        .format(responseBody, request.header.correlationId, request.header.clientId))
      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
    }
  }

Here error_code=15 corresponds to Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code. From the source it is not hard to see that there are two places that can produce Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code:

  • Suspect #1: offsetsTopicMetadata.errorCode is not Errors.NONE

    offsetsTopicMetadata.errorCode != Errors.NONE.code

    If this errorCode is set, fetching the metadata of the entire __consumer_offsets topic is failing. But in our case only some groups are affected, so this is unlikely to be the cause.

  • Suspect #2: coordinatorEndpoint is None

    val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata
      .find(_.partitionId == partition)
      .flatMap {
        partitionMetadata => partitionMetadata.leader
      }

    From the metadata in offsetsTopicMetadata this picks out the leader of the partition coordinator.partitionFor(groupCoordinatorRequest.groupId), and that partition is exactly what depends on the group name! This is very likely where the problem lies.

partitionFor logic

def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

That is, take the positive hashCode of the group name modulo groupMetadataTopicPartitionCount (the number of partitions of __consumer_offsets).

Note: since the real group names may contain business information, the names below are placeholders, but the results shown were computed from the real names.

scala> "group1-打馬賽克".hashCode % 50
res2: Int = 43

scala> "group2-打馬賽克".hashCode % 50
res3: Int = 43
           

We find that partitionFor maps both abnormal consumer groups to 43, so our initial judgment is that the problem is related to partition 43 of __consumer_offsets!
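When more than a couple of group names need to be checked, a small helper like the sketch below does the same computation for a whole list of names. It is only a rough stand-in for coordinator.partitionFor (group names here are placeholders, and the check is restricted to positive hashCodes so it does not depend on exactly how Utils.abs treats negative values):

// Minimal sketch: which __consumer_offsets partition does a group name land on,
// assuming the default of 50 offsets partitions?
object GroupPartitionCheck {
  val offsetsPartitions = 50   // offsets.topic.num.partitions

  def partitionFor(groupId: String): Option[Int] = {
    val h = groupId.hashCode
    if (h >= 0) Some(h % offsetsPartitions) else None   // negative hashCode: check Utils.abs in the broker source
  }

  def main(args: Array[String]): Unit = {
    // Placeholder group names, not the real ones from this incident.
    Seq("group1-redacted", "group2-redacted", "some-other-group").foreach { g =>
      println(s"$g -> ${partitionFor(g).map(_.toString).getOrElse("negative hashCode, see Utils.abs")}")
    }
  }
}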

Next, to confirm the anomaly, we look at the logic around offsetsTopicMetadata.

offsetsTopicMetadata logic

The call chain is getOrCreateGroupMetadataTopic -> metadataCache.getTopicMetadata -> getPartitionMetadata:

private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[PartitionMetadata]] = {
    cache.get(topic).map { partitions =>
      partitions.map { case (partitionId, partitionState) =>
        val topicPartition = TopicAndPartition(topic, partitionId)

        val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
        val maybeLeader = aliveBrokers.get(leaderAndIsr.leader)

        val replicas = partitionState.allReplicas
        val replicaInfo = getAliveEndpoints(replicas, protocol)

        maybeLeader match {
          case None =>
            debug("Error while fetching metadata for %s: leader not available".format(topicPartition))
            new PartitionMetadata(partitionId, None, replicaInfo, Seq.empty[BrokerEndPoint],
              Errors.LEADER_NOT_AVAILABLE.code)

          case Some(leader) =>
            val isr = leaderAndIsr.isr
            val isrInfo = getAliveEndpoints(isr, protocol)

            if (replicaInfo.size < replicas.size) {
              debug("Error while fetching metadata for %s: replica information not available for following brokers %s"
                .format(topicPartition, replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")))

              new PartitionMetadata(partitionId, Some(leader.getBrokerEndPoint(protocol)), replicaInfo, isrInfo, Errors.REPLICA_NOT_AVAILABLE.code)
            } else if (isrInfo.size < isr.size) {
              debug("Error while fetching metadata for %s: in sync replica information not available for following brokers %s"
                .format(topicPartition, isr.filterNot(isrInfo.map(_.id).contains).mkString(",")))
              new PartitionMetadata(partitionId, Some(leader.getBrokerEndPoint(protocol)), replicaInfo, isrInfo, Errors.REPLICA_NOT_AVAILABLE.code)
            } else {
              new PartitionMetadata(partitionId, Some(leader.getBrokerEndPoint(protocol)), replicaInfo, isrInfo, Errors.NONE.code)
            }
        }
      }
    }
  }

offsetsTopicMetadata therefore carries the leader, replicaInfo, and isr metadata for the topic's partitions, and a partition with no live leader is returned with its leader left empty. So we suspect that the leader, replicaInfo, or isr of partition 43 of __consumer_offsets is abnormal, so that find(_.partitionId == partition).flatMap(_.leader) cannot resolve a live leader for the partition selected by the hashCode.
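To make this failure mode concrete, here is a toy sketch with simplified stand-in types (Broker and PartitionMeta are hypothetical, not Kafka's real classes): the suspect partition is present in the metadata, but because it has no live leader the lookup yields None, which the broker then answers with GROUP_COORDINATOR_NOT_AVAILABLE:

// Toy sketch: a partition whose leader is unavailable makes the coordinator lookup
// fall through to "no coordinator", i.e. error_code=15 on the wire.
object CoordinatorLookupSketch {
  case class Broker(id: Int, host: String, port: Int)
  case class PartitionMeta(partitionId: Int, leader: Option[Broker])

  def main(args: Array[String]): Unit = {
    val partitionsMetadata = Seq(
      PartitionMeta(42, None),                               // Leader: -1 in the describe output
      PartitionMeta(43, None),                               // the partition our groups hash to
      PartitionMeta(44, Some(Broker(4, "broker-4", 9092)))   // a healthy partition
    )

    val partition = 43                                       // coordinator.partitionFor(groupId)
    val coordinatorEndpoint = partitionsMetadata
      .find(_.partitionId == partition)                      // the partition is found...
      .flatMap(_.leader)                                     // ...but its leader is None

    println(coordinatorEndpoint)                             // None -> GROUP_COORDINATOR_NOT_AVAILABLE
  }
}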

IV. Back to Production

1. Verifying the __consumer_offsets partition info

Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=uncompressed
        Topic: __consumer_offsets       Partition: 0    Leader: 18      Replicas: 18,2,17       Isr: 18
        Topic: __consumer_offsets       Partition: 1    Leader: 19      Replicas: 19,17,18      Isr: 19,18
        Topic: __consumer_offsets       Partition: 2    Leader: 20      Replicas: 20,18,19      Isr: 19,20,18
        Topic: __consumer_offsets       Partition: 3    Leader: 21      Replicas: 21,19,20      Isr: 19,20,21
        Topic: __consumer_offsets       Partition: 4    Leader: 22      Replicas: 22,20,21      Isr: 20,21,22
        Topic: __consumer_offsets       Partition: 5    Leader: 23      Replicas: 23,21,22      Isr: 23,21,22
        Topic: __consumer_offsets       Partition: 6    Leader: 24      Replicas: 24,22,23      Isr: 23,24,22
        Topic: __consumer_offsets       Partition: 7    Leader: 25      Replicas: 25,23,24      Isr: 23,25,24
        Topic: __consumer_offsets       Partition: 8    Leader: 26      Replicas: 26,24,25      Isr: 26,25,24
        Topic: __consumer_offsets       Partition: 9    Leader: 27      Replicas: 27,25,26      Isr: 27,26,25
        Topic: __consumer_offsets       Partition: 10   Leader: 28      Replicas: 28,26,27      Isr: 27,26,28
        Topic: __consumer_offsets       Partition: 11   Leader: 27      Replicas: 0,27,28       Isr: 27,28
        Topic: __consumer_offsets       Partition: 12   Leader: 28      Replicas: 1,28,0        Isr: 28
        Topic: __consumer_offsets       Partition: 13   Leader: -1      Replicas: 2,0,1 Isr: 
        Topic: __consumer_offsets       Partition: 14   Leader: -1      Replicas: 3,1,2 Isr: 
        Topic: __consumer_offsets       Partition: 15   Leader: -1      Replicas: 4,2,3 Isr: 
        Topic: __consumer_offsets       Partition: 16   Leader: -1      Replicas: 5,3,4 Isr: 
        Topic: __consumer_offsets       Partition: 17   Leader: -1      Replicas: 6,4,5 Isr: 
        Topic: __consumer_offsets       Partition: 18   Leader: -1      Replicas: 7,5,6 Isr: 
        Topic: __consumer_offsets       Partition: 19   Leader: 8       Replicas: 8,6,7 Isr: 8
        Topic: __consumer_offsets       Partition: 20   Leader: 9       Replicas: 9,7,8 Isr: 9,8
        Topic: __consumer_offsets       Partition: 21   Leader: 10      Replicas: 10,8,9        Isr: 10,8,9
        Topic: __consumer_offsets       Partition: 22   Leader: 11      Replicas: 11,9,10       Isr: 11,10,9
        Topic: __consumer_offsets       Partition: 23   Leader: 12      Replicas: 12,10,11      Isr: 11,12,10
        Topic: __consumer_offsets       Partition: 24   Leader: 13      Replicas: 13,11,12      Isr: 13,11,12
        Topic: __consumer_offsets       Partition: 25   Leader: 14      Replicas: 14,12,13      Isr: 13,12,14
        Topic: __consumer_offsets       Partition: 26   Leader: 15      Replicas: 15,13,14      Isr: 13,14,15
        Topic: __consumer_offsets       Partition: 27   Leader: 16      Replicas: 16,14,15      Isr: 14,16,15
        Topic: __consumer_offsets       Partition: 28   Leader: 42      Replicas: 17,15,42      Isr: 42,15
        Topic: __consumer_offsets       Partition: 29   Leader: 18      Replicas: 18,17,19      Isr: 19,18
        Topic: __consumer_offsets       Partition: 30   Leader: 19      Replicas: 19,18,20      Isr: 19,20,18
        Topic: __consumer_offsets       Partition: 31   Leader: 20      Replicas: 20,19,21      Isr: 19,20,21
        Topic: __consumer_offsets       Partition: 32   Leader: 21      Replicas: 21,20,22      Isr: 20,21,22
        Topic: __consumer_offsets       Partition: 33   Leader: 22      Replicas: 22,21,23      Isr: 23,21,22
        Topic: __consumer_offsets       Partition: 34   Leader: 23      Replicas: 23,22,24      Isr: 23,24,22
        Topic: __consumer_offsets       Partition: 35   Leader: 24      Replicas: 24,23,25      Isr: 23,25,24
        Topic: __consumer_offsets       Partition: 36   Leader: 25      Replicas: 25,24,26      Isr: 26,25,24
        Topic: __consumer_offsets       Partition: 37   Leader: 26      Replicas: 26,25,27      Isr: 27,26,25
        Topic: __consumer_offsets       Partition: 38   Leader: 27      Replicas: 27,26,28      Isr: 27,26,28
        Topic: __consumer_offsets       Partition: 39   Leader: 28      Replicas: 28,27,0       Isr: 27,28
        Topic: __consumer_offsets       Partition: 40   Leader: 28      Replicas: 0,28,1        Isr: 28
        Topic: __consumer_offsets       Partition: 41   Leader: -1      Replicas: 1,0,2 Isr: 
        Topic: __consumer_offsets       Partition: 42   Leader: -1      Replicas: 2,1,3 Isr: 
        Topic: __consumer_offsets       Partition: 43   Leader: -1      Replicas: 3,2,4 Isr: 
        Topic: __consumer_offsets       Partition: 44   Leader: -1      Replicas: 4,3,5 Isr: 
        Topic: __consumer_offsets       Partition: 45   Leader: -1      Replicas: 5,4,6 Isr: 
        Topic: __consumer_offsets       Partition: 46   Leader: -1      Replicas: 6,5,7 Isr: 
        Topic: __consumer_offsets       Partition: 47   Leader: 8       Replicas: 7,6,8 Isr: 8
        Topic: __consumer_offsets       Partition: 48   Leader: 8       Replicas: 8,7,9 Isr: 9,8
        Topic: __consumer_offsets       Partition: 49   Leader: 9       Replicas: 9,8,10        Isr: 10,9,8

Sure enough, partition 43 has an abnormal leader (Leader: -1 with an empty ISR).
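To spot every affected partition at a glance, a small sketch like the following (the input file name is hypothetical; it expects the describe output above saved to a text file) lists the offsets partitions whose leader is -1:

// Minimal sketch: scan saved describe output for partitions with no leader.
import scala.io.Source

object BrokenOffsetsPartitions {
  // Matches lines like: "Topic: __consumer_offsets  Partition: 43  Leader: -1  Replicas: ..."
  private val partitionLine = """.*Partition:\s+(\d+)\s+Leader:\s+(-?\d+).*""".r

  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse("consumer_offsets_describe.txt")   // hypothetical file name
    val source = Source.fromFile(path)
    try {
      val broken = source.getLines().collect {
        case partitionLine(partition, "-1") => partition.toInt
      }.toList
      // For the output above this prints: 13, 14, 15, 16, 17, 18, 41, 42, 43, 44, 45, 46
      println(s"Partitions with no leader: ${broken.sorted.mkString(", ")}")
    } finally {
      source.close()
    }
  }
}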

2. Reproducing the problem

We batch-generated consumer group names from UUIDs, kept those whose hashCode maps (modulo 50) to the partition number of an abnormal partition, and every one of them hit the same consumption failure when used to consume.
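A sketch of how such group names can be generated (the partition count of 50 and the target partition 43 come from this cluster; only names with a positive hashCode are kept, so the result does not depend on how Utils.abs treats negative values):

// Minimal sketch of the reproduction step: UUID group names that land on partition 43.
import java.util.UUID

object ReproGroupNames {
  def main(args: Array[String]): Unit = {
    val targetPartition = 43
    val names = Iterator.continually(UUID.randomUUID().toString)
      .filter(g => g.hashCode > 0 && g.hashCode % 50 == targetPartition)
      .take(5)
      .toList
    // Consuming with any of these group names reproduced the same failure in our test.
    names.foreach(println)
  }
}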

3. Further thoughts

  • Why do some __consumer_offsets partitions end up with an abnormal leader, replicaInfo, or isr?

    This can be related to network jitter and to certain cluster operations; it has to be analyzed case by case.

  • How can the abnormal __consumer_offsets partitions be restored?

    Not covered in detail here; see http://blog.itpub.net/31543630/viewspace-2212467/ .

V. References

  • Kafka new-consumer design document: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design
  • "Kafka can't consume?! Yet my distributed message service Kafka stands rock solid": http://blog.itpub.net/31543630/viewspace-2212467/
  • Problem with Kafka 0.9 Client: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Kafka-0-9-Client-td4975.html
  • ErrorMapping: https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala