Kafka Topic xxx not present in metadata bug問題排查
異常堆棧
17343C60D7EF4C23A2EE4E973F22AAAD] ERROR [] o.s.k.s.LoggingProducerListener error:254 - Exception thrown when sending a message with key='null' and payload='{"bizId":"property","extendMap":{"stationCode":"21097-01","clientId":"00187dd33c02:[email protected]","...' to topic iot_edge_property and partition 1:
org.apache.kafka.common.errors.TimeoutException: Topic iot_edge_property not present in metadata after 60000 ms.
00:38:59.944, [a25f869d4afcdecf a25f869d4afcdecf] [TID:N/A] [MQTT Call: 17343C60D7EF4C23A2EE4E973F22AAAD] ERROR [] c.g.s.b.k.p.KafKaProducerClient send:76 - send mq message error msgId:f1f5de02f4c248c196691184c035336b topic:iot_edge_property
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic iot_edge_property not present in metadata after 60000 ms.
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:372)
at cn.g2link.seg.base.kafka.producter.KafKaProducerClient.send(KafKaProducerClient.java:67)
at cn.g2link.seg.base.kafka.producter.KafKaProducerClient.sendFailSave(KafKaProducerClient.java:17
分析
假設1:是不是網絡問題。
試了下沒問題,排除。
假設2:缺少jackson包的問題
根據異常資訊從網上搜尋,網上的答案都比較統一,說是缺少jackson包的問題。基于我們現有的情況,這個項目經過了多輪測試,之前測試都比較順暢,沒出現過這個問題,是以應該不是這個原因,排除。
假設3:是不是生産kafka版本與測試環境的版本不一緻,版本不相容。
讓運維檢視了一下版本一緻,排除。
假設4:是不是封包過大了,導緻發送逾時了
寫了一個小demo,往生産的kafka進行發送。
發送用戶端代碼
這裡分區是跟運維上線前确認過是2,是以就沒改動。
/**
* 發送用戶端
**/
/**
 * Kafka producer client: fills in a message id when absent, derives a partition
 * when neither key nor partition is set, serializes the payload and sends it
 * through {@code kafkaTemplate}.
 */
public class ProducerClient {
    /**
     * Partition count of the target topic (default 2).
     * NOTE(review): this MUST match the broker-side partition count of the topic;
     * sending to a partition index that does not exist on the broker manifests as
     * "Topic ... not present in metadata after 60000 ms" — exactly the incident
     * analysed in this post.
     */
    @Value("${kafka.partition.count:2}")
    private int partitionCount;

    /**
     * Sends a message to Kafka.
     *
     * @param message  the business message; topic is required, messageId/partition
     *                 are filled in here when absent
     * @param callback optional callback attached to the async send future; may be null
     * @throws BizException wrapping any failure, with {@code SEND_MQ_ERROR} code
     */
    public void send(MQMessage message, ListenableFutureCallback callback) {
        try {
            if (Objects.isNull(message.getMessageId())) {
                // Generate an id with the snowflake algorithm when the caller did not set one.
                message.setMessageId(idGenerator.nextStrValue());
            }
            // If both the key and the partition are absent, derive a partition from the id.
            if (StringUtil.isBlank(message.getKey()) && StringUtil.isNull(message.getPartition())) {
                // Math.floorMod is always in [0, partitionCount). The previous
                // Math.abs(hashCode()) % partitionCount returned a NEGATIVE partition
                // when hashCode() == Integer.MIN_VALUE, because Math.abs overflows there.
                message.setPartition(Math.floorMod(message.getMessageId().hashCode(), partitionCount));
            }
            // Serialize the business payload before sending.
            message.setMessage(getBizMessage(message));
            log.info("send mq message start,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
            if (log.isDebugEnabled()) {
                log.debug("send mq message, msgId:{} topic:{} message:{}", message.getMessageId(), message.getTopic(), JSON.toJSONString(message));
            }
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message.getTopic(), message.getPartition(), message.getKey(), JSON.toJSONString(message));
            if (Objects.nonNull(callback)) {
                future.addCallback(callback);
            }
            log.info("send mq message end,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
        } catch (Exception e) {
            log.error("send mq message error msgId:{} topic:{}", message.getMessageId(), message.getTopic(), e);
            throw new BizException(BizExceptionCode.SEND_MQ_ERROR, e);
        }
    }
}
消息發送示例
一個簡單封包測試接口、一個原封包測試接口
@RestController
@RequestMapping(value = "/example/test/")
@Api(tags = " 測試接口")
@Log4j2
public class TestController {
/**
 * Simple-payload test endpoint: sends a tiny single-field message.
 * Only active when the {@code testEnable} flag is on.
 **/
@PostMapping(value = "/testSendMq")
@ApiOperation(value = "發送mq測試")
public void testSendMq() {
if (BooleanUtil.intConvertBoolean(testEnable)) {
Map<String, Object> msg = Maps.newHashMap();
msg.put("userId", "0001");
MQMessage mqMessage = new MQMessage();
mqMessage.setTopic("test_topic1231a");
mqMessage.setMessageId(idGenerator.nextStrValue());
mqMessage.setStoreRecords(true);
mqMessage.setFailRetry(false);
mqMessage.setMessage(msg);
producerClient.sendFailSave(mqMessage);
}
}
/**
 * Original-payload test endpoint: replays the exact production message
 * (the device-property JSON below) to reproduce the send timeout.
 * Only active when the {@code testEnable} flag is on.
 **/
@PostMapping(value = "/testSendMq2")
@ApiOperation(value = "發送mq測試")
public void testSendMq2() {
if (BooleanUtil.intConvertBoolean(testEnable)) {
String parkCode = "21097", stationCode = "21097-01";
// Verbatim copy of the production payload that triggered the incident.
String emqMsgStr = "{\n" +
"\t\"messageId\": \"f1f5de02f4c248c196691184c035336b\",\n" +
"\t\"extendMap\": {\n" +
"\t\t\"clientId\": \"00187dd33c02\",\n" +
"\t\t\"parkCode\": \"21097\",\n" +
"\t\t\"stationCode\": \"21097-01\"\n" +
"\t},\n" +
"\t\"timestamp\": 1626287494277,\n" +
"\t\"bizId\": \"property\",\n" +
"\t\"message\": {\n" +
"\t\t\"deviceCode\": \"21097000641\",\n" +
"\t\t\"name\": \"測試資料\",\n" +
"\t\t\"areaCode\": \"1212121212\",\n" +
"\t\t\"deviceType\": \"xxxx1212\",\n" +
"\t\t\"properties\": [{\n" +
"\t\t\t\"name\": \"高度1\",\n" +
"\t\t\t\"identifier\": \"tank_height\",\n" +
"\t\t\t\"value\": \"6\",\n" +
"\t\t\t\"valueRefDevices\": null,\n" +
"\t\t\t\"timestamp\": 1626280679529,\n" +
"\t\t\t\"changed\": 0\n" +
"\t\t}],\n" +
"\t\t\"traceId\": \"\",\n" +
"\t\t\"mock\": 0\n" +
"\t},\n" +
"\t\"resend\": true\n" +
"}";
log.info("擷取到的parkCode {},stationCode{} 作為 topic為{}", parkCode, stationCode, parkCode.hashCode());
MQMessage mqMessage = JSON.parseObject(emqMsgStr, MQMessage.class);
// Ensure extendMap exists before enriching it with routing fields.
Map<String, Object> extendMap = mqMessage.getExtendMap();
if (Objects.isNull(extendMap)) {
extendMap = Maps.newHashMap();
}
extendMap.put("parkCode", "21097");
extendMap.put("stationCode", "21097-01");
mqMessage.setExtendMap(extendMap);
// Same test topic as the simple-payload endpoint, so only the payload differs.
mqMessage.setTopic("test_topic1231a");
mqMessage.setStoreRecords(true);
mqMessage.setFailRetry(false);
producerClient.sendFailSave(mqMessage);
}
}
}
- 執行簡單封包示例,成功

2. 執行原封包接口2,失敗。。。
3. 再次執行簡單封包,失敗了。。。
- 在等一會執行簡單封包又成功了。。。
問題複現了就比較好解決了。初步分析是不是原封包過大,導緻發送隊列堵塞逾時,然後再次發簡單封包也會逾時。基于以上情況找運維同學檢視kafka配置,突然發現kafka的分區數配置是1。。。而我們之前的配置是2。
假設5:是不是分區配置錯了
基于檢視到kafka的分區數配置是1,而我們配置的是2,於是我們修改kafka.partition.count=1,再次進行測試,問題解決。
@Value("${kafka.partition.count:2}")
private int partitionCount;
/**
* @description:發送消息
* @date: 2020/9/8 下午5:02
* @param: message
* @param: callback
* @return: void
*/
/**
 * Sends a message to Kafka; fills in the message id and derives a partition
 * when neither key nor partition is set by the caller.
 *
 * @param message  the business message; topic is required
 * @param callback optional callback attached to the async send future; may be null
 * @throws BizException wrapping any failure, with {@code SEND_MQ_ERROR} code
 */
public void send(MQMessage message, ListenableFutureCallback callback) {
    try {
        if (Objects.isNull(message.getMessageId())) {
            // Generate an id with the snowflake algorithm when absent.
            message.setMessageId(idGenerator.nextStrValue());
        }
        // If both the key and the partition are absent, derive a partition from the id.
        if (StringUtil.isBlank(message.getKey()) && StringUtil.isNull(message.getPartition())) {
            // Math.floorMod is always in [0, partitionCount). The previous
            // Math.abs(hashCode()) % partitionCount returned a NEGATIVE partition
            // when hashCode() == Integer.MIN_VALUE, because Math.abs overflows there.
            message.setPartition(Math.floorMod(message.getMessageId().hashCode(), partitionCount));
        }
        // Serialize the business payload before sending.
        message.setMessage(getBizMessage(message));
        log.info("send mq message start,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
        if (log.isDebugEnabled()) {
            log.debug("send mq message, msgId:{} topic:{} message:{}", message.getMessageId(), message.getTopic(), JSON.toJSONString(message));
        }
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message.getTopic(), message.getPartition(), message.getKey(), JSON.toJSONString(message));
        if (Objects.nonNull(callback)) {
            future.addCallback(callback);
        }
        log.info("send mq message end,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
    } catch (Exception e) {
        log.error("send mq message error msgId:{} topic:{}", message.getMessageId(), message.getTopic(), e);
        throw new BizException(BizExceptionCode.SEND_MQ_ERROR, e);
    }
}
}
總結
經過多方分析,此問題是由于kafka的分區數配置錯誤導緻:發送消息時設定了不存在的分區,因而發送逾時失敗。