1. Installing Confluent on Linux

Download the Confluent tar package, then extract it and start the Schema Registry:
cd /usr/local
tar -zxvf confluent-5.0.0-2.11.tar.gz
cd confluent-5.0.0
bin/schema-registry-start etc/schema-registry/schema-registry.properties
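The Schema Registry listens on port 8081 by default. As a quick connectivity check, here is a minimal sketch (an assumption on my part, not from the original post) that lists the registered subjects; it assumes the kafka-schema-registry-client jar from the lib directory is on the classpath and that the registry host is 192.168.65.130, as in the examples below:

import java.util.Collection;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        // the second argument is the capacity of the client's local schema cache
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://192.168.65.130:8081", 100);
        Collection<String> subjects = client.getAllSubjects();
        System.out.println(subjects); // empty on a fresh install
    }
}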
2. Adding the dependencies
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-tools</artifactId>
    <version>1.9.0</version>
</dependency>
Then copy the required jars from the lib directory of confluent-5.0.0 into the IDEA project (or resolve them through Maven, as sketched below).
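If you prefer resolving the Confluent artifacts through Maven rather than copying jars by hand, they are published in Confluent's own repository, not Maven Central. A sketch of the extra pom entries, assuming version 5.0.0 to match the install above; kafka-avro-serializer transitively brings in the Schema Registry client:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.0.0</version>
</dependency>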

Producer code:
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSend {
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "192.168.65.130:9092");
        // KafkaAvroSerializer serializes records as Avro and registers the schema
        // with the Schema Registry automatically on the first send
        kafkaProps.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        kafkaProps.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        kafkaProps.put("schema.registry.url", "http://192.168.65.130:8081");

        String schemaString = "{\n" +
                "  \"namespace\": \"CustomerManagerAvro\",\n" +
                "  \"type\": \"record\",\n" +
                "  \"name\": \"Customer\",\n" +
                "  \"fields\": [\n" +
                "    {\"name\": \"id\", \"type\": \"int\"},\n" +
                "    {\"name\": \"name\", \"type\": \"string\"},\n" +
                "    {\"name\": \"email\", \"type\": [\"null\", \"string\"]}\n" +
                "  ]\n" +
                "}\n";

        // Parse the schema and build a GenericRecord conforming to it
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaString);
        GenericData.Record customer = new GenericData.Record(schema);
        customer.put("id", 1);
        customer.put("name", "linyue");
        customer.put("email", "[email protected]");

        KafkaProducer<String, GenericRecord> producer =
                new KafkaProducer<String, GenericRecord>(kafkaProps);
        ProducerRecord<String, GenericRecord> record =
                new ProducerRecord<String, GenericRecord>("CustomerTest", customer);
        try {
            // block until the broker acknowledges the write
            producer.send(record).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
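On the first send(), KafkaAvroSerializer registers the value schema under the subject "CustomerTest-value" (topic name plus "-value"). As a minimal sketch, not part of the original post, you can confirm what was registered with the same registry client used in the check above:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class SchemaLookup {
    public static void main(String[] args) throws Exception {
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://192.168.65.130:8081", 100);
        // value schemas are registered under "<topic>-value"
        SchemaMetadata meta = client.getLatestSchemaMetadata("CustomerTest-value");
        System.out.println("id=" + meta.getId() + ", version=" + meta.getVersion());
        System.out.println(meta.getSchema()); // the Customer schema JSON
    }
}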
Consumer code:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class KafkaReceive {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.65.130:9092");
        props.put("group.id", "CountryCounter");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // KafkaAvroDeserializer fetches the schema from the Schema Registry
        // and returns each value as a GenericRecord
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://192.168.65.130:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);

        // Subscribing to the topic:
        // consumer.subscribe(Collections.singletonList("test")); // starts from the latest offset of each partition
        // Here we instead read partition 0 of CustomerTest from the beginning:
        TopicPartition tp = new TopicPartition("CustomerTest", 0);
        List<TopicPartition> list = new ArrayList<TopicPartition>();
        list.add(tp);
        consumer.assign(list); // assign() takes an explicit list of topic partitions
        consumer.seekToBeginning(Collections.singleton(tp)); // rewind the assigned partition to offset 0

        // Poll loop
        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    System.out.println(record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
            System.out.println("Test consumer done!");
        }
    }
}
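Each record.value() is a GenericRecord, so individual fields come back through get(). Note also that this consumer's key.deserializer is StringDeserializer while the producer configured KafkaAvroSerializer for keys; that only works here because no key is ever sent. A small helper sketch (hypothetical, not from the original) that could replace the System.out.println(record.value()) call in the poll loop:

import org.apache.avro.generic.GenericRecord;

public class CustomerPrinter {
    static void print(GenericRecord customer) {
        int id = (Integer) customer.get("id");
        // Avro decodes string fields as CharSequence (Utf8), hence toString()
        String name = customer.get("name").toString();
        Object email = customer.get("email"); // may be null: the field is a ["null", "string"] union
        System.out.println(id + " / " + name + " / " + email);
    }
}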