天天看點

從指定offset消費kafka資料從指定offset消費kafka資料

從指定offset消費kafka資料

java從指定offset消費kafka資料

代碼

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Map;

/**
 * Description
 * Create by haohongtao
 * Date 2021/4/20 3:02 下午
 */
@Slf4j
public class KafkaConsumerTest {
    private static KafkaConsumer<String, String> consumer;

    //初始化consumer,這段代碼可以根據不同情況自定義
    static {
        Map<String, Object> props = Maps.newHashMap();

        //叢集位址
        props.put("bootstrap.servers", "test-kafka..com:9092");

        //設定我們獨特的消費者的組id,每次不要一樣,一個offset隻能被同一個group消費一次的
        props.put("group.id", "test-"+System.currentTimeMillis());
        //設定手動送出
        props.put("enable.auto.commit", "false");
        //這個可以設定大一點
        props.put("session.timeout.ms", "30000");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //我一般測試單條報錯資料,1
        props.put("max.poll.records", 1);

        consumer = new KafkaConsumer<>(props);
    }

    public static void consume(String topicName, Integer partition, Integer offset) {

        //用于配置設定topic和partition
        consumer.assign(Arrays.asList(new TopicPartition(topicName, partition)));
        //不改變目前offset,指定從這個topic和partition的開始位置擷取。
        consumer.seek(new TopicPartition(topicName, partition),offset);
        //consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, partition)));


        for (int i = 0; i < 1; i++) {
            //poll消息timeout可以設定大一點,由此設定1000,拿不到資料,debug檢視cluster發現沒有資訊
            ConsumerRecords<String, String> records = consumer.poll(30000);

            log.info("records length = {}", records.count());

            for (ConsumerRecord record : records) {
                log.info("topic = {}, partition = {}, offset = {}, key = {}, value = {}\n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
            }
        }

    }

    public static void main(String[] args) {
        String topicName = "my_topic_test";
        Integer partition = 4;
        Integer offset = 14103;

        consume(topicName, partition, offset);
    }
}

           

問題

消費不到資料

可能原因1:

//poll消息timeout可以設定大一點,由此設定1000,拿不到資料,debug檢視cluster發現沒有資訊
ConsumerRecords<String, String> records = consumer.poll(30000);
           

spark版本

比較簡單,在properties裡設定即可

後續補充代碼

flink版本

銅spark