Kafka Stream概念及初識高層架構圖
Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature,它提供了對存儲于Kafka内的資料進行流式處理和分析的功能。簡而言之,Kafka Stream就是一個用來做流計算的類庫,與Storm、Spark Streaming、Flink的作用類似,但要輕量得多。
Kafka Stream的基本概念:
- Kafka Stream是處理分析存儲在Kafka資料的用戶端程式庫(lib)
- 由于Kafka Streams是Kafka的一個lib,是以實作的程式不依賴單獨的環境
- Kafka Stream通過state store可以實作高效的狀态操作
- 支援原語Processor和高層抽象DSL
雖然Kafka Streams隻是一個java庫,但是它可以解決如下類似問題:
- 一次一件事件的處理而不是微批處理,延遲在毫秒級别;
- 有狀态的處理,包括連接配接操作(join)和聚合操作
- 提供了必要的流處理原語,包括進階流處理DSL和低級流處理API。進階流處理DSL提供了常用的流處理變換操作,低級處理器API支援用戶端自定義處理器并與狀态倉庫(state store)互動;
- 使用類似于DataFlow的模型來處理亂序資料的事件視窗問題;
- 分布式處理,有容錯機制,可以快速容錯;
- 有重新處理資料的能力;
Kafka Stream的高層架構圖:
- Partition的資料會分發到不同的Task上,Task主要是用來做流式的并行處理
- 每個Task都會有自己的state store去記錄狀态
- 每個Thread裡會有多個Task
Kafka Stream 核心概念
流
流(stream)是Kafka Streams提供的最重要的抽象,它代表一個無限的、不斷更新的資料集。一個流就是由一個有序的、可重放的、支援故障轉移的不可變的資料記錄(data record)序列,其中每個資料記錄被定義成一個鍵值對。
流處理器
一個流處理器(stream processor)是處理拓撲中的一個節點,它代表了拓撲中的處理步驟。
一個流處理器從它所在的拓撲上遊接收資料,通過Kafka Streams提供的流處理的基本方法,如map()、filter()、join()以及聚合等方法,對資料進行處理,然後将處理之後的一個或者多個輸出結果發送給下遊流處理器。
一個拓撲中的流處理器有Source和Sink處理器連個特殊的流處理器;
- Source處理器:源處理器指的是資料的源頭,即第一個處理器,該處理器沒有任何上遊處理器
- Sink處理器:該處理器沒有任何下遊處理器,是最終産出結果的一個處理器。該處理器将從上遊處理器接受到的任何資料發送到指定的主題當中;
流處理拓撲:
一個拓撲圖,該拓撲圖展示了資料流的走向,以及流處理器的節點位置。流處理器是圖的節點,流是圖的邊。
如下圖所示:
Kafka提供了兩種定義流處理拓撲的API:
KafkaStreams DSL API.:這種類型的API提供了一些開箱即用的資料轉換操作算子例如:map、filter 和join和聚合類算子,開發這無需處理底層實作細節,缺點就是在一定程度上不夠靈活,這樣你就不必從頭開始實作這些流處理器。
Low-levelAPI:這些低級API允許開發人員定義和連接配接定制處理器和狀态存儲器進行交換,更加靈活,但是開發難度相對較大。
Kafka Stream使用示範
下圖是Kafka Stream完整的高層架構圖:
從上圖中可以看到,Consumer對一組Partition進行消費,這組Partition可以在一個Topic中或多個Topic中。然後形成資料流,經過各個流處理器後最終通過Producer輸出到一組Partition中,同樣這組Partition也可以在一個Topic中或多個Topic中。這個過程就是資料流的輸入和輸出。
下面通過幾個例子來使用一下Kafka Stream API
Topic之間的流輸入
是以,我們在使用Stream API前需要先建立兩個Topic,一個作為輸入,一個作為輸出。到伺服器上使用指令行建立兩個Topic:
建立Topic
使用腳本檔案和Admin API 建立Topic均可
腳本檔案建立topic
進入kafka容器
docker exec -it ${CONTAINER ID} /bin/bash
cd 到腳本檔案的檔案夾
cd /opt/kafka/bin
使用腳本檔案建立Topic
kafka-1是我使用docker-compose 搭建kafka叢集的時候的容器名
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-stream-in
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-stream-out
使用Admin API 來建立Topic 可見 kafka用戶端操作之Admin API
腳本檔案檢視已經建立好的API
./kafka-topics.sh --list --bootstrap-server kafka-1:9092
流輸入代碼
由于之前依賴的kafka-clients包中沒有Stream API,是以需要另外引入Stream的依賴包。
在項目中添加如下依賴:(版本号和部署的kafka server版本号一緻)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.1</version>
</dependency>
Kafka Streams從一個或多個輸入topic進行連續的計算并輸出到1個或多個外部topic中。
可以通過TopologyBuilder類定義一個計算邏輯處理器DAG拓撲。或者也可以通過提供的進階别KStream DSL來定義轉換的KStreamBuilder。(PS:計算邏輯其實就是自己的代碼邏輯)
KafkaStreams類管理Kafka Streams執行個體的生命周期。一個stream執行個體可以在配置檔案中為處理器指定一個或多個Thread。
KafkaStreams執行個體可以作為單個streams處理用戶端(也可能是分布式的),與其他的相同應用ID的執行個體進行協調(無論是否在同一個程序中,在同一台機器的其他程序中,或遠端機器上)。這些執行個體将根據輸入topic分區的基礎上來劃分工作,以便所有的分區都被消費掉。如果執行個體添加或失敗,所有執行個體将重新平衡它們之間的分區配置設定,以保證負載平衡。
在内部,KafkaStreams執行個體包含一個正常的KafkaProducer和KafkaConsumer執行個體,用于讀取和寫入。
下面是一個簡單的Topic之間的流輸入來體驗一下
static void easyStream(){
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
// 建構流構造器
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> source = builder.stream("xt-stream-in");
//直接将xt-stream-in topic中的資料寫入到 xt-stream-out topic中
source.to("xt-stream-out");
final Topology topo = builder.build();
final KafkaStreams streams = new KafkaStreams(topo, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream-app"){
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);
}
運作以上Stream代碼,然後使用腳本檔案生産消費消息
腳本生産消息
也可以使用producer API
./kafka-console-producer.sh --topic xt-stream-in --bootstrap-server kafka-1:9092
CTRL-C結束輸入
腳本消費消息
也可以使用consumer API
./kafka-console-consumer.sh --topic xt-stream-out --bootstrap-server kafka-1:9092 --from-beginning
foreach方法
在之前的例子中,我們是從某個Topic讀取資料進行流處理後再輸出到另一個Topic裡。但在一些場景下,我們可能不希望将結果資料輸出到Topic,而是寫入到一些存儲服務中,例如ElasticSearch、MongoDB、MySQL等。
在這種場景下,就可以利用到foreach方法,該方法用于疊代流中的元素。我們可以在foreach中将資料存入例如Map、List等容器,然後再批量寫入到資料庫或其他存儲中間件即可。
foreach方法使用示例:
// 定義流計算過程
static void foreachStream(){
//配置屬性
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"foreach-app");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 建構流結構拓撲
StreamsBuilder builder = new StreamsBuilder();
KStream<String,String> source = builder.stream("xt-stream-in");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.foreach((key,value)-> System.out.println(key + " : " + value));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
更多其他方法可以查閱官方文檔,或者直接CTRL-B進入KStream源碼檢視
詞頻統計
建立Topic
使用腳本檔案和Admin API 建立Topic均可
腳本檔案建立topic
進入kafka容器
docker exec -it ${CONTAINER ID} /bin/bash
cd 到腳本檔案的檔案夾
cd /opt/kafka/bin
使用腳本檔案建立Topic
kafka-1是我使用docker-compose 搭建kafka叢集的時候的容器名
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-wordcount-in
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-wordcount-out
使用Admin API 來建立Topic 可見 kafka用戶端操作之Admin API
詞頻統計代碼
代碼示例:
// 定義流計算過程,詞頻統計
static void wordcountStream(){
//配置屬性
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 建構流結構拓撲
StreamsBuilder builder = new StreamsBuilder();
// 不斷從INPUT_TOPIC上擷取新資料,并且追加到流上的一個抽象對象
KStream<String,String> source = builder.stream("xt-wordcount-in");
final Topology topo = builder.build();
final KafkaStreams streams = new KafkaStreams(topo, props);
final CountDownLatch latch = new CountDownLatch(1);
// KTable是資料集合的抽象對象
final KTable<String, Long> count = source
//資料扁平化 ,按照空格分割資料
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
// 合并 -> 按value值合并,分别進行統計
.groupBy((key, value) -> value)
// 統計出現的總數
.count();
// 将結果輸入到OUT_TOPIC中
KStream<String, Long> sink = count.toStream();
sink.to("xt-wordcount-out", Produced.with(Serdes.String(),Serdes.Long()));
Runtime.getRuntime().addShutdownHook(new Thread("stream-app"){
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);
}
KTable與KStream的關系與差別
如下圖:
- KTable類似于一個時間片段,在一個時間片段内輸入的資料就會update進去,以這樣的形式來維護這張表,相同Key的每條記錄隻儲存最新的一條記錄,類似于資料庫的基于主鍵更新
- KStream則沒有update這個概念,而是不斷的追加,是一個由鍵值對構成的抽象記錄流,每個鍵值對是一個獨立的單元,即使相同的Key也不會覆寫,類似資料庫的插入操作;
腳本生産消息
運作以上代碼,然後到伺服器中使用kafka-console-producer.sh腳本指令向input-topic生産一些資料,如下:
./kafka-console-producer.sh --bootstrap-server kafka-1:9092 --topic xt-wordcount-in
向xt-wordcount-in Topic 寫入如下消息,CTRL-C結束
Hello World xt
Hello World Kafka
Hello Java
Hello xt
腳本消費消息
然後再運作kafka-console-consumer.sh腳本指令從xt-wordcount-out Topic中消費資料,并進行列印。具體如下:
./kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic xt-wordcount-out --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
References:
- https://blog.51cto.com/zero01/2498111
- https://www.orchome.com/512