天天看點

Kafka Streams示範程式

本文從以下六個方面詳細介紹Kafka Streams的示範程式:

Step 1: 下載下傳代碼

Step 2: 啟動kafka服務

Step 3: 準備輸入topic并啟動Kafka生産者

Step 4: 啟動 Wordcount 程式

Step 5: 處理資料

Step 6: 停止應用

本教程假定你第一次,且沒有搭建現有的Kafka或ZooKeeper。但是,如果你已經啟動了Kafka和ZooKeeper,請跳過前兩個步驟。

Kafka Streams結合了在用戶端編寫和部署标準Java和Scala應用程式的簡單性以及Kafka伺服器端叢集技術的優勢,使這些應用程式具有高度可伸縮性,彈性,容錯性,分布式等特性。

這個快速入門示例将示範如何運作一個流應用程式。一個

WordCountDemo

的例子(為了友善閱讀,使用的是java8 lambda表達式)

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde);

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count()

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
           

從輸入的文本計算出一個詞出現的次數。但是,不像其他的

WordCount

的例子,你可能會看到,在有限的資料基礎上,執行的示範應用程式的行為略有不同,因為它應該是在一個無限資料的操作,資料流。類似的有界變量,它是一種動态算法,跟蹤和更新的單詞計數。然而,由于它必須假設潛在的無界輸入資料,它會定期輸出其目前狀态和結果,同時繼續處理更多的資料,因為它不知道什麼時候它處理過的“所有”的輸入資料。

作為第一步,我們将啟動Kafka,然後我們将輸入資料準備到Kafka主題,然後由Kafka Streams應用程式處理。

下載下傳

1.1.0版本

并解壓它。注意,有多個可下載下傳的Scala版本,我們選擇在這裡使用推薦版本(2.11):

> tar -xzf kafka_2.11-1.1.0.tgz
> cd kafka_2.11-1.1.0
           

Kafka使用Zookeeper,是以第一步啟動Zookeeper服務。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
           

現在啟動 Kafka server:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
           

接下來,我們建立一個輸入主題“streams-plaintext-input”,和一個輸出主題"streams-wordcount-output":

> bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".
           

注意:因為輸出主題是更新日志流(參見下面的應用程式輸出的說明),是以我們為輸出主題啟用了

壓縮

> bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
    Created topic "streams-wordcount-output".
           

也可以使用kafka topic工具檢視主題描述:

> bin/kafka-topics.sh --zookeeper localhost:2181 --describe

Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0
           

以下指令啟動WordCount示範程式:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
           

示範程式将從輸入主題

streams-plaintext-input

中讀取,對每個讀取消息執行

WordCount算法

計算,并将其目前結果連續寫入輸出主題

streams-wordcount-output

。 是以,除了日志條目外,不會有任何STDOUT輸出,因為結果會寫回到Kafka中。

現在我們另外開一個終端,來啟動生産者來為該主題寫入一些輸入資料:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
           

在開一個終端,讀取輸出主題的資料。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
           

現在,我們通過輸入一行文本然後按,生産一些新的消息到輸入主題

streams-plaintext-input

。其中消息key為空,消息value為剛剛輸入的字元串編碼文本行(實際上,應用程式的輸入資料通常會連續流入Kafka,而不是 像我們在這個快速入門中那樣手動輸入):

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
           

這些消息将被Wordcount程式處理,然後輸出資料到

streams-wordcount-output

主題中,我們新打開一個指令視窗,輸出消費者:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1
           

這裡,第一列是java.lang.String格式的Kafka消息key,表示正在計數的單詞,第二列是java.lang.Longformat中的消息value,表示該單詞的最新計數。

現在,用生産者繼續往streams-plaintext-input主題中發消息,輸入"hello kafka streams",然後:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
           

在消費者指令視窗,你可以觀察WordCount程式寫入到輸出主題的資料:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
           

在這裡,最後一行列印行

kafka 2

streams 2

表示計數已經從1遞增到2。每當你向輸入主題寫入更多的輸入消息時,你将觀察到新的消息被添加到

streams-wordcount-output

主題,表示由WordCount應用程式計算出的最新字數。讓我們輸入一個最終的輸入文本行“join kafka summit”,然後在控制台生産者中輸入主題streams-wordcount-input之前的:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
all streams lead to kafka
hello kafka streams
join kafka summit
           

streams-wordcount-output主題随後将顯示相應的更新變化(請參見最後三行):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1
           

可以看到,Wordcount應用程式的輸出實際上是一個連續的更新流,其中每個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數,也就是諸如“kafka”的記錄關鍵字。 對于具有相同密鑰的多個記錄,每個後面的記錄都是前一個記錄的更新。

下面的兩張圖說明了幕後發生的事情。第一列顯示KTable <string,long>目前狀态的演變,它計數count的單詞出現次數。 第二列顯示從KTable的狀态更新以及發送到輸出主題streams-wordcount-output的更改記錄。

首先正在處理文本行“all streams lead to kafka”。KTable正在建立,因為每個新單詞都會生成一個新表格(用綠色背景突出顯示),并将相應的更改記錄發送到下遊KStream。

當處理第二行文本“hello kafka streams”時,我們首次觀察到KTable中現有的條目正在被更新(這裡是:“kafka”和“streams”)。 再次,更改記錄發送到輸出主題。

(我們跳過了第三行如何處理的說明)。這解釋了為什麼輸出主題具有我們上面顯示的内容,因為它包含完整的變更記錄。

在這個例子的範圍之外,Kafka Streams在這裡做的是利用表和變更日志流之間的對偶性(這裡:table = KTable,changelog stream =下遊KStream):你可以釋出table轉換為流,并且如果你從頭到尾使用整個變更日志流,則可以重新建構表的内容。

最後,通過

Ctrl-C

停止控制台消費者,生産者,Wordcount程式,Kafka Broker和Zokeeper服務。

本文轉發自 http://orchome.com/936

關于Kafka深入學習視訊, 如Kafka上司選舉, offset管理, Streams接口, 高性能之道, 監控運維, 性能測試等,

請關注個人微信公衆号: 求學之旅, 發送Kafka, 即可收獲Kafka學習視訊大禮包一枚。 

Kafka Streams示範程式

作者:HuZixia

出處:http://www.cnblogs.com/huzixia/

本文版權歸作者和部落格園共有,歡迎轉載,但必須給出原文連結,并保留此段聲明,否則保留追究法律責任的權利。