
Troubleshooting a Kafka "Topic xxx not present in metadata" error caused by a partition misconfiguration


Exception stack trace

17343C60D7EF4C23A2EE4E973F22AAAD] ERROR [] o.s.k.s.LoggingProducerListener error:254 - Exception thrown when sending a message with key='null' and payload='{"bizId":"property","extendMap":{"stationCode":"21097-01","clientId":"00187dd33c02:[email protected]","...' to topic iot_edge_property and partition 1:

org.apache.kafka.common.errors.TimeoutException: Topic iot_edge_property not present in metadata after 60000 ms.

00:38:59.944, [a25f869d4afcdecf a25f869d4afcdecf] [TID:N/A]  [MQTT Call: 17343C60D7EF4C23A2EE4E973F22AAAD] ERROR [] c.g.s.b.k.p.KafKaProducerClient send:76 - send mq message error msgId:f1f5de02f4c248c196691184c035336b topic:iot_edge_property

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic iot_edge_property not present in metadata after 60000 ms.

        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570)

        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:372)

        at cn.g2link.seg.base.kafka.producter.KafKaProducerClient.send(KafKaProducerClient.java:67)

        at cn.g2link.seg.base.kafka.producter.KafKaProducerClient.sendFailSave(KafKaProducerClient.java:17
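The "after 60000 ms" in the exception is worth decoding: for the Java client this bound comes from the producer's max.block.ms setting (default 60000), which caps how long send() may block waiting for topic/partition metadata. Tuning it only changes how quickly the failure surfaces, not its cause; shown here purely as a reference config fragment:

```
# Producer-side cap on how long send() / partitionsFor() may block
# waiting for metadata (Kafka default: 60000 ms).
max.block.ms=60000
```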
           

Analysis

Hypothesis 1: a network problem.

Checked connectivity; it was fine. Ruled out.

Hypothesis 2: a missing Jackson dependency.

Searching the web for this exception, the answers fairly uniformly blame a missing Jackson jar. But this project had been through multiple rounds of testing and had never hit this problem before, so that explanation did not fit. Ruled out.

Hypothesis 3: a version mismatch between the production Kafka and the test environment.

Ops checked; the versions were identical. Ruled out.

Hypothesis 4: the payload was too large, causing the send to time out.

Wrote a small demo that sends to the production Kafka.

Producer client code

The partition count had been confirmed with ops as 2 before go-live, so it was left unchanged here.

/**
 * Producer client.
 **/
@Log4j2
public class ProducerClient {
    /**
     * Kafka partition count; defaults to 2.
     **/
    @Value("${kafka.partition.count:2}")
    private int partitionCount;

    // Collaborators used below; their declarations were omitted in the
    // original post.
    private KafkaTemplate<String, String> kafkaTemplate;
    private IdGenerator idGenerator;

    /**
     * Send a message.
     *
     * @param message  the message to send
     * @param callback optional callback invoked with the send result
     */
    public void send(MQMessage message, ListenableFutureCallback<SendResult<String, String>> callback) {
        try {
            if (Objects.isNull(message.getMessageId())) {
                // Generate an ID via the snowflake algorithm
                message.setMessageId(idGenerator.nextStrValue());
            }
            // If both key and partition are empty, compute the partition automatically
            if (StringUtil.isBlank(message.getKey()) && StringUtil.isNull(message.getPartition())) {
                message.setPartition(Math.abs(message.getMessageId().hashCode()) % partitionCount);
            }
            // Serialize the business payload
            message.setMessage(getBizMessage(message));
            log.info("send mq message start,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
            if (log.isDebugEnabled()) {
                log.debug("send mq message, msgId:{} topic:{} message:{}", message.getMessageId(), message.getTopic(), JSON.toJSONString(message));
            }

            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message.getTopic(), message.getPartition(), message.getKey(), JSON.toJSONString(message));
            if (Objects.nonNull(callback)) {
                future.addCallback(callback);
            }
            log.info("send mq message end,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
        } catch (Exception e) {
            log.error("send mq message error msgId:{} topic:{}", message.getMessageId(), message.getTopic(), e);
            throw new BizException(BizExceptionCode.SEND_MQ_ERROR, e);
        }
    }
}
           

Message-sending examples

One endpoint sends a simple test payload; the other replays the original payload from the failing message.

@RestController
@RequestMapping(value = "/example/test/")
@Api(tags = "Test endpoints")
@Log4j2
public class TestController {

    // Collaborators referenced below; their declarations were omitted in the
    // original post: testEnable, idGenerator, producerClient.

    /**
     * Simple-payload test.
     **/
    @PostMapping(value = "/testSendMq")
    @ApiOperation(value = "Send MQ test (simple payload)")
    public void testSendMq() {
        if (BooleanUtil.intConvertBoolean(testEnable)) {
            Map<String, Object> msg = Maps.newHashMap();
            msg.put("userId", "0001");
            MQMessage mqMessage = new MQMessage();
            mqMessage.setTopic("test_topic1231a");
            mqMessage.setMessageId(idGenerator.nextStrValue());
            mqMessage.setStoreRecords(true);
            mqMessage.setFailRetry(false);
            mqMessage.setMessage(msg);
            producerClient.sendFailSave(mqMessage);
        }
    }

    /**
     * Original-payload test.
     **/
    @PostMapping(value = "/testSendMq2")
    @ApiOperation(value = "Send MQ test (original payload)")
    public void testSendMq2() {
        if (BooleanUtil.intConvertBoolean(testEnable)) {
            String parkCode = "21097", stationCode = "21097-01";
            String emqMsgStr = "{\n" +
                    "\t\"messageId\": \"f1f5de02f4c248c196691184c035336b\",\n" +
                    "\t\"extendMap\": {\n" +
                    "\t\t\"clientId\": \"00187dd33c02\",\n" +
                    "\t\t\"parkCode\": \"21097\",\n" +
                    "\t\t\"stationCode\": \"21097-01\"\n" +
                    "\t},\n" +
                    "\t\"timestamp\": 1626287494277,\n" +
                    "\t\"bizId\": \"property\",\n" +
                    "\t\"message\": {\n" +
                    "\t\t\"deviceCode\": \"21097000641\",\n" +
                    "\t\t\"name\": \"测试数据\",\n" +
                    "\t\t\"areaCode\": \"1212121212\",\n" +
                    "\t\t\"deviceType\": \"xxxx1212\",\n" +
                    "\t\t\"properties\": [{\n" +
                    "\t\t\t\"name\": \"高度1\",\n" +
                    "\t\t\t\"identifier\": \"tank_height\",\n" +
                    "\t\t\t\"value\": \"6\",\n" +
                    "\t\t\t\"valueRefDevices\": null,\n" +
                    "\t\t\t\"timestamp\": 1626280679529,\n" +
                    "\t\t\t\"changed\": 0\n" +
                    "\t\t}],\n" +
                    "\t\t\"traceId\": \"\",\n" +
                    "\t\t\"mock\": 0\n" +
                    "\t},\n" +
                    "\t\"resend\": true\n" +
                    "}";
            log.info("received parkCode {}, stationCode {}, parkCode.hashCode() {}", parkCode, stationCode, parkCode.hashCode());

            MQMessage mqMessage = JSON.parseObject(emqMsgStr, MQMessage.class);

            Map<String, Object> extendMap = mqMessage.getExtendMap();
            if (Objects.isNull(extendMap)) {
                extendMap = Maps.newHashMap();
            }
            extendMap.put("parkCode", "21097");
            extendMap.put("stationCode", "21097-01");
            mqMessage.setExtendMap(extendMap);
            mqMessage.setTopic("test_topic1231a");
            mqMessage.setStoreRecords(true);
            mqMessage.setFailRetry(false);
            producerClient.sendFailSave(mqMessage);
        }
    }
}
           
1. Ran the simple-payload test: success.
2. Ran the original-payload endpoint (testSendMq2): failure.
3. Immediately re-ran the simple-payload test: it failed as well.
4. After waiting a while, the simple-payload test succeeded again.

With the problem reproduced, it became much easier to pin down. The initial read was that the oversized original payload had clogged the send queue and caused timeouts, which would also explain why the simple payload sent right afterwards timed out too. We then asked ops to check the Kafka configuration, and it turned out the topic's partition count was actually 1, while we remembered configuring the application for 2.

Hypothesis 5: the partition configuration was wrong.

Given that Kafka actually had 1 partition while the application was configured for 2, we set kafka.partition.count=1 and tested again. Problem solved.
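The failure pattern falls out of the partition-selection line in the producer code above (Math.abs(hash) % partitionCount). A minimal, self-contained sketch (the class name and message IDs here are illustrative, not from the original project) shows how a configured count of 2 can pick a partition that a 1-partition topic does not have:

```java
public class PartitionDemo {
    // Mirrors the selection in ProducerClient.send():
    // abs(hashCode) % configured partition count.
    // (Edge case not handled here, as in the original code:
    // Math.abs(Integer.MIN_VALUE) is still negative.)
    static int choosePartition(String messageId, int configuredPartitions) {
        return Math.abs(messageId.hashCode()) % configuredPartitions;
    }

    public static void main(String[] args) {
        int configured = 2; // what the application believed
        int actual = 1;     // what the topic really had (only partition 0)
        for (String id : new String[]{"a", "b"}) {
            int p = choosePartition(id, configured);
            System.out.println("msgId=" + id + " -> partition " + p
                    + (p < actual ? " (exists)" : " (does not exist)"));
        }
    }
}
```

Roughly half of the message IDs land on partition 1, which matches the intermittent behaviour above: some sends block for 60 s and fail, others go through.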

@Value("${kafka.partition.count:2}")
private int partitionCount;
    

Summary

Putting it all together, the root cause was a Kafka partition misconfiguration. With kafka.partition.count=2, the producer could compute a target partition of 1 via Math.abs(messageId.hashCode()) % 2, but the topic only had a single partition (partition 0). A send directed at a nonexistent partition keeps waiting for metadata for that partition and eventually fails with "Topic xxx not present in metadata".
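One way to harden against this class of bug is to stop trusting a locally configured partition count: either pass a null partition so the Kafka client's own partitioner picks one, or look up the real count at runtime (the producer API exposes partitionsFor(topic)). A hypothetical guard along those lines (safePartition is not from the original code):

```java
public class PartitionGuard {
    /**
     * Returns the requested partition only if it exists on the topic;
     * otherwise returns null, so that a call like
     * kafkaTemplate.send(topic, null, key, value) lets the client-side
     * partitioner choose a valid partition instead.
     */
    public static Integer safePartition(Integer requested, int actualPartitionCount) {
        if (requested == null || requested < 0 || requested >= actualPartitionCount) {
            return null;
        }
        return requested;
    }

    public static void main(String[] args) {
        // App configured for 2 partitions against a 1-partition topic:
        System.out.println(safePartition(1, 1)); // prints: null (fall back to the partitioner)
        System.out.println(safePartition(0, 1)); // prints: 0 (valid, keep it)
    }
}
```

The actual partition count could be fetched once at startup, e.g. producer.partitionsFor(topic).size(), rather than read from application config.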