天天看點

Kafka-Consumer(Java & Scala):Maven 依賴、Java版、Scala版

Kafka-Consumer-Java

  • Maven 依賴
  • Java版
    • 配置類
    • 消費者
    • 測試類
    • 聯合測試
  • Scala版
    • 配置類
    • 消費者
    • 測試類
    • 聯合測試

本文使用 Java API(以及對應的 Scala 寫法)來實作 Kafka 消息的消費,並搭配兩種生産方式進行測試:在伺服器端以指令列生産消息,或以 Java 代碼生産消息。

生産者的實作請參考:Kafka-Producer Java & Scala 實作

Maven 依賴

<!-- Kafka client library; supplies the org.apache.kafka.clients.consumer classes used by the consumers below. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
           

Java版

配置類

package kafka;

/**
 * Shared Kafka settings used by the Java examples.
 */
public class KafkaPro {

    /** ZooKeeper address in {@code host:port} form (presumed from the name). */
    public static final String ZK = "192.165.1.82:2181";

    /** Name of the topic the examples read from and write to. */
    public static final String TOPIC = "java_kafka";

    /** Broker address list; each entry has the form {@code host:port}. */
    public static final String BROKER_LIST = "192.165.1.82:9092";

    /** Consumer group id passed as {@code group.id}. */
    public static final String GROUP_ID = "Kafka_API";
}

           

消費者

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


import java.util.Arrays;
import java.util.Properties;

/**
 * Thread that subscribes to one Kafka topic and prints every record it receives.
 *
 * <p>Connection settings are read from {@link KafkaPro}. The poll loop keeps running until
 * the executing thread is interrupted or polling throws; in either case the consumer is
 * closed before {@link #run()} returns.
 */
public class CustomConsumer extends Thread {

    /** Topic this consumer subscribes to. */
    private final String topic;

    /**
     * Creates a consumer thread for the given topic. Nothing is connected until the
     * thread is started.
     *
     * @param topic name of the topic to consume
     */
    public CustomConsumer(String topic) {
        this.topic = topic;
    }

    /**
     * Builds a String/String consumer that is already subscribed to {@code topic}.
     *
     * @param topic name of the topic to subscribe to
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<String, String> createConsumer(String topic) {
        Properties props = new Properties();
        // Kafka broker list
        props.put("bootstrap.servers", KafkaPro.BROKER_LIST);
        // Consumer group id
        props.put("group.id", KafkaPro.GROUP_ID);
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Commit every 1000 ms; only takes effect when auto commit is enabled
        props.put("auto.commit.interval.ms", "1000");

        // Classes that turn the received byte arrays back into String keys and values
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Polls the topic and prints offset, key and value of each record to standard output.
     */
    @Override
    public void run() {
        KafkaConsumer<String, String> consumer = createConsumer(topic);
        try {
            // Check the executing thread (not `this`) so a direct run() call behaves the same.
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d,key = %s ,value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Release network resources even when poll() throws.
            consumer.close();
        }
    }
}

           

測試類

package kafka;
/**
 * Kafka Java test: runs the consumer on its own.
 */
public class apptest {

    /**
     * Starts a {@code CustomConsumer} thread on {@code KafkaPro.TOPIC}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        CustomConsumer consumerThread = new CustomConsumer(KafkaPro.TOPIC);
        consumerThread.start();
    }
}

           

伺服器端啟動消息生産

[root@localhost config]# kafka-console-producer.sh --broker-list localhost:9092 --topic java_kafka
>test

# 視窗顯示如下
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/host/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/host/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
offset = 39880380,key = null ,value = test

           

聯合測試

package kafka;
/**
 * Kafka Java test: runs producer and consumer together.
 */
public class apptest {

    /**
     * Starts a {@code KafkaProduct} and then a {@code CustomConsumer}, both on
     * {@code KafkaPro.TOPIC}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): KafkaProduct is not defined in this file; presumably the producer thread.
        KafkaProduct producerThread = new KafkaProduct(KafkaPro.TOPIC);
        producerThread.start();

        CustomConsumer consumerThread = new CustomConsumer(KafkaPro.TOPIC);
        consumerThread.start();
    }
}

           

IDEA視窗顯示如下:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/host/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/host/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Send: message1
Send: message2
Send: message3
Send: message4
offset = 39880381,key = null ,value = message1
offset = 39880382,key = null ,value = message2
offset = 39880383,key = null ,value = message3
offset = 39880384,key = null ,value = message4
Send: message5
offset = 39880385,key = null ,value = message5
Send: message6
offset = 39880386,key = null ,value = message6
Send: message7
offset = 39880387,key = null ,value = message7
           

Scala版

配置類

package com.kafka

/** Shared Kafka settings used by the Scala examples. */
object KafkaProperties {

  /** ZooKeeper address in host:port form (presumed from the name). */
  final val ZK = "192.165.1.82:2181"

  /** Name of the topic the examples use. */
  final val TOPIC = "scala_kafka2"

  /** Broker address list in host:port form. */
  final val BROKER_LIST = "192.165.1.82:9092"

  /** Consumer group id passed as `group.id`. */
  final val GROUP_ID = "Kafka_API"
}

           

消費者

package com.kafka

import java.util.Properties
import java.util
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

/** Kafka consumer that prints every record received on `KafkaProperties.TOPIC`. */
class CustomConsumer {

  /**
   * Creates a String/String consumer, subscribes it to `KafkaProperties.TOPIC` and prints
   * each polled record to standard output.
   *
   * The call blocks the calling thread: it keeps polling until that thread is interrupted
   * or polling throws. The consumer is closed before the method returns.
   */
  def createConsumer(): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", KafkaProperties.BROKER_LIST)
    // Classes that turn the received byte arrays back into String keys and values
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", KafkaProperties.GROUP_ID)

    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

    try {
      consumer.subscribe(util.Arrays.asList(KafkaProperties.TOPIC))

      while (!Thread.currentThread().isInterrupted()) {
        val records: ConsumerRecords[String, String] = consumer.poll(100)
        val recordIterator = records.iterator()
        while (recordIterator.hasNext) {
          println(recordIterator.next())
        }
      }
    } finally {
      // Release network resources even when subscribe() or poll() throws.
      consumer.close()
    }
  }
}

           

測試類

package com.kafka

/** Kafka Scala test: runs the consumer on its own. */
object apptest {

  /**
   * Creates a `CustomConsumer` and runs its polling loop on the main thread.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit =
    new CustomConsumer().createConsumer()
}

           

聯合測試

package com.kafka

/** Kafka Scala test: runs producer and consumer together. */
object apptest {

  /**
   * Runs the producer first and the consumer second, both on the main thread.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): CustomProducer is not defined in this file; confirm createProducer() returns.
    new CustomProducer().createProducer()

    // createConsumer() polls in a loop, so it is invoked last.
    new CustomConsumer().createConsumer()
  }
}
}