Troubleshooting a Kafka Consumption Anomaly
Table of Contents
- I. Problem Description
  - Problem description
- II. Preliminary Analysis
  - 1. Describing the affected consumer groups
  - 2. Searching for similar issues
- III. Deep Dive
  - 1. Enabling client-side debug logging
  - 2. How the broker handles the request
    - The entry point for the request
    - The handleGroupCoordinatorRequest logic
    - The partitionFor logic
    - The offsetsTopicMetadata logic
- IV. Back to Production
  - 1. Verifying the __consumer_offsets partition information
  - 2. Reproducing the problem
  - 3. Further thoughts
- V. References
I. Problem Description
Problem description
Some consumer groups could not consume data through the brokers (using the new consumer); after the consumer group was renamed, consumption returned to normal.
Group names (they may carry business information, so "打馬賽克" marks the redacted part; these are not the real names):
- group1-打馬賽克
- group2-打馬賽克
Kafka version: 0.9.0.1
II. Preliminary Analysis
1. Describing the affected consumer groups
Describing one of the affected consumer groups (with kafka-consumer-groups.sh --new-consumer --describe) throws the following exception:
Error while executing consumer group command The group coordinator is not available.
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
2. Searching for similar issues
A search turns up similar reports in the community, but none of them clearly explain why the problem occurs or how to fix it for good (restarting the brokers does not count!):
- http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Kafka-0-9-Client-td4975.html
III. Deep Dive
Logs are a programmer's first-hand analysis material. The brokers serve a large amount of live production traffic, so enabling debug logging on the server side was not an option; we could only start from the client.
1. Enabling client-side debug logging
After raising the client's log level to DEBUG (for example via the org.apache.kafka logger in the client's log4j/logback configuration), the following lines keep scrolling in a loop:
19:52:41.785 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing group metadata request to broker 43
19:52:41.788 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group metadata response ClientResponse(receivedTimeMs=1587642761788, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]1b68ddbd, request=RequestSend(header={api_key=10,api_version=0,correlation_id=30,client_id=consumer-1}, body={group_id=30cab231-05ed-43ef-96aa-a3ca1564baa3}), createdTimeMs=1587642761785, sendTimeMs=1587642761785), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
19:52:41.875 TKD [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=31,client_id=consumer-1}, body={topics=[topic打馬賽克]}), isInitiatedByNetworkClient, createdTimeMs=1587642761875, sendTimeMs=0) to node 43
From the log we can roughly see that the loop keeps doing a few things (the order may not be exact):
- issuing a group metadata request to some broker
- receiving the group metadata response
- sending a topic metadata request
Focusing on the error keyword in the group metadata response, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}, we can conclude that the Kafka broker did not return any coordinator node information.
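As a side note, the numeric fields in these debug lines can be decoded with the client library itself. Below is a minimal sketch (the object name is my own; it assumes the 0.9.x kafka-clients jar is on the classpath):
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
// Minimal sketch: decode the numeric fields that appear in the client debug log.
object DecodeDebugLog {
  def main(args: Array[String]): Unit = {
    val api = ApiKeys.forId(10)          // api_key=10 -> the GroupCoordinator request
    val err = Errors.forCode(15.toShort) // error_code=15 -> GROUP_COORDINATOR_NOT_AVAILABLE
    println(s"api_key=10    -> $api")
    println(s"error_code=15 -> $err: ${err.exception().getMessage}")
  }
}
The exception message behind error_code=15 is exactly the GroupCoordinatorNotAvailableException we saw when describing the group.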
2. How the broker handles the request
The entry point for the request
First we need to look at the broker-side source that serves api_key=10 requests. The corresponding API handler lives in kafka.server.KafkaApis:
def handle(request: RequestChannel.Request) {
  ……
  case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
  ……
}
The handleGroupCoordinatorRequest logic
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
  val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
  val responseHeader = new ResponseHeader(request.header.correlationId)
  if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
    val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  } else {
    val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
    // get metadata (and create the topic if necessary)
    val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
    // Possible cause #1: offsetsTopicMetadata.errorCode is not NONE
    val responseBody = if (offsetsTopicMetadata.errorCode != Errors.NONE.code) {
      new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
    } else {
      val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata
        .find(_.partitionId == partition)
        .flatMap {
          partitionMetadata => partitionMetadata.leader
        }
      // Possible cause #2: coordinatorEndpoint is None
      coordinatorEndpoint match {
        case Some(endpoint) =>
          new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
        case _ =>
          new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
      }
    }
    trace("Sending consumer metadata %s for correlation id %d to client %s."
      .format(responseBody, request.header.correlationId, request.header.clientId))
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  }
}
Here error_code=15 corresponds to Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code.
From the source it is not hard to see that there are two places that can produce Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code:
- Suspected cause #1: offsetsTopicMetadata.errorCode is not NONE
  If offsetsTopicMetadata.errorCode != Errors.NONE.code, fetching the metadata of the whole __consumer_offsets topic failed. But in production only some groups are affected, so this is unlikely to be the problem.
- Suspected cause #2: coordinatorEndpoint is None
  The expression offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap { partitionMetadata => partitionMetadata.leader } picks, out of the fetched metadata, the leader of one specific __consumer_offsets partition, and that partition, coordinator.partitionFor(groupCoordinatorRequest.groupId), is derived from the group name! This is very likely where the problem lies.
The partitionFor logic:
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
That is, it takes the absolute value of the group name's hashCode modulo groupMetadataTopicPartitionCount (the number of partitions of __consumer_offsets, 50 in this cluster).
Note: the group names may carry business information and are not the real names; the results below were computed from the real group names.
scala> "group1-打馬賽克".hashCode % 50
res2: Int = 43
scala> "group2-打馬賽克".hashCode % 50
res3: Int = 43
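The quick REPL check above uses the raw hashCode, which happens to be non-negative here; the broker additionally applies Utils.abs. Below is a small standalone sketch that mirrors the broker-side calculation (the object and helper names are my own; 50 is this cluster's partition count for __consumer_offsets, adjust it to your offsets.topic.num.partitions):
// Minimal sketch mirroring the broker's partitionFor calculation.
object CoordinatorPartition {
  // Mirrors Utils.abs in the Kafka source: Math.abs with Int.MinValue mapped to 0.
  private def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)
  def partitionFor(groupId: String, numPartitions: Int = 50): Int =
    abs(groupId.hashCode) % numPartitions
  def main(args: Array[String]): Unit =
    args.foreach(g => println(s"$g -> __consumer_offsets partition ${partitionFor(g)}"))
}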
We find that both abnormal consumer groups map to 43 after partitionFor, so we tentatively conclude that the failure is related to partition 43 of __consumer_offsets!
Next we look at the logic around offsetsTopicMetadata to confirm the anomaly.
The offsetsTopicMetadata logic
getOrCreateGroupMetadataTopic -> metadataCache.getTopicMetadata -> getPartitionMetadata
private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[PartitionMetadata]] = {
  cache.get(topic).map { partitions =>
    partitions.map { case (partitionId, partitionState) =>
      val topicPartition = TopicAndPartition(topic, partitionId)
      val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
      val maybeLeader = aliveBrokers.get(leaderAndIsr.leader)
      val replicas = partitionState.allReplicas
      val replicaInfo = getAliveEndpoints(replicas, protocol)
      maybeLeader match {
        case None =>
          debug("Error while fetching metadata for %s: leader not available".format(topicPartition))
          new PartitionMetadata(partitionId, None, replicaInfo, Seq.empty[BrokerEndPoint],
            Errors.LEADER_NOT_AVAILABLE.code)
        case Some(leader) =>
          val isr = leaderAndIsr.isr
          val isrInfo = getAliveEndpoints(isr, protocol)
          if (replicaInfo.size < replicas.size) {
            debug("Error while fetching metadata for %s: replica information not available for following brokers %s"
              .format(topicPartition, replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")))
            new PartitionMetadata(partitionId, Some(leader.getBrokerEndPoint(protocol)), replicaInfo, isrInfo, Errors.REPLICA_NOT_AVAILABLE.code)
          } else if (isrInfo.size < isr.size) {
            debug("Error while fetching metadata for %s: in sync replica information not available for following brokers %s"
              .format(topicPartition, isr.filterNot(isrInfo.map(_.id).contains).mkString(",")))
            new PartitionMetadata(partitionId, Some(leader.getBrokerEndPoint(protocol)), replicaInfo, isrInfo, Errors.REPLICA_NOT_AVAILABLE.code)
          } else {
            new PartitionMetadata(partitionId, Some(leader.getBrokerEndPoint(protocol)), replicaInfo, isrInfo, Errors.NONE.code)
          }
      }
    }
  }
}
offsetsTopicMetadata therefore contains the metadata (leader, replicaInfo, isr) for every partition of the topic, and for a partition whose leader is unavailable the leader field is None. So we judge that partition 43 of __consumer_offsets probably has a leader/replicaInfo/isr anomaly, which leaves find(_.partitionId == partition).flatMap(_.leader) empty for the partition selected by the group name's hashCode.
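To make the failure mode concrete, here is a toy model of that lookup (Endpoint and PartitionMeta are simplified stand-ins I made up, not the real Kafka metadata classes): the partition entry itself is found, but its leader is None, so flatMap collapses the whole expression to None and the broker can only answer GROUP_COORDINATOR_NOT_AVAILABLE.
// Toy model of the coordinator lookup with simplified stand-in types.
case class Endpoint(id: Int, host: String, port: Int)
case class PartitionMeta(partitionId: Int, leader: Option[Endpoint])
object CoordinatorLookup {
  def main(args: Array[String]): Unit = {
    val partitionsMetadata = Seq(
      PartitionMeta(42, leader = Some(Endpoint(2, "broker-2", 9092))),
      PartitionMeta(43, leader = None), // like partition 43 in production: Leader: -1, empty ISR
      PartitionMeta(44, leader = Some(Endpoint(4, "broker-4", 9092)))
    )
    val partition = 43 // the partition the group name hashes to
    val coordinatorEndpoint = partitionsMetadata
      .find(_.partitionId == partition) // the partition entry itself is found...
      .flatMap(_.leader)                // ...but its leader is None, so the result is None
    println(coordinatorEndpoint)        // None -> GROUP_COORDINATOR_NOT_AVAILABLE (error_code=15)
  }
}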
IV. Back to Production
1. Verifying the __consumer_offsets partition information
Describing __consumer_offsets in the production cluster gives:
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=uncompressed
Topic: __consumer_offsets Partition: 0 Leader: 18 Replicas: 18,2,17 Isr: 18
Topic: __consumer_offsets Partition: 1 Leader: 19 Replicas: 19,17,18 Isr: 19,18
Topic: __consumer_offsets Partition: 2 Leader: 20 Replicas: 20,18,19 Isr: 19,20,18
Topic: __consumer_offsets Partition: 3 Leader: 21 Replicas: 21,19,20 Isr: 19,20,21
Topic: __consumer_offsets Partition: 4 Leader: 22 Replicas: 22,20,21 Isr: 20,21,22
Topic: __consumer_offsets Partition: 5 Leader: 23 Replicas: 23,21,22 Isr: 23,21,22
Topic: __consumer_offsets Partition: 6 Leader: 24 Replicas: 24,22,23 Isr: 23,24,22
Topic: __consumer_offsets Partition: 7 Leader: 25 Replicas: 25,23,24 Isr: 23,25,24
Topic: __consumer_offsets Partition: 8 Leader: 26 Replicas: 26,24,25 Isr: 26,25,24
Topic: __consumer_offsets Partition: 9 Leader: 27 Replicas: 27,25,26 Isr: 27,26,25
Topic: __consumer_offsets Partition: 10 Leader: 28 Replicas: 28,26,27 Isr: 27,26,28
Topic: __consumer_offsets Partition: 11 Leader: 27 Replicas: 0,27,28 Isr: 27,28
Topic: __consumer_offsets Partition: 12 Leader: 28 Replicas: 1,28,0 Isr: 28
Topic: __consumer_offsets Partition: 13 Leader: -1 Replicas: 2,0,1 Isr:
Topic: __consumer_offsets Partition: 14 Leader: -1 Replicas: 3,1,2 Isr:
Topic: __consumer_offsets Partition: 15 Leader: -1 Replicas: 4,2,3 Isr:
Topic: __consumer_offsets Partition: 16 Leader: -1 Replicas: 5,3,4 Isr:
Topic: __consumer_offsets Partition: 17 Leader: -1 Replicas: 6,4,5 Isr:
Topic: __consumer_offsets Partition: 18 Leader: -1 Replicas: 7,5,6 Isr:
Topic: __consumer_offsets Partition: 19 Leader: 8 Replicas: 8,6,7 Isr: 8
Topic: __consumer_offsets Partition: 20 Leader: 9 Replicas: 9,7,8 Isr: 9,8
Topic: __consumer_offsets Partition: 21 Leader: 10 Replicas: 10,8,9 Isr: 10,8,9
Topic: __consumer_offsets Partition: 22 Leader: 11 Replicas: 11,9,10 Isr: 11,10,9
Topic: __consumer_offsets Partition: 23 Leader: 12 Replicas: 12,10,11 Isr: 11,12,10
Topic: __consumer_offsets Partition: 24 Leader: 13 Replicas: 13,11,12 Isr: 13,11,12
Topic: __consumer_offsets Partition: 25 Leader: 14 Replicas: 14,12,13 Isr: 13,12,14
Topic: __consumer_offsets Partition: 26 Leader: 15 Replicas: 15,13,14 Isr: 13,14,15
Topic: __consumer_offsets Partition: 27 Leader: 16 Replicas: 16,14,15 Isr: 14,16,15
Topic: __consumer_offsets Partition: 28 Leader: 42 Replicas: 17,15,42 Isr: 42,15
Topic: __consumer_offsets Partition: 29 Leader: 18 Replicas: 18,17,19 Isr: 19,18
Topic: __consumer_offsets Partition: 30 Leader: 19 Replicas: 19,18,20 Isr: 19,20,18
Topic: __consumer_offsets Partition: 31 Leader: 20 Replicas: 20,19,21 Isr: 19,20,21
Topic: __consumer_offsets Partition: 32 Leader: 21 Replicas: 21,20,22 Isr: 20,21,22
Topic: __consumer_offsets Partition: 33 Leader: 22 Replicas: 22,21,23 Isr: 23,21,22
Topic: __consumer_offsets Partition: 34 Leader: 23 Replicas: 23,22,24 Isr: 23,24,22
Topic: __consumer_offsets Partition: 35 Leader: 24 Replicas: 24,23,25 Isr: 23,25,24
Topic: __consumer_offsets Partition: 36 Leader: 25 Replicas: 25,24,26 Isr: 26,25,24
Topic: __consumer_offsets Partition: 37 Leader: 26 Replicas: 26,25,27 Isr: 27,26,25
Topic: __consumer_offsets Partition: 38 Leader: 27 Replicas: 27,26,28 Isr: 27,26,28
Topic: __consumer_offsets Partition: 39 Leader: 28 Replicas: 28,27,0 Isr: 27,28
Topic: __consumer_offsets Partition: 40 Leader: 28 Replicas: 0,28,1 Isr: 28
Topic: __consumer_offsets Partition: 41 Leader: -1 Replicas: 1,0,2 Isr:
Topic: __consumer_offsets Partition: 42 Leader: -1 Replicas: 2,1,3 Isr:
Topic: __consumer_offsets Partition: 43 Leader: -1 Replicas: 3,2,4 Isr:
Topic: __consumer_offsets Partition: 44 Leader: -1 Replicas: 4,3,5 Isr:
Topic: __consumer_offsets Partition: 45 Leader: -1 Replicas: 5,4,6 Isr:
Topic: __consumer_offsets Partition: 46 Leader: -1 Replicas: 6,5,7 Isr:
Topic: __consumer_offsets Partition: 47 Leader: 8 Replicas: 7,6,8 Isr: 8
Topic: __consumer_offsets Partition: 48 Leader: 8 Replicas: 8,7,9 Isr: 9,8
Topic: __consumer_offsets Partition: 49 Leader: 9 Replicas: 9,8,10 Isr: 10,9,8
Partition 43 does indeed have a leader anomaly (Leader: -1, empty ISR).
2. Reproducing the problem
We used UUIDs to batch-generate consumer group names whose hashCode, taken modulo the partition count, lands on one of the abnormal partitions; every consumer that used such a group name hit the same consumption failure. A sketch of the name generation follows.
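The sketch below (object and helper names are my own; the 50-partition count and target partition 43 come from the describe output above) roughly reproduces that generation step:
import java.util.UUID
// Minimal sketch: generate consumer group names that hash onto a chosen __consumer_offsets partition.
object GroupNamesForPartition {
  private def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)
  private def partitionFor(groupId: String, numPartitions: Int = 50): Int =
    abs(groupId.hashCode) % numPartitions
  def main(args: Array[String]): Unit = {
    val target = 43 // a partition with Leader: -1 in our cluster
    Iterator.continually(UUID.randomUUID().toString)
      .filter(partitionFor(_) == target)
      .take(5)
      .foreach(g => println(s"$g -> partition ${partitionFor(g)}"))
  }
}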
3. Further thoughts
- Why do some partitions of __consumer_offsets develop leader/replicaInfo/isr anomalies?
  This may be related to network jitter and certain cluster operations; it has to be analyzed case by case.
- How can the abnormal __consumer_offsets partitions be restored?
  We will not go into detail here; see http://blog.itpub.net/31543630/viewspace-2212467/ .
V. References
- Kafka new-consumer design document: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design
- Kafka無法消費?!我的分布式消息服務Kafka卻穩如泰山! (Kafka won't consume?! My distributed message service Kafka stands rock solid!): http://blog.itpub.net/31543630/viewspace-2212467/
- Problem with Kafka 0.9 Client: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Kafka-0-9-Client-td4975.html
- ErrorMapping: https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala