Kafka-Consumer-Java
- Maven Dependency
- Java Version
  - Configuration Class
  - Consumer
  - Test Class
  - Combined Test
- Scala Version
  - Configuration Class
  - Consumer
  - Test Class
  - Combined Test
This article uses the Java API to implement a Kafka consumer, with messages produced either from the server console or from Java code.
Kafka-Producer JAVA & Scala Implementation
Maven Dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
Java Version
Configuration Class
package kafka;
/**
 * Common Kafka configuration constants
 */
public class KafkaPro {
    public final static String ZK = "192.165.1.82:2181";
    public final static String TOPIC = "java_kafka";
    // Broker address list, in host:port format
    public final static String BROKER_LIST = "192.165.1.82:9092";
    public final static String GROUP_ID = "Kafka_API";
}
Consumer
package kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumer extends Thread {
    private String topic;

    public CustomConsumer(String topic) {
        this.topic = topic;
    }

    private KafkaConsumer<String, String> ConsumerCreate(String topic) {
        Properties props = new Properties();
        // Kafka broker list
        props.put("bootstrap.servers", KafkaPro.BROKER_LIST);
        // Consumer group ID
        props.put("group.id", KafkaPro.GROUP_ID);
        // Enable automatic offset commits
        props.put("enable.auto.commit", "true");
        // Commit offsets every 1000 ms; only takes effect when auto-commit is enabled
        props.put("auto.commit.interval.ms", "1000");
        // Deserialize keys and values from byte arrays with this class
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    @Override
    public void run() {
        KafkaConsumer<String, String> consumer = ConsumerCreate(topic);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d,key = %s ,value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
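The consumer above relies on automatic offset commits, so offsets are committed every second whether or not the polled records were fully processed. For comparison, here is a minimal sketch, not part of the original code, of the same loop with auto-commit disabled and offsets committed manually after each batch; the class name ManualCommitConsumer is illustrative and the KafkaPro constants are reused.
package kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Sketch: the same consumer loop, but with manual offset commits.
 */
public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaPro.BROKER_LIST);
        props.put("group.id", KafkaPro.GROUP_ID);
        // Disable auto-commit; offsets are committed explicitly below
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(KafkaPro.TOPIC));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d,key = %s ,value = %s%n", record.offset(), record.key(), record.value());
                }
                // Commit the offsets of the batch that was just processed
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
commitSync() blocks until the broker acknowledges the commit; commitAsync() is the non-blocking alternative.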
Test Class
package kafka;
/*
 * Kafka Java test
 * */
public class apptest {
    public static void main(String[] args) {
        new CustomConsumer(KafkaPro.TOPIC).start();
    }
}
Start message production from the server console
# kafka-console-producer.sh --broker-list localhost:9092 --topic java_kafka
>test
# The IDEA console shows the following
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/host/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/host/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
offset = 39880380,key = null ,value = test
Combined Test
package kafka;
/*
 * Kafka Java test
 * */
public class apptest {
    public static void main(String[] args) {
        new KafkaProduct(KafkaPro.TOPIC).start();
        new CustomConsumer(KafkaPro.TOPIC).start();
    }
}
The IDEA console output is as follows:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/host/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/host/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Send: message1
Send: message2
Send: message3
Send: message4
offset = 39880381,key = null ,value = message1
offset = 39880382,key = null ,value = message2
offset = 39880383,key = null ,value = message3
offset = 39880384,key = null ,value = message4
Send: message5
offset = 39880385,key = null ,value = message5
Send: message6
offset = 39880386,key = null ,value = message6
Send: message7
offset = 39880387,key = null ,value = message7
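The KafkaProduct class started in the combined test comes from the companion Kafka-Producer JAVA & Scala article and is not listed here. As an assumption of roughly what that producer thread looks like, inferred only from its usage and the "Send: messageN" output above and reusing the KafkaPro constants, a minimal sketch might be:
package kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Sketch of a producer thread matching the KafkaProduct usage above.
 */
public class KafkaProduct extends Thread {
    private String topic;

    public KafkaProduct(String topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaPro.BROKER_LIST);
        // Serialize keys and values to byte arrays
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        int messageNo = 1;
        while (true) {
            String message = "message" + messageNo;
            // Send with a null key, matching the "key = null" lines in the output above
            producer.send(new ProducerRecord<String, String>(topic, message));
            System.out.println("Send: " + message);
            messageNo++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                break;
            }
        }
        producer.close();
    }
}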
Scala Version
Configuration Class
package com.kafka
object KafkaProperties {
  final val ZK = "192.165.1.82:2181"
  final val TOPIC = "scala_kafka2"
  final val BROKER_LIST = "192.165.1.82:9092"
  final val GROUP_ID = "Kafka_API"
}
Consumer
package com.kafka
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
class CustomConsumer {
  def createConsumer(): Unit = {
    val props = new Properties()
    // Kafka broker list
    props.put("bootstrap.servers", KafkaProperties.BROKER_LIST)
    // Deserialize keys and values from byte arrays
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Consumer group ID
    props.put("group.id", KafkaProperties.GROUP_ID)
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    // Subscribe to the topic
    consumer.subscribe(util.Arrays.asList(KafkaProperties.TOPIC))
    while (true) {
      val records: ConsumerRecords[String, String] = consumer.poll(100)
      val value = records.iterator()
      while (value.hasNext) {
        println(value.next())
      }
    }
  }
}
Test Class
package com.kafka
object apptest {
  def main(args: Array[String]): Unit = {
    val consumer = new CustomConsumer()
    consumer.createConsumer()
  }
}
Combined Test
package com.kafka
object apptest {
  def main(args: Array[String]): Unit = {
    val producer = new CustomProducer()
    producer.createProducer()
    val consumer = new CustomConsumer()
    consumer.createConsumer()
  }
}