天天看点

kafka查询offset&生产者offset计算&消费offset计算

本文目录

    • kafka查询offset&生产者offset计算&消费offset计算
      • 1、简介
      • 2、需求背景
      • 3、前期准备
      • 4、获取kafka生产者的offset以及消费者的offset
      • 5、代码测试
      • 6、总结
        • 鸡汤送上:每个生命都有裂缝,如此才会有光线射进来。

kafka查询offset&生产者offset计算&消费offset计算

1、简介

​ kafka的介绍:略…(有兴趣的同学可自行Google,这与本文无关 ^ _ ^)

2、需求背景

​ 对kafka做监控,需要获取到kafka接收到消息的offset和被消费者消费掉消息的offset,编写接口将数值交给prometheus,直接观察判断kafka的消费性能如何。(如何自定义prometheus的监控指标后续再更新,本章只讲如何获得kafka的offset)

3、前期准备

​ 我们需要在自己的机器上装一个zookeeper和kafka,并测试kafka可用(具体怎么装,请自行搜索)

4、获取kafka生产者的offset以及消费者的offset

​ kafka是有两个offset的,在kafka结构图中,生产者生产消息到kafka后,kafka会记录一次生产者消息的offset,消费者消费掉消息后,kafka会记录一次消费者消费消息的offset,这两个offset是分开的,过程如下图:(大概意思是这样的)

kafka查询offset&生产者offset计算&消费offset计算

值得注意的是生产者和消费者的差值就是积压的消息量。(生产者的offset一定是大于等于消费者offset的)

​ 我们再来回顾一下kafka的命令行,如图:

kafka查询offset&生产者offset计算&消费offset计算

我们执行kafka提供的命令后可以看到,图中的数据,LOG-END-OFFSET就是当前生产者的offset,CURRENT-OFFSET就是当前已提交的commit offset,也就是消费者的offset,而LAG就是消息积压量。

​ 接下来我们用代码演示一遍(准备小demo,展示演示效果)

项目结构如下:

kafka查询offset&生产者offset计算&消费offset计算

代码献上:(每个步骤基本都有注释,有问题的可以私信我)

maven依赖:

<!--kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.0</version>
        </dependency>
           

配置文件:

server:
  port: 9494
kafka:
  address: localhost:9092
  group: kclog
           

代码:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

/**
 * @author Java Devin
 * @date 2023/1/8 14:01
 */
@Configuration
public class KafkaConfig {

    @Value("${kafka.address}")
    private String address;

    @Value("${kafka.group}")
    private String group;

    private Map<String, Object> producerProp = new HashMap<>();
    private Map<String, Object> consumerProp = new HashMap<>();

    @PostConstruct
    private void initProp() {
        // 生产者配置
        producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 消费者配置
        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    /**
     * 返回kafka客户端
     * @return
     */
    @Bean
    public AdminClient getAdminClient() {
        return KafkaAdminClient.create(producerProp);
    }

    /**
     * 返回一个消费者
     * @return
     */
    @Bean
    public Consumer<String, String> getConsumer() {
        DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProp);
        return kafkaConsumerFactory.createConsumer();
    }
}
           
import com.javadevin.config.KafkaConfig;
import com.javadevin.util.KafkaUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * 查看 kafka offset 的接口
 * @author Java Devin
 * @date 2023/1/8 14:13
 */
@RestController
public class KafkaOffsetDemo {

    private static final Log log = LogFactory.getLog(KafkaOffsetDemo.class);

    @Autowired
    private KafkaConfig kafkaConfig;

    @Autowired
    private KafkaUtil kafkaUtil;

    @GetMapping("/kafkaOffset")
    public Map<String, Long> testKafkaOffset(){

        // 获取kafka客户端
        AdminClient adminClient = kafkaConfig.getAdminClient();

        // 获取消费者
        Consumer<String, String> consumer = kafkaConfig.getConsumer();

        //获取所有topic名称
        KafkaFuture<Set<String>> topics = adminClient.listTopics().names();
        Set<String> names = null;
        try {
            names = topics.get();
        } catch (Exception e) {
            log.error("kafka topic query error", e);
        }

        //写入kafka的消息总量
        Long endOffset = 0L;
        //从kafka消费的消息总量
        Long commitOffset = 0L;

        // 将每一个topic中的数据累加起来
        for (String name : names) {
            endOffset += kafkaUtil.getTopicEndOffset(consumer, name);
            commitOffset += kafkaUtil.getTopicCommitOffset(consumer, name);
        }

        Map<String, Long> map = new HashMap<>(2);

        map.put("endOffset", endOffset);
        map.put("commitOffset", commitOffset);

        return map;
    }
}
           
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * kafka 工具类
 * @author Java Devin
 * @date 2023/1/8 14:16
 */
@Component
public class KafkaUtil {

    /**
     * 获取单个topic的消费量
     * @param consumer
     * @param topic
     * @return
     */
    public Long getTopicCommitOffset(Consumer<String, String> consumer, String topic){
        Long commitOffset = 0L;

        // 这一步是因为 kafka架构的特殊性 在每一个topic中都存在partition 根据每个项目的不同 会配置或不配置partition
        // 消息会分散的存储在partition中 所以我们计算时应该计算所有partition的offset的和
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            OffsetAndMetadata committed = consumer.committed(topicPartition);
            if (committed != null){
                commitOffset += committed.offset();
            }
        }
        return commitOffset;
    }


    /**
     * 获取单个topic接收量
     * @param consumer
     * @param topic
     * @return
     */
    public Long getTopicEndOffset(Consumer<String, String> consumer, String topic){
        Long endOffset = 0L;

        // 这一步是因为 kafka架构的特殊性 在每一个topic中都存在partition 根据每个项目的不同 会配置或不配置partition
        // 消息会分散的存储在partition中 所以我们计算时应该计算所有partition的offset的和
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : partitionInfos) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            partitions.add(topicPartition);
        }

        Collection<Long> values = consumer.endOffsets(partitions).values();
        for (Long value : values) {
            endOffset += value;
        }
        return endOffset;
    }
}
           

运行结果展示:

kafka查询offset&amp;生产者offset计算&amp;消费offset计算

5、代码测试

我们先来查询一下offset:

kafka查询offset&amp;生产者offset计算&amp;消费offset计算

首先使用命令创建一个生产者:

kafka查询offset&amp;生产者offset计算&amp;消费offset计算

向kafka发送5条消息,然后再查询一下:

kafka查询offset&amp;生产者offset计算&amp;消费offset计算
kafka查询offset&amp;生产者offset计算&amp;消费offset计算

现在可以看到接收到消息已经为6,而消费掉的消息还是1,计算一下得出积压了5条消息未消费

现在来开启消费者,再次查询:

kafka查询offset&amp;生产者offset计算&amp;消费offset计算
kafka查询offset&amp;生产者offset计算&amp;消费offset计算

现在可以根据数据判断出kafka内的消息已经消费完了,再用命令查询一下

kafka查询offset&amp;生产者offset计算&amp;消费offset计算

命令查询后,结果是一样的,值得注意的是,我在电脑上配置的kafka只有一个topic,且只会给一个partition中塞数据,创建多个topic或多个partition然后测试的任务就交给大家伙了,我就不一一演示了。

6、总结

​ 其实kafka的offset计算并不难,难点在于很多api如果不去研究官方文档或者请教朋友,就无从得知,并且在计算时topic和partition概念很难一次性消化,但有时间就可以去研究一下,研究明白了会发现其实挺好玩的。

鸡汤送上:每个生命都有裂缝,如此才会有光线射进来。

最后说明,创作不易,若转载请标明出处或原文链接!!!