
Alibaba Cloud ECS Hands-On: Building a Kafka Application

# Big Data Systems Fundamentals: Kafka

# Question 1: Kafka is configured with a replication factor of 2 and 3 partitions. Describe the transmission flow of the three messages "Hello kafka", "Hello world", and "kafka".

Per the problem statement, the replication factor is 2 and the partition count is 3. Suppose partition 0's leader is broker 1, partition 1's leader is broker 2, and partition 2's leader is broker 3. Since the replication factor is 2, each partition has one leader and one follower replica. Then (see the producer sketch after this list):

- The message "Hello kafka" has no key set, so the producer assigns it a partition round-robin; suppose it lands on partition 0. Since partition 0's leader is broker 1, the message is sent to broker 1, and the leader then replicates it to partition 0's follower, say broker 2.

- "Hello world" likewise has no key; round-robin assigns it to partition 1, i.e. the leader on broker 2, which then replicates it to partition 1's follower, say broker 3.

- "kafka" has no key either and is assigned to partition 2, i.e. the leader on broker 3, which then replicates it to partition 2's follower, say broker 1.
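A minimal producer sketch for this scenario (the class name, broker address, and topic name are placeholders; note that when no key is set, older clients partition strictly round-robin while newer ones use sticky batching per partition):

```java
package myapps;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class ThreeMessages {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key is given, so the producer (not the application) picks the partition.
            for (String msg : new String[]{"Hello kafka", "Hello world", "kafka"}) {
                producer.send(new ProducerRecord<>("demo-topic", msg)); // value-only record
            }
        } // close() flushes any pending records
    }
}
```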

# Question 2: WordCount

## 1. Start Kafka

1. First, start ZooKeeper, which current Kafka versions depend on:

```bash
$ bin/zkServer.sh start
```

2. Next, start the Kafka server:

```bash
$ bin/kafka-server-start.sh config/server.properties
```

3. Open a new terminal and create a topic:

```bash
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0
```

1. 接着,運作producer,負責寫入資訊

```bash
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
```

5. Finally, open another terminal and start the console consumer, which reads the events back:

```bash
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
```

Example run:

![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled.png)

## 2. Counting Words in a Text File

To count the words in a given text file and persist the result, three pieces are needed: a producer that reads the file and publishes each line to "streams-plaintext-input"; a Kafka Streams application that consumes "streams-plaintext-input", counts the words, and writes to "streams-wordcount-output"; and a consumer that reads "streams-wordcount-output" and saves the results. (Both topics must exist first; they can be created with kafka-topics.sh as in step 3 above.)
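The heart of the Streams application is the topology below, excerpted from WordCount.java in the appendix: each input line is lowercased and split into words, the words are grouped and counted in a local state store, and every count update is emitted to the output topic.

```java
builder.<String, String>stream("streams-plaintext-input")
       // split each line into lowercase words
       .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
       // re-key each record by the word itself
       .groupBy((key, value) -> value)
       // maintain running counts in a local state store
       .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
       // emit every count update to the output topic
       .toStream()
       .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
```

Because the counts are serialized as Longs, whatever reads "streams-wordcount-output" must deserialize values as Long, which is why Output.java below uses a LongDeserializer.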

The results are shown in the figures below:

![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled%201.png)

Figure 1. The producer reads the text file and uploads it.

![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled%202.png)

Figure 2. The Streams application counts the words and sends the results to another topic.

![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled%203.png)

Figure 3. The consumer reads the results and saves them.

![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled%204.png)

Figure 4. Output.

# Appendix: Source Code

doc.txt

```
The Stanford neurologist, a soft-spoken demi-prodigy who became a professor while still a resident, had been obsessed for a decade with how to better define psychiatric disorders. Drugs for depression or bipolar disorder didn’t work for many patients with the conditions, and he suspected the reason was how traditional diagnoses didn’t actually get at the heart of what was going on in a patient’s brain.

He had shown in 2006 that some patients with different diagnoses — PTSD vs. depression, for example — looked remarkably similar under brain imaging, suggesting clinicians drew distinctions in the wrong places. And in 2014, he showed that one could define patients by looking at individual discrete behaviors, such as attention or sleep.
```

Load.java

```java
package myapps;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

public class Load {

    public static void main(String[] args) throws Exception {
        // Producer configuration: broker address and String serializers for key and value
        Properties props = new Properties();
        props.put("bootstrap.servers", "8.130.172.60:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        BufferedReader plaintxt =
                new BufferedReader(
                        new FileReader("C:\\Users\\yiqiao\\IdeaProjects\\kafkaDemo\\src\\main\\resources\\doc.txt"));

        // Send each line of the file as one record to the input topic
        String str;
        while ((str = plaintxt.readLine()) != null) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("streams-plaintext-input", "1", str);
            producer.send(record);
            System.out.println(str);
        }

        plaintxt.close();
        producer.close(); // flush buffered records before exiting
        System.exit(0);
    }
}
```
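Note that Load.java attaches the constant key "1" to every record, so all lines land in the same partition of streams-plaintext-input; sending them without a key, as in the Question 1 sketch, would spread them across partitions instead.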

WordCount.java

```java
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // Streams configuration: application id, broker address, default String serdes
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "8.130.172.60:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        // Topology: split lines into words, group by word, count, emit updates
        builder.<String, String>stream("streams-plaintext-input")
               .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
               .groupBy((key, value) -> value)
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
               .toStream()
               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
```

Output.java

```java
package myapps;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Output {

    public static void main(String[] args) throws Exception {
        // Consumer configuration: the output topic carries String keys and Long counts,
        // so the value deserializer must be LongDeserializer
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Word Counter");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.130.172.60:9092");

        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("streams-wordcount-output"));

        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
            // append this batch of word counts to the result file
            BufferedWriter count_plain = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream("C:\\Users\\yiqiao\\IdeaProjects\\kafkaDemo\\src\\main" +
                            "\\resources\\count_plain.txt", true)));
            for (ConsumerRecord<String, Long> record : records) {
                System.out.println(record.key() + " " + record.value());
                count_plain.write(record.key() + " " + record.value() + "\n");
            }
            count_plain.close(); // close after the whole batch, not per record
        }
    }
}
```

pom.xml

```xml

<?xml version="1.0" encoding="UTF-8"?>

<!--

   Licensed to the Apache Software Foundation (ASF) under one or more

   contributor license agreements.  See the NOTICE file distributed with

   this work for additional information regarding copyright ownership.

   The ASF licenses this file to You under the Apache License, Version 2.0

   (the "License"); you may not use this file except in compliance with

   the License.  You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software

   distributed under the License is distributed on an "AS IS" BASIS,

   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

   See the License for the specific language governing permissions and

   limitations under the License.

--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>streams.examples</groupId>

    <artifactId>streams.examples</artifactId>

    <version>0.1</version>

    <packaging>jar</packaging>

    <name>Kafka Streams Quickstart :: Java</name>

    <properties>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <kafka.version>3.0.0</kafka.version>

        <slf4j.version>1.7.7</slf4j.version>

        <log4j.version>1.2.17</log4j.version>

    </properties>

    <repositories>

        <repository>

            <id>apache.snapshots</id>

            <name>Apache Development Snapshot Repository</name>

            <url>https://repository.apache.org/content/repositories/snapshots/</url>

            <releases>

                <enabled>false</enabled>

            </releases>

            <snapshots>

            </snapshots>

        </repository>

    </repositories>

    <!--

        Execute "mvn clean package -Pbuild-jar"

        to build a jar file out of this project!

    -->

    <build>

        <plugins>

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-compiler-plugin</artifactId>

                <version>3.1</version>

                <configuration>

                    <source>1.8</source>

                    <target>1.8</target>

                </configuration>

            </plugin>

        </plugins>

        <pluginManagement>

            <plugins>

                <plugin>

                    <artifactId>maven-compiler-plugin</artifactId>

                    <configuration>

                        <source>1.8</source>

                        <target>1.8</target>

                        <compilerId>jdt</compilerId>

                    </configuration>

                    <dependencies>

                        <dependency>

                            <groupId>org.eclipse.tycho</groupId>

                            <artifactId>tycho-compiler-jdt</artifactId>

                            <version>0.21.0</version>

                        </dependency>

                    </dependencies>

                </plugin>

                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>

                            <pluginExecutions>

                                <pluginExecution>

                                    <pluginExecutionFilter>

                                        <groupId>org.apache.maven.plugins</groupId>

                                        <artifactId>maven-assembly-plugin</artifactId>

                                        <versionRange>[2.4,)</versionRange>

                                        <goals>

                                            <goal>single</goal>

                                        </goals>

                                    </pluginExecutionFilter>

                                    <action>

                                        <ignore />

                                    </action>

                                </pluginExecution>

                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore />
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>

    </build>

    <dependencies>

        <!-- Apache Kafka dependencies -->

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka-streams</artifactId>

            <version>${kafka.version}</version>

        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

    </dependencies>

</project>
```