天天看點

Kafka(四) :Java 編寫簡單的kafka生産者和消費者

一、準備工作

  • 架構:springboot
  • 電腦檢視網關是否開啟工具:telnet 安裝(Mac)
  • 引入依賴:
<!--添加kafka -->
 <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
 </dependency>
           

二、生産者

public static void main(String[] args) {
        //建立生産者配置資訊
        Properties properties = new Properties();
        //設定key
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //設定重試次數
        properties.put(ProducerConfig.RETRIES_CONFIG,10);
        //設定value
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //設定服務位址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"http://106.52.174.56:9092");

        //設定kafka
        KafkaProducer<String, String> producers = new KafkaProducer<String, String>(properties);
        //設定消息
        ProducerRecord<String,String> producerRecords = new ProducerRecord<>("kafkalearn","learn-info","hello word!");
        //發送消息
        producers.send(producerRecords);
        //關閉
        producers.close();

    }
           

三、消費者

public static void main(String[] args) {

        //建立消費者配置資訊
        Properties properties = new Properties();
        //設定key
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //設定消費組
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group.demo");
        //設定value
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //設定服務位址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"http://106.52.174.56:9092");

        //設定kafka
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //訂閱消息
        consumer.subscribe(Collections.singletonList("kafkalearn"));
        //監聽消息,間隔2秒輪詢一次
        while (true){
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(2000));
            poll.forEach(item->{
                System.out.println("結果呢:----"+item.key()+"--"+item.value());
            });

        }
           

五、出現的問題

  1. centeros上本地連接配接不上,報錯,
  •   解決辦法:無法遠端連接配接kafka