天天看點

Kafka:Streams實作單詞統計

測試代碼

`pom.xml`:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Pin the source encoding so compilation does not depend on the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single place to change the Kafka release; client and streams must stay in lockstep. -->
        <kafka.version>3.0.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>
</project>

建立 `Topic`:

package com.kaven.kafka.admin;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class Admin {

    // AdminClient built from the Kafka broker addresses and the request timeout (milliseconds).
    private static final AdminClient adminClient = Admin.getAdminClient(
            "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094",
            "40000");

    /**
     * Creates the two topics used by the word-count demo and then releases the shared client.
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Admin admin = new Admin();
        try {
            // Topic "topic-in": 1 partition, replication factor 1
            admin.createTopic("topic-in", 1, (short) 1);
            // Topic "topic-out": 1 partition, replication factor 1
            admin.createTopic("topic-out", 1, (short) 1);
        } finally {
            // The client is never needed after these calls; close it so its resources are released.
            adminClient.close();
        }
    }

    /**
     * Builds an AdminClient.
     *
     * @param address          comma-separated bootstrap servers ({@code host:port,...})
     * @param requestTimeoutMS request timeout in milliseconds, passed as a string property
     * @return a new AdminClient; the caller owns it and is responsible for closing it
     */
    public static AdminClient getAdminClient(String address, String requestTimeoutMS) {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        properties.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMS);
        return AdminClient.create(properties);
    }

    /**
     * Requests creation of one topic and blocks until the request has completed.
     * A failure is reported by printing its message; it is not rethrown. The topic
     * name is printed in either case.
     *
     * @param name              topic name
     * @param numPartitions     number of partitions
     * @param replicationFactor replication factor
     * @throws InterruptedException if interrupted while waiting for the result
     */
    public void createTopic(String name, int numPartitions, short replicationFactor) throws InterruptedException {
        CreateTopicsResult result = adminClient.createTopics(
                Collections.singleton(new NewTopic(name, numPartitions, replicationFactor))
        );
        Map<String, KafkaFuture<Void>> futures = result.values();
        // One count per returned future, so the wait stays correct for any number of topics.
        CountDownLatch latch = new CountDownLatch(futures.size());
        futures.forEach((topicName, future) -> future.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                System.out.println(throwable.getMessage());
            }
            System.out.println(topicName);
            latch.countDown();
        }));
        latch.await();
    }
}

`Producer` 發布消息:

