
Troubleshooting a Kafka "Topic xxx not present in metadata" error caused by a partition misconfiguration


Exception stack trace

17343C60D7EF4C23A2EE4E973F22AAAD] ERROR [] o.s.k.s.LoggingProducerListener error:254 - Exception thrown when sending a message with key='null' and payload='{"bizId":"property","extendMap":{"stationCode":"21097-01","clientId":"00187dd33c02:[email protected]","...' to topic iot_edge_property and partition 1:

org.apache.kafka.common.errors.TimeoutException: Topic iot_edge_property not present in metadata after 60000 ms.

00:38:59.944, [a25f869d4afcdecf a25f869d4afcdecf] [TID:N/A]  [MQTT Call: 17343C60D7EF4C23A2EE4E973F22AAAD] ERROR [] c.g.s.b.k.p.KafKaProducerClient send:76 - send mq message error msgId:f1f5de02f4c248c196691184c035336b topic:iot_edge_property

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic iot_edge_property not present in metadata after 60000 ms.

        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570)

        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:372)

        at cn.g2link.seg.base.kafka.producter.KafKaProducerClient.send(KafKaProducerClient.java:67)

        at cn.g2link.seg.base.kafka.producter.KafKaProducerClient.sendFailSave(KafKaProducerClient.java:17
           

Analysis

Hypothesis 1: Is it a network issue?

Tested the connection and it was fine, so this was ruled out.

Hypothesis 2: Is a Jackson dependency missing?

Searching the exception message online, the answers were fairly unanimous that a missing Jackson dependency is the cause. In our case, however, the project had already gone through multiple rounds of testing without ever hitting this problem, so this was unlikely to be the cause and was ruled out.

Hypothesis 3: Does the production Kafka version differ from the test environment version, causing an incompatibility?

Operations confirmed the versions were identical, so this was ruled out.

Hypothesis 4: Is the payload too large, causing the send to time out?

I wrote a small demo that sends messages to the production Kafka cluster.

Producer client code

The partition count had been confirmed with operations as 2 before go-live, so it was left unchanged.

/**
 * Producer client
 **/

public class ProducerClient {
    /**
     * Kafka partition count, defaults to 2
     **/
    @Value("${kafka.partition.count:2}")
    private int partitionCount;

    /**
     * @description: send a message
     * @date: 2020/9/8 5:02 PM
     * @param: message
     * @param: callback
     * @return: void
     */
    public void send(MQMessage message, ListenableFutureCallback callback) {
        try {
            if (Objects.isNull(message.getMessageId())) {
                // Snowflake-style ID
                message.setMessageId(idGenerator.nextStrValue());
            }
            // If both key and partition are empty, compute the partition automatically
            if (StringUtil.isBlank(message.getKey()) && StringUtil.isNull(message.getPartition())) {
                message.setPartition(Math.abs(message.getMessageId().hashCode()) % partitionCount);
            }
            // Serialize the business message
            message.setMessage(getBizMessage(message));
            log.info("send mq message start,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
            if (log.isDebugEnabled()) {
                log.debug("send mq message, msgId:{} topic:{} message:{}", message.getMessageId(), message.getTopic(), JSON.toJSONString(message));
            }

            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message.getTopic(), message.getPartition(), message.getKey(), JSON.toJSONString(message));
            if (Objects.nonNull(callback)) {
                future.addCallback(callback);
            }
            log.info("send mq message end,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
        } catch (Exception e) {
            log.error("send mq message error msgId:{} topic:{}", message.getMessageId(), message.getTopic(), e);
            throw new BizException(BizExceptionCode.SEND_MQ_ERROR, e);
        }
    }
}
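
To make the failure mode concrete, here is a minimal, self-contained sketch of the partition-selection logic above. The class name and the sample message IDs are hypothetical; in the real client the IDs come from the snowflake generator.

public class PartitionSelectionDemo {
    public static void main(String[] args) {
        // The application assumed 2 partitions (kafka.partition.count:2).
        int partitionCount = 2;
        // Hypothetical message IDs used only for illustration.
        String[] messageIds = {"f1f5de02f4c248c196691184c035336b", "0001", "abc"};
        for (String id : messageIds) {
            int partition = Math.abs(id.hashCode()) % partitionCount;
            System.out.printf("messageId=%s -> partition %d%n", id, partition);
        }
        // Roughly half of all IDs land on partition 1. If the topic actually has only
        // one partition (partition 0), a send targeted at partition 1 can never find
        // that partition in the metadata and eventually times out.
    }
}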
           

Message sending examples

One test endpoint sends a simple payload, the other sends the original payload from the failing message.

@RestController
@RequestMapping(value = "/example/test/")
@Api(tags = "Test endpoints")
@Log4j2
public class TestController {

    /**
     * Simple payload test
     **/
    @PostMapping(value = "/testSendMq")
    @ApiOperation(value = "Send MQ test (simple payload)")
    public void testSendMq() {
        if (BooleanUtil.intConvertBoolean(testEnable)) {
            Map<String, Object> msg = Maps.newHashMap();
            msg.put("userId", "0001");
            MQMessage mqMessage = new MQMessage();
            mqMessage.setTopic("test_topic1231a");
            mqMessage.setMessageId(idGenerator.nextStrValue());
            mqMessage.setStoreRecords(true);
            mqMessage.setFailRetry(false);
            mqMessage.setMessage(msg);
            producerClient.sendFailSave(mqMessage);
        }
    }

    /**
     * Original payload test
     **/
    @PostMapping(value = "/testSendMq2")
    @ApiOperation(value = "Send MQ test (original payload)")
    public void testSendMq2() {
        if (BooleanUtil.intConvertBoolean(testEnable)) {
            String parkCode = "21097", stationCode = "21097-01";
            String emqMsgStr = "{\n" +
                    "\t\"messageId\": \"f1f5de02f4c248c196691184c035336b\",\n" +
                    "\t\"extendMap\": {\n" +
                    "\t\t\"clientId\": \"00187dd33c02\",\n" +
                    "\t\t\"parkCode\": \"21097\",\n" +
                    "\t\t\"stationCode\": \"21097-01\"\n" +
                    "\t},\n" +
                    "\t\"timestamp\": 1626287494277,\n" +
                    "\t\"bizId\": \"property\",\n" +
                    "\t\"message\": {\n" +
                    "\t\t\"deviceCode\": \"21097000641\",\n" +
                    "\t\t\"name\": \"測試資料\",\n" +
                    "\t\t\"areaCode\": \"1212121212\",\n" +
                    "\t\t\"deviceType\": \"xxxx1212\",\n" +
                    "\t\t\"properties\": [{\n" +
                    "\t\t\t\"name\": \"高度1\",\n" +
                    "\t\t\t\"identifier\": \"tank_height\",\n" +
                    "\t\t\t\"value\": \"6\",\n" +
                    "\t\t\t\"valueRefDevices\": null,\n" +
                    "\t\t\t\"timestamp\": 1626280679529,\n" +
                    "\t\t\t\"changed\": 0\n" +
                    "\t\t}],\n" +
                    "\t\t\"traceId\": \"\",\n" +
                    "\t\t\"mock\": 0\n" +
                    "\t},\n" +
                    "\t\"resend\": true\n" +
                    "}";
            log.info("received parkCode {}, stationCode {}, parkCode.hashCode() {}", parkCode, stationCode, parkCode.hashCode());

            MQMessage mqMessage = JSON.parseObject(emqMsgStr, MQMessage.class);

            Map<String, Object> extendMap = mqMessage.getExtendMap();
            if (Objects.isNull(extendMap)) {
                extendMap = Maps.newHashMap();
            }
            extendMap.put("parkCode", "21097");
            extendMap.put("stationCode", "21097-01");
            mqMessage.setExtendMap(extendMap);
            mqMessage.setTopic("test_topic1231a");
            mqMessage.setStoreRecords(true);
            mqMessage.setFailRetry(false);
            producerClient.sendFailSave(mqMessage);
        }
    }
}
           
1. Executed the simple-payload endpoint: success.

2. Executed the original-payload endpoint (testSendMq2): failure.

3. Executed the simple-payload endpoint again: this time it failed as well.

4. After waiting a while, the simple-payload endpoint succeeded again.

With the problem reproduced, it became much easier to narrow down. The initial theory was that the oversized original payload blocked the send queue and caused timeouts, which would also explain why a simple payload sent right afterwards timed out as well. With that in mind, we asked the operations team to check the Kafka configuration, and it turned out the topic had only 1 partition; we then remembered that our application was configured with 2.
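
For reference, the actual partition count of a topic can be double-checked with the kafka-topics.sh --describe command, or from code with the Kafka AdminClient. Below is a standalone sketch; the bootstrap address is a placeholder and the topic name is taken from the error log.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicPartitionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker address; use the real cluster address here.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            String topic = "iot_edge_property";
            TopicDescription description = admin.describeTopics(Collections.singleton(topic))
                    .all().get().get(topic);
            // Prints the number of partitions the broker actually knows about.
            System.out.println(topic + " has " + description.partitions().size() + " partition(s)");
        }
    }
}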

Hypothesis 5: Is the partition count misconfigured?

Since the topic on Kafka actually had 1 partition while our application was configured with 2, we changed kafka.partition.count=1, retested, and the problem was resolved.

The relevant configuration and partition calculation in the producer client:

@Value("${kafka.partition.count:2}")
private int partitionCount;

// If both key and partition are empty, compute the partition automatically
if (StringUtil.isBlank(message.getKey()) && StringUtil.isNull(message.getPartition())) {
    message.setPartition(Math.abs(message.getMessageId().hashCode()) % partitionCount);
}

Summary

Putting the evidence together, the failure was caused by a misconfigured partition count: when sending, the producer targeted a partition that does not exist on the topic (partition 1, while the topic has only partition 0). The producer kept waiting for metadata containing that partition and gave up after the 60-second wait, surfacing as the somewhat misleading "Topic xxx not present in metadata" timeout.
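
As a closing thought (this is a sketch, not the fix we actually applied), a more defensive variant of the send logic would derive the partition count from broker metadata instead of trusting a static property, so configuration drift like this cannot route messages to a non-existent partition. Assuming the same kafkaTemplate and message fields as in ProducerClient:

// Sketch: ask the broker how many partitions the topic really has, then pick one of them.
// kafkaTemplate.execute(...) exposes the underlying Kafka Producer, whose partitionsFor()
// returns the current partition list from metadata.
int actualPartitionCount = kafkaTemplate.execute(producer -> producer.partitionsFor(message.getTopic()).size());
// Math.floorMod avoids the negative result that Math.abs(Integer.MIN_VALUE) would produce.
message.setPartition(Math.floorMod(message.getMessageId().hashCode(), actualPartitionCount));

Alternatively, leaving both key and partition unset and letting the producer's default partitioner choose a partition avoids the problem entirely.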