
Kafka Producer API Basics - 01

Without further ado, here's the code:

package com.scathon.kafka;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Unit tests for the Kafka producer and consumer APIs.
 */
public class KafkaApiTest {
    private static final Properties producerProperties = new Properties();
    private static final Properties consumerProperties = new Properties();

    static {
        //=====producerProperties======
        producerProperties.put("bootstrap.servers", "192.168.138.102:9092,192.168.138.103:9092");
        // "all": wait until the full set of in-sync replicas acknowledges each record
        producerProperties.put("acks", "all");
        producerProperties.put("retries", 0);
        producerProperties.put("batch.size", 16384);
        producerProperties.put("linger.ms", 1);
        producerProperties.put("buffer.memory", 33554432);
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //=====consumerProperties=======
        // The new consumer connects to the brokers directly; ZooKeeper is no longer recommended
        //consumerProperties.put("zookeeper.connect", "node-2:2181,node-3:2181,node-4:2181");
        consumerProperties.put("bootstrap.servers", "192.168.138.102:9092,192.168.138.103:9092");
        consumerProperties.put("group.id", "test-consumer-group");
        consumerProperties.put("enable.auto.commit", "true");
        consumerProperties.put("auto.commit.interval.ms", "1000");
        consumerProperties.put("auto.offset.reset", "earliest");
        consumerProperties.put("session.timeout.ms", "30000");
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    // Multi-threaded producer
    @Test
    public void producerWithMultiThread() throws InterruptedException {
        int threadCount = 5;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
                for (int msgIndex = 0; msgIndex < 10; msgIndex++) {
                    String message = Thread.currentThread().getName() + "-message-" + msgIndex;
                    producer.send(new ProducerRecord<>("test-01", message), (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.println(Thread.currentThread().getName() + "-" + message + "-succ");
                        }
                    });
                }
                // close() flushes any buffered records before the thread exits
                producer.close();
                System.out.println("==================");
                countDownLatch.countDown();
            });
        }
        for (int i = 0; i < threadCount; i++) {
            threads[i].start();
        }
        countDownLatch.await();
    }

    @Test
    public void producerApiTest() {
        KafkaProducer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(producerProperties);
            for (int i = 0; i < 100; i++) {
                String msg = "message" + i;
                producer.send(new ProducerRecord<>("test-01", msg), ((metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                        return;
                    }
                    System.out.println("Record sent.");
                    System.out.println(metadata);
                }));
                System.out.println("send:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }


    // New consumer API
    @Test
    public void consumerApiTest() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
        consumer.subscribe(Arrays.asList("test-01", "flume_kafka"));
        // auto.offset.reset=earliest (set above) already makes a new consumer group read from
        // the beginning; an explicit seekToBeginning(Collection<TopicPartition>) would need
        // assigned partitions, which the consumer only gets after its first poll()
        while (true) {
            // a small timeout avoids busy-spinning; poll(0) would return immediately
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record -> System.out.printf("offset = %d ,value = %s\n", record.offset(), record.value()));
        }
    }


    // Old (high-level) consumer API
    @Test
    public void consumerOldStreamApi() {
        // The old high-level consumer is configured through ZooKeeper, not bootstrap.servers,
        // so it needs its own properties (addresses are my cluster's; change to yours)
        Properties oldConsumerProperties = new Properties();
        oldConsumerProperties.put("zookeeper.connect", "node-2:2181,node-3:2181,node-4:2181");
        oldConsumerProperties.put("group.id", "test-consumer-group");
        ConsumerConnector consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(oldConsumerProperties));
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("test-01", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> kafkaStreams = messageStreams.get("test-01").get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStreams.iterator();
        while (iterator.hasNext()) {
            System.out.println(new String(iterator.next().message()));
        }
    }
}
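
All of the sends above are asynchronous: send() hands the record to a background I/O thread and the callback reports the result later. If you want to confirm delivery before continuing, send() also returns a Future<RecordMetadata> you can block on. Below is a minimal synchronous-send sketch, assuming the same test-01 topic and one of the demo's broker addresses (change them to your own):

package com.scathon.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class SyncSendSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.138.102:9092"); // change to your cluster
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaProducer implements Closeable, so try-with-resources flushes and closes it
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker acknowledges the record (or throws on failure)
            RecordMetadata metadata = producer
                    .send(new ProducerRecord<>("test-01", "sync-message"))
                    .get();
            System.out.println("written to " + metadata.topic() + "-"
                    + metadata.partition() + " @ offset " + metadata.offset());
        }
    }
}

Blocking per record is of course much slower than the callback style above; it is mainly useful in tests or when you need per-record error handling before sending the next message.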

After running the producer unit test, run the consumer test and watch the console output:

[Screenshot: consumer console output]

The screenshot doesn't capture all of the output, but it gets the idea across.

You can also verify the messages on the Kafka server with the following command:

kafka-console-consumer.sh --zookeeper node-2:2181,node-3:2181 --topic test-01
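
Newer Kafka releases deprecate the --zookeeper flag for the console consumer in favor of connecting to the brokers directly; on such versions the equivalent check would look something like:

kafka-console-consumer.sh --bootstrap-server 192.168.138.102:9092 --topic test-01 --from-beginning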

[Screenshot: kafka-console-consumer output]

Note: the broker addresses in the demo, as well as the ZooKeeper addresses, point to my own Kafka cluster. If you want to run this demo, don't forget to change these parameters to your own cluster's addresses.
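
If you switch between clusters often, one option (my own suggestion, not part of the original demo) is to read the broker list from a JVM system property instead of hard-coding it; the property name kafka.bootstrap.servers below is hypothetical:

// hypothetical property; run the tests with -Dkafka.bootstrap.servers=host1:9092,host2:9092
producerProperties.put("bootstrap.servers",
        System.getProperty("kafka.bootstrap.servers", "localhost:9092"));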
