天天看點

kafka學習--Consumer API--代碼示範

package my.test.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @author WGY
 * 簡單consumer
 * 手動 同步異步送出
 * 自動送出
 */
public class MyConsumer1 {
    public static void main(String[] args) {
        //1、建立消費者配置資訊
        Properties prop = new Properties();

        //2、給配置資訊指派
//        prop.put("bootstrap.servers","hadoop1:9092");
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092");
        //開啟自動送出
        //手動送出這裡先關閉 再在for循環下面加上consumer.commitSync()//同步送出;
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //自動送出的延遲
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //kv的反序列化
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //消費者組
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "aaa");
        //重置消費者offset 這個配置隻在兩種情況下生效: 如果換組或者過了七天offset就會重置
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


        //建立消費者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        //訂閱主題
        consumer.subscribe(Arrays.asList("first1","second"));
        //拉取資料  一次拉去獲得多個資料  開啟就不關閉  寫入死循環
        while(true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            //解析并列印ConsumerRecords
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
            }

        }
        //異步送出 一個線程送出 一個線程接收
//        consumer.commitAsync(new OffsetCommitCallback() {
//            @Override
//            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
//                if(exception != null){
//                    System.out.println("Commit failed for"+ offsets);
//                }
//            }
//        });

    }
}