package com.kaven.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerTest {

    // Sample sentences; each one is published as a single record.
    public static final String[] MESSAGES = new String[]{
            "Give me the strength lightly to bear my joys and sorrows",
            "Give me the strength to make my love fruitful in service",
            "Give me the strength never to disown the poor or bend my knees before insolent might",
            "Give me the strength to raise my mind high above daily trifles",
            "And give me the strength to surrender my strength to thy will with love"
    };

    public static void main(String[] args) {
        send("topic-in");
    }

    /**
     * Publishes every entry of {@link #MESSAGES} to the given topic, keyed {@code "key-<index>"}.
     * Each send is asynchronous; the callback prints the record's partition/offset or the error.
     *
     * @param name destination topic
     */
    public static void send(String name) {
        // try-with-resources guarantees close() runs even if a send throws;
        // close() also waits for previously sent records to complete.
        try (Producer<String, String> producer = ProducerTest.createProducer()) {
            for (int i = 0; i < MESSAGES.length; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                        name,
                        "key-" + i,
                        MESSAGES[i]
                );
                // Asynchronous send with a completion callback
                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("topic: %s, partition: %s, offset: %s\n", name, metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }

    /**
     * Creates a producer with String key and value serializers.
     *
     * @return a new producer; the caller must close it
     */
    public static Producer<String, String> createProducer() {
        Properties properties = new Properties();
        // Broker addresses
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094");
        // Key serializer class
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer class
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        return new KafkaProducer<>(properties);
    }
}

`Consumer` 消費消息:

package com.kaven.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.*;

public class ConsumerTest {

    public static void main(String[] args) {
        subscribeTopicList(Collections.singletonList("topic-out"));
    }

    /**
     * Subscribes to the given topics and prints every record until the JVM shuts down.
     *
     * <p>On shutdown, a hook calls {@code wakeup()} to interrupt the blocking {@code poll()} and
     * waits for this thread to close the consumer. Without it, the consumer would be abandoned
     * mid-loop: {@code close()} would never run and the group would not be left.
     *
     * @param topicList topics to subscribe to
     */
    public static void subscribeTopicList(List<String> topicList) {
        KafkaConsumer<String, Long> consumer = createConsumer();
        Thread pollingThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // wakeup() is the one KafkaConsumer method documented as safe to call from another thread.
            consumer.wakeup();
            try {
                // Let the polling thread finish close() before the JVM exits.
                pollingThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        try {
            consumer.subscribe(topicList);
            while (true) {
                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(10000));
                records.forEach((record) -> {
                    System.out.printf("topic: %s, partition: %s, offset: %s, key: %s, value: %d\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                });
            }
        } catch (WakeupException ignored) {
            // Expected: poll() throws this after the shutdown hook called wakeup().
        } finally {
            // With auto-commit enabled, close() also commits the current offsets.
            consumer.close();
        }
    }

    /**
     * Creates a consumer with String keys and Long values (the word counts).
     *
     * @return a new consumer; the caller must close it
     */
    public static KafkaConsumer<String, Long> createConsumer() {
        Properties properties = new Properties();
        // Broker addresses
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094");
        // Consumer group this consumer belongs to
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kaven-test");
        // Commit offsets automatically
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Auto-commit interval in milliseconds
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key deserializer class
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer class (counts are produced as Long by the Streams app)
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongDeserializer");

        return new KafkaConsumer<>(properties);
    }
}

`Streams` 處理消息:

package com.kaven.kafka.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class StreamTest {

    /**
     * Configures and starts the word-count Streams application.
     * A shutdown hook closes the instance when the JVM exits.
     */
    public static void main(String[] args) {
        // Streams configuration
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Build the topology
        StreamsBuilder builder = createStreamsBuilder();

        // Create the KafkaStreams instance from the topology and configuration
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        // Stop the stream threads and release resources on Ctrl+C / SIGTERM instead of abandoning them.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    /**
     * Builds the word-count topology:
     * read "topic-in", split each value into lower-case words, count occurrences per word,
     * and write the running counts (String key, Long value) to "topic-out".
     *
     * @return the builder holding the topology
     */
    public static StreamsBuilder createStreamsBuilder() {
        StreamsBuilder builder = new StreamsBuilder();

        // Source: records from topic "topic-in"
        KStream<String, String> source = builder.stream("topic-in");

        KTable<String, Long> count = source
                // Records without a value would throw NullPointerException in toLowerCase below.
                .filter((key, value) -> value != null)
                // Locale.ROOT keeps lower-casing independent of the JVM's default locale;
                // splitting on \s+ avoids empty tokens from repeated whitespace.
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(java.util.Locale.ROOT).split("\\s+")))
                // A leading-whitespace value still yields one empty token; it is not a word.
                .filter((key, word) -> !word.isEmpty())
                // Regroup by the word itself, then count records per word.
                .groupBy((key, word) -> word)
                .count();

        // Sink: write the counts to topic "topic-out"
        count.toStream().to("topic-out", Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }
}

先建立 `topic-in` 和 `topic-out` 這兩個 `Topic`,然後使用 `Producer` 發布消息到 `topic-in`,之後使用 `Streams` 處理消息(即單詞統計:統計 `topic-in` 中的消息,再將統計結果流入 `topic-out`),最後使用 `Consumer` 消費 `topic-out` 中的消息。

`Consumer` 程式輸出:

topic: topic-out, partition: 0, offset: 35, key: lightly, value: 2
topic: topic-out, partition: 0, offset: 36, key: bear, value: 2
topic: topic-out, partition: 0, offset: 37, key: joys, value: 2
topic: topic-out, partition: 0, offset: 38, key: sorrows, value: 2
topic: topic-out, partition: 0, offset: 39, key: make, value: 2
topic: topic-out, partition: 0, offset: 40, key: fruitful, value: 2
topic: topic-out, partition: 0, offset: 41, key: in, value: 2
topic: topic-out, partition: 0, offset: 42, key: service, value: 2
topic: topic-out, partition: 0, offset: 43, key: never, value: 2
topic: topic-out, partition: 0, offset: 44, key: disown, value: 2
topic: topic-out, partition: 0, offset: 45, key: poor, value: 2
topic: topic-out, partition: 0, offset: 46, key: or, value: 2
topic: topic-out, partition: 0, offset: 47, key: bend, value: 2
topic: topic-out, partition: 0, offset: 48, key: knees, value: 2
topic: topic-out, partition: 0, offset: 49, key: before, value: 2
topic: topic-out, partition: 0, offset: 50, key: insolent, value: 2
topic: topic-out, partition: 0, offset: 51, key: might, value: 2
topic: topic-out, partition: 0, offset: 52, key: raise, value: 2
topic: topic-out, partition: 0, offset: 53, key: mind, value: 2
topic: topic-out, partition: 0, offset: 54, key: high, value: 2
topic: topic-out, partition: 0, offset: 55, key: above, value: 2
topic: topic-out, partition: 0, offset: 56, key: daily, value: 2
topic: topic-out, partition: 0, offset: 57, key: trifles, value: 2
topic: topic-out, partition: 0, offset: 58, key: and, value: 4
topic: topic-out, partition: 0, offset: 59, key: give, value: 10
topic: topic-out, partition: 0, offset: 60, key: me, value: 10
topic: topic-out, partition: 0, offset: 61, key: the, value: 12
topic: topic-out, partition: 0, offset: 62, key: surrender, value: 2
topic: topic-out, partition: 0, offset: 63, key: my, value: 10
topic: topic-out, partition: 0, offset: 64, key: strength, value: 12
topic: topic-out, partition: 0, offset: 65, key: to, value: 12
topic: topic-out, partition: 0, offset: 66, key: thy, value: 2
topic: topic-out, partition: 0, offset: 67, key: will, value: 2
topic: topic-out, partition: 0, offset: 68, key: with, value: 2
topic: topic-out, partition: 0, offset: 69, key: love, value: 4      

結果符合預期:每個單詞的計數都是它在原始句子中出現次數的兩倍(例如 `give` 在五條消息中各出現一次,計數為 10),因為部落客通過 `Producer` 向 `topic-in` 發布了兩次消息。

`KStream` 提供了豐富的流操作,類似於 `Java` 提供的 `Stream`,源碼分析部落客以後再介紹。下面修改 `createStreamsBuilder` 方法,把分割後的單詞直接輸出而不做統計:

// Build the topology (debug variant: print each word instead of counting it)
    public static StreamsBuilder createStreamsBuilder() {
        StreamsBuilder builder = new StreamsBuilder();

        // Source: records from topic "topic-in"
        KStream<String, String> source = builder.stream("topic-in");

        // Split each value into lower-case words and print every word with its record key.
        source.filter((key, value) -> value != null)   // a null value would NPE in toLowerCase
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(java.util.Locale.ROOT).split("\\s+")))
                .filter((key, word) -> !word.isEmpty())   // drop the empty token from leading whitespace
                .foreach((key, value) -> System.out.printf("key: %s, value: %s\n", key, value));

        return builder;
    }
key: key-0, value: give
key: key-0, value: me
key: key-0, value: the
key: key-0, value: strength
key: key-0, value: lightly
key: key-0, value: to
key: key-0, value: bear
key: key-0, value: my
key: key-0, value: joys
key: key-0, value: and
key: key-0, value: sorrows
key: key-1, value: give
key: key-1, value: me
key: key-1, value: the
key: key-1, value: strength
key: key-1, value: to
key: key-1, value: make
key: key-1, value: my
key: key-1, value: love
key: key-1, value: fruitful
key: key-1, value: in
key: key-1, value: service
key: key-2, value: give
key: key-2, value: me
key: key-2, value: the
key: key-2, value: strength
key: key-2, value: never
key: key-2, value: to
key: key-2, value: disown
key: key-2, value: the
key: key-2, value: poor
key: key-2, value: or
key: key-2, value: bend
key: key-2, value: my
key: key-2, value: knees
key: key-2, value: before
key: key-2, value: insolent
key: key-2, value: might
key: key-3, value: give
key: key-3, value: me
key: key-3, value: the
key: key-3, value: strength
key: key-3, value: to
key: key-3, value: raise
key: key-3, value: my
key: key-3, value: mind
key: key-3, value: high
key: key-3, value: above
key: key-3, value: daily
key: key-3, value: trifles
key: key-4, value: and
key: key-4, value: give
key: key-4, value: me
key: key-4, value: the
key: key-4, value: strength
key: key-4, value: to
key: key-4, value: surrender
key: key-4, value: my
key: key-4, value: strength
key: key-4, value: to
key: key-4, value: thy
key: key-4, value: will
key: key-4, value: with
key: key-4, value: love