本文從以下六個方面詳細介紹Kafka Streams的示範程式:
Step 1: 下載下傳代碼
Step 2: 啟動kafka服務
Step 3: 準備輸入topic并啟動Kafka生産者
Step 4: 啟動 Wordcount 程式
Step 5: 處理資料
Step 6: 停止應用
本教程假定你第一次,且沒有搭建現有的Kafka或ZooKeeper。但是,如果你已經啟動了Kafka和ZooKeeper,請跳過前兩個步驟。
Kafka Streams結合了在用戶端編寫和部署标準Java和Scala應用程式的簡單性以及Kafka伺服器端叢集技術的優勢,使這些應用程式具有高度可伸縮性,彈性,容錯性,分布式等特性。
這個快速入門示例将示範如何運作一個流應用程式。一個
WordCountDemo
的例子(為了友善閱讀,使用的是java8 lambda表達式)
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
Consumed.with(stringSerde, stringSerde);
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Group the text words as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count()
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
從輸入的文本計算出一個詞出現的次數。但是,不像其他的
WordCount
的例子,你可能會看到,在有限的資料基礎上,執行的示範應用程式的行為略有不同,因為它應該是在一個無限資料的操作,資料流。類似的有界變量,它是一種動态算法,跟蹤和更新的單詞計數。然而,由于它必須假設潛在的無界輸入資料,它會定期輸出其目前狀态和結果,同時繼續處理更多的資料,因為它不知道什麼時候它處理過的“所有”的輸入資料。
作為第一步,我們将啟動Kafka,然後我們将輸入資料準備到Kafka主題,然後由Kafka Streams應用程式處理。
下載下傳
1.1.0版本
并解壓它。注意,有多個可下載下傳的Scala版本,我們選擇在這裡使用推薦版本(2.11):
> tar -xzf kafka_2.11-1.1.0.tgz
> cd kafka_2.11-1.1.0
Kafka使用Zookeeper,是以第一步啟動Zookeeper服務。
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
現在啟動 Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
接下來,我們建立一個輸入主題“streams-plaintext-input”,和一個輸出主題"streams-wordcount-output":
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
Created topic "streams-plaintext-input".
注意:因為輸出主題是更新日志流(參見下面的應用程式輸出的說明),是以我們為輸出主題啟用了
壓縮
。
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
Created topic "streams-wordcount-output".
也可以使用kafka topic工具檢視主題描述:
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs:
Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs:
Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
以下指令啟動WordCount示範程式:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
示範程式将從輸入主題
streams-plaintext-input
中讀取,對每個讀取消息執行
WordCount算法
計算,并将其目前結果連續寫入輸出主題
streams-wordcount-output
。 是以,除了日志條目外,不會有任何STDOUT輸出,因為結果會寫回到Kafka中。
現在我們另外開一個終端,來啟動生産者來為該主題寫入一些輸入資料:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
在開一個終端,讀取輸出主題的資料。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
現在,我們通過輸入一行文本然後按,生産一些新的消息到輸入主題
streams-plaintext-input
。其中消息key為空,消息value為剛剛輸入的字元串編碼文本行(實際上,應用程式的輸入資料通常會連續流入Kafka,而不是 像我們在這個快速入門中那樣手動輸入):
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
這些消息将被Wordcount程式處理,然後輸出資料到
streams-wordcount-output
主題中,我們新打開一個指令視窗,輸出消費者:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
這裡,第一列是java.lang.String格式的Kafka消息key,表示正在計數的單詞,第二列是java.lang.Longformat中的消息value,表示該單詞的最新計數。
現在,用生産者繼續往streams-plaintext-input主題中發消息,輸入"hello kafka streams",然後:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
在消費者指令視窗,你可以觀察WordCount程式寫入到輸出主題的資料:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
在這裡,最後一行列印行
kafka 2
和
streams 2
表示計數已經從1遞增到2。每當你向輸入主題寫入更多的輸入消息時,你将觀察到新的消息被添加到
streams-wordcount-output
主題,表示由WordCount應用程式計算出的最新字數。讓我們輸入一個最終的輸入文本行“join kafka summit”,然後在控制台生産者中輸入主題streams-wordcount-input之前的:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
all streams lead to kafka
hello kafka streams
join kafka summit
streams-wordcount-output主題随後将顯示相應的更新變化(請參見最後三行):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
可以看到,Wordcount應用程式的輸出實際上是一個連續的更新流,其中每個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數,也就是諸如“kafka”的記錄關鍵字。 對于具有相同密鑰的多個記錄,每個後面的記錄都是前一個記錄的更新。
下面的兩張圖說明了幕後發生的事情。第一列顯示KTable <string,long>目前狀态的演變,它計數count的單詞出現次數。 第二列顯示從KTable的狀态更新以及發送到輸出主題streams-wordcount-output的更改記錄。
首先正在處理文本行“all streams lead to kafka”。KTable正在建立,因為每個新單詞都會生成一個新表格(用綠色背景突出顯示),并将相應的更改記錄發送到下遊KStream。
當處理第二行文本“hello kafka streams”時,我們首次觀察到KTable中現有的條目正在被更新(這裡是:“kafka”和“streams”)。 再次,更改記錄發送到輸出主題。
(我們跳過了第三行如何處理的說明)。這解釋了為什麼輸出主題具有我們上面顯示的内容,因為它包含完整的變更記錄。
在這個例子的範圍之外,Kafka Streams在這裡做的是利用表和變更日志流之間的對偶性(這裡:table = KTable,changelog stream =下遊KStream):你可以釋出table轉換為流,并且如果你從頭到尾使用整個變更日志流,則可以重新建構表的内容。
最後,通過
Ctrl-C
停止控制台消費者,生産者,Wordcount程式,Kafka Broker和Zokeeper服務。
本文轉發自 http://orchome.com/936
關于Kafka深入學習視訊, 如Kafka上司選舉, offset管理, Streams接口, 高性能之道, 監控運維, 性能測試等,
請關注個人微信公衆号: 求學之旅, 發送Kafka, 即可收獲Kafka學習視訊大禮包一枚。

作者:HuZixia
出處:http://www.cnblogs.com/huzixia/
本文版權歸作者和部落格園共有,歡迎轉載,但必須給出原文連結,并保留此段聲明,否則保留追究法律責任的權利。