從指定offset消費kafka資料
java從指定offset消費kafka資料
代碼
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
/**
 * Utility for consuming Kafka records starting from a specific offset of a
 * given topic partition — useful for replaying or inspecting a single
 * problematic message.
 *
 * Created by haohongtao, 2021/4/20 3:02 PM.
 */
@Slf4j
public class KafkaConsumerTest {
    // Assigned exactly once in the static initializer below.
    private static final KafkaConsumer<String, String> consumer;

    // Initialize the consumer; adjust these properties to your environment.
    static {
        Map<String, Object> props = Maps.newHashMap();
        // Broker address (NOTE: "test-kafka..com" looks like a redacted
        // placeholder — replace with a real cluster address).
        props.put("bootstrap.servers", "test-kafka..com:9092");
        // Use a fresh, unique group id on every run: within one group an
        // offset is only delivered once, so reusing a group could skip the
        // record we want to replay.
        props.put("group.id", "test-" + System.currentTimeMillis());
        // Manual commit: we never commit, so replays stay repeatable.
        props.put("enable.auto.commit", "false");
        // Generous session timeout for interactive debugging sessions.
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Fetch a single record per poll — handy when inspecting one bad message.
        props.put("max.poll.records", 1);
        consumer = new KafkaConsumer<>(props);
    }

    /**
     * Consumes records from {@code topicName}/{@code partition} starting at
     * {@code offset} and logs each one.
     *
     * @param topicName topic to read from
     * @param partition partition index within the topic
     * @param offset    offset to seek to before polling
     */
    public static void consume(String topicName, Integer partition, Integer offset) {
        TopicPartition tp = new TopicPartition(topicName, partition);
        // Manually assign the partition (no consumer-group rebalancing involved).
        consumer.assign(Arrays.asList(tp));
        // Position the consumer at the requested offset.
        consumer.seek(tp, offset);
        //consumer.seekToBeginning(Arrays.asList(tp));
        for (int i = 0; i < 1; i++) {
            // Use a generous poll timeout: with a short one (e.g. 1000 ms) the
            // first poll may return empty before cluster metadata is fetched.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(30000));
            log.info("records length = {}", records.count());
            for (ConsumerRecord<String, String> record : records) {
                log.info("topic = {}, partition = {}, offset = {}, key = {}, value = {}\n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
            }
        }
    }

    public static void main(String[] args) {
        String topicName = "my_topic_test";
        Integer partition = 4;
        Integer offset = 14103;
        try {
            consume(topicName, partition, offset);
        } finally {
            // Release sockets/buffers; the static consumer would otherwise leak.
            consumer.close();
        }
    }
}
問題
消費不到資料
可能原因1:poll 的 timeout 設定太小。如果設定 1000ms,第一次 poll 可能還沒取得 cluster metadata 就傳回空結果(debug 檢視 cluster 發現沒有資訊);將 timeout 調大即可,例如:
ConsumerRecords<String, String> records = consumer.poll(30000);
spark版本
比較簡單,在properties裡設定即可
後續補充代碼
flink版本
同spark版本,在properties裡設定即可