package my.test.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
/**
* @author WGY
* 簡單consumer
* 手動 同步異步送出
* 自動送出
*/
public class MyConsumer1 {
public static void main(String[] args) {
//1、建立消費者配置資訊
Properties prop = new Properties();
//2、給配置資訊指派
// prop.put("bootstrap.servers","hadoop1:9092");
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092");
//開啟自動送出
//手動送出這裡先關閉 再在for循環下面加上consumer.commitSync()//同步送出;
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自動送出的延遲
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//kv的反序列化
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//消費者組
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "aaa");
//重置消費者offset 這個配置隻在兩種情況下生效: 如果換組或者過了七天offset就會重置
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//建立消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
//訂閱主題
consumer.subscribe(Arrays.asList("first1","second"));
//拉取資料 一次拉去獲得多個資料 開啟就不關閉 寫入死循環
while(true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
//解析并列印ConsumerRecords
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
}
}
//異步送出 一個線程送出 一個線程接收
// consumer.commitAsync(new OffsetCommitCallback() {
// @Override
// public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
// if(exception != null){
// System.out.println("Commit failed for"+ offsets);
// }
// }
// });
}
}