天天看點

Kafka整合Avro進行序列化和反序列化

一、在linux上安裝confluent

下載tar包

# Work from /usr/local, where the downloaded tarball is expected to be.
cd /usr/local
# Unpack the Confluent 5.0.0 distribution archive.
tar -zxvf confluent-5.0.0-2.11.tar.gz
cd confluent-5.0.0
# Start Schema Registry using the properties file under etc/schema-registry.
bin/schema-registry-start etc/schema-registry/schema-registry.properties
           

二、添加依賴

<!-- Apache Avro library, version 1.9.0. -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.0</version>
</dependency>
 
<!-- Avro tools artifact, kept at the same version as the avro artifact above. -->
<dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-tools</artifactId>
      <version>1.9.0</version>
</dependency>
           

從 confluent-5.0.0裡lib包選擇這些jar包放進idea中

Kafka整合Avro進行序列化和反序列化

生産者代碼:

/**
 * Demo producer: builds a single Avro {@code Customer} record and publishes it
 * to the {@code CustomerTest} topic, using Confluent's {@code KafkaAvroSerializer}
 * for the value so that the schema is looked up via the Schema Registry.
 */
public class kafkaSend {

    /**
     * Sends one record synchronously and then closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "192.168.65.130:9092");
        // Keys are declared as String and the consumer reads them with
        // StringDeserializer, so serialize them as plain strings rather than Avro.
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        kafkaProps.put("schema.registry.url", "http://192.168.65.130:8081");

        String schemaString = "{\n" +
                "    \"namespace\": \"CustomerManagerAvro\",\n" +
                "    \"type\": \"record\",\n" +
                "    \"name\": \"Customer\",\n" +
                "    \"fields\": [\n" +
                "        {\"name\": \"id\", \"type\": \"int\"},\n" +
                "        {\"name\": \"name\",  \"type\": \"string\"},\n" +
                "        {\"name\": \"email\", \"type\": [\"null\", \"string\"]}\n" +
                "    ]\n" +
                "}\n";

        // Parse the schema and build the record before opening the producer, so a
        // malformed schema cannot leave a producer (and its I/O thread) behind.
        Schema schema = new Schema.Parser().parse(schemaString);

        GenericRecord customer = new GenericData.Record(schema);
        customer.put("id", 1);
        customer.put("name", "linyue");
        customer.put("email", "[email protected]");

        // No key is supplied, so the record is sent with a null key.
        ProducerRecord<String, GenericRecord> record =
                new ProducerRecord<String, GenericRecord>("CustomerTest", customer);

        KafkaProducer<String, GenericRecord> producer =
                new KafkaProducer<String, GenericRecord>(kafkaProps);
        boolean interrupted = false;
        try {
            // get() blocks until the send completes or fails.
            producer.send(record).get();
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            try {
                producer.close();
            } finally {
                // Restore the interrupt flag only after close(), so that closing
                // the producer is not itself cut short by the pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

}
           

消費者代碼

/**
 * Demo consumer: reads Avro-encoded records from partition 0 of the
 * {@code CustomerTest} topic, starting at the beginning of the partition,
 * and prints each record value to standard output.
 */
public class kafkaRevice {

    /**
     * Polls the topic partition in an endless loop; the consumer is closed only
     * when the loop is left through an exception.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.65.130:9092");
        props.put("group.id", "CountryCounter");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://192.168.65.130:8081");
        KafkaConsumer<String, GenericRecord> consumer =
                new KafkaConsumer<String, GenericRecord>(props);

        // Assign the partition explicitly (instead of subscribing to the topic)
        // so that the position can be rewound to the start of the partition.
        TopicPartition tp = new TopicPartition("CustomerTest", 0);
        List<TopicPartition> partitions = Collections.singletonList(tp);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        try {
            while (true) {
                // poll(Duration) replaces the deprecated poll(long); the timeout
                // also bounds the time spent waiting for metadata.
                ConsumerRecords<String, GenericRecord> records =
                        consumer.poll(java.time.Duration.ofMillis(100));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    try {
                        System.out.println(record.value());
                    } catch (Exception e) {
                        // Best effort: a record that cannot be printed must not
                        // stop the polling loop.
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
            System.out.println("測試消費者!");
        }
    }

}
           

繼續閱讀