天天看點

Kafka Java用戶端Stream API

Kafka Stream概念及初識高層架構圖

Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature,它提供了對存儲于Kafka内的資料進行流式處理和分析的功能。簡而言之,Kafka Stream就是一個用來做流計算的類庫,與Storm、Spark Streaming、Flink的作用類似,但要輕量得多。

Kafka Stream的基本概念:

  • Kafka Stream是處理分析存儲在Kafka資料的用戶端程式庫(lib)
  • 由于Kafka Streams是Kafka的一個lib,是以實作的程式不依賴單獨的環境
  • Kafka Stream通過state store可以實作高效的狀态操作
  • 支援原語Processor和高層抽象DSL

雖然Kafka Streams隻是一個java庫,但是它可以解決如下類似問題:

  1. 一次一件事件的處理而不是微批處理,延遲在毫秒級别;
  2. 有狀态的處理,包括連接配接操作(join)和聚合操作
  3. 提供了必要的流處理原語,包括進階流處理DSL和低級流處理API。進階流處理DSL提供了常用的流處理變換操作,低級處理器API支援用戶端自定義處理器并與狀态倉庫(state store)互動;
  4. 使用類似于DataFlow的模型來處理亂序資料的事件視窗問題;
  5. 分布式處理,有容錯機制,可以快速容錯;
  6. 有重新處理資料的能力;

Kafka Stream的高層架構圖:

Kafka Java用戶端Stream API
  • Partition的資料會分發到不同的Task上,Task主要是用來做流式的并行處理
  • 每個Task都會有自己的state store去記錄狀态
  • 每個Thread裡會有多個Task

Kafka Stream 核心概念

流(stream)是Kafka Streams提供的最重要的抽象,它代表一個無限的、不斷更新的資料集。一個流就是由一個有序的、可重放的、支援故障轉移的不可變的資料記錄(data record)序列,其中每個資料記錄被定義成一個鍵值對。

流處理器

一個流處理器(stream processor)是處理拓撲中的一個節點,它代表了拓撲中的處理步驟。

一個流處理器從它所在的拓撲上遊接收資料,通過Kafka Streams提供的流處理的基本方法,如map()、filter()、join()以及聚合等方法,對資料進行處理,然後将處理之後的一個或者多個輸出結果發送給下遊流處理器。

一個拓撲中的流處理器有Source和Sink處理器連個特殊的流處理器;

  • Source處理器:源處理器指的是資料的源頭,即第一個處理器,該處理器沒有任何上遊處理器
  • Sink處理器:該處理器沒有任何下遊處理器,是最終産出結果的一個處理器。該處理器将從上遊處理器接受到的任何資料發送到指定的主題當中;

流處理拓撲:

一個拓撲圖,該拓撲圖展示了資料流的走向,以及流處理器的節點位置。流處理器是圖的節點,流是圖的邊。

如下圖所示:

Kafka Java用戶端Stream API

Kafka提供了兩種定義流處理拓撲的API:

KafkaStreams DSL API.:這種類型的API提供了一些開箱即用的資料轉換操作算子例如:map、filter 和join和聚合類算子,開發這無需處理底層實作細節,缺點就是在一定程度上不夠靈活,這樣你就不必從頭開始實作這些流處理器。

Low-levelAPI:這些低級API允許開發人員定義和連接配接定制處理器和狀态存儲器進行交換,更加靈活,但是開發難度相對較大。

Kafka Stream使用示範

下圖是Kafka Stream完整的高層架構圖:

Kafka Java用戶端Stream API

從上圖中可以看到,Consumer對一組Partition進行消費,這組Partition可以在一個Topic中或多個Topic中。然後形成資料流,經過各個流處理器後最終通過Producer輸出到一組Partition中,同樣這組Partition也可以在一個Topic中或多個Topic中。這個過程就是資料流的輸入和輸出。

下面通過幾個例子來使用一下Kafka Stream API

Topic之間的流輸入

是以,我們在使用Stream API前需要先建立兩個Topic,一個作為輸入,一個作為輸出。到伺服器上使用指令行建立兩個Topic:

建立Topic

使用腳本檔案和Admin API 建立Topic均可

腳本檔案建立topic

進入kafka容器

docker exec -it ${CONTAINER ID} /bin/bash      

cd 到腳本檔案的檔案夾

cd /opt/kafka/bin      

使用腳本檔案建立Topic

kafka-1是我使用docker-compose 搭建kafka叢集的時候的容器名

./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-stream-in
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-stream-out      

使用Admin API 來建立Topic 可見 ​​kafka用戶端操作之Admin API​​

腳本檔案檢視已經建立好的API

./kafka-topics.sh --list --bootstrap-server kafka-1:9092      
Kafka Java用戶端Stream API

流輸入代碼

由于之前依賴的kafka-clients包中沒有Stream API,是以需要另外引入Stream的依賴包。

在項目中添加如下依賴:(版本号和部署的kafka server版本号一緻)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.1</version>
</dependency>      

Kafka Streams從一個或多個輸入topic進行連續的計算并輸出到1個或多個外部topic中。

可以通過TopologyBuilder類定義一個計算邏輯處理器DAG拓撲。或者也可以通過提供的進階别KStream DSL來定義轉換的KStreamBuilder。(PS:計算邏輯其實就是自己的代碼邏輯)

KafkaStreams類管理Kafka Streams執行個體的生命周期。一個stream執行個體可以在配置檔案中為處理器指定一個或多個Thread。

KafkaStreams執行個體可以作為單個streams處理用戶端(也可能是分布式的),與其他的相同應用ID的執行個體進行協調(無論是否在同一個程序中,在同一台機器的其他程序中,或遠端機器上)。這些執行個體将根據輸入topic分區的基礎上來劃分工作,以便所有的分區都被消費掉。如果執行個體添加或失敗,所有執行個體将重新平衡它們之間的分區配置設定,以保證負載平衡。

在内部,KafkaStreams執行個體包含一個正常的KafkaProducer和KafkaConsumer執行個體,用于讀取和寫入。

下面是一個簡單的Topic之間的流輸入來體驗一下

static void easyStream(){

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"stream-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

    // 建構流構造器
    StreamsBuilder builder = new StreamsBuilder();
    KStream<Object, Object> source = builder.stream("xt-stream-in");

    //直接将xt-stream-in topic中的資料寫入到 xt-stream-out topic中
    source.to("xt-stream-out");

    final Topology topo = builder.build();
    final KafkaStreams streams = new KafkaStreams(topo, props);

    final CountDownLatch latch = new CountDownLatch(1);

    Runtime.getRuntime().addShutdownHook(new Thread("stream-app"){
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.start();
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.exit(0);

}      

運作以上Stream代碼,然後使用腳本檔案生産消費消息

腳本生産消息

也可以使用producer API

./kafka-console-producer.sh --topic xt-stream-in --bootstrap-server kafka-1:9092      

CTRL-C結束輸入

Kafka Java用戶端Stream API

腳本消費消息

也可以使用consumer API

./kafka-console-consumer.sh --topic xt-stream-out --bootstrap-server kafka-1:9092 --from-beginning      
Kafka Java用戶端Stream API

foreach方法

在之前的例子中,我們是從某個Topic讀取資料進行流處理後再輸出到另一個Topic裡。但在一些場景下,我們可能不希望将結果資料輸出到Topic,而是寫入到一些存儲服務中,例如ElasticSearch、MongoDB、MySQL等。

在這種場景下,就可以利用到foreach方法,該方法用于疊代流中的元素。我們可以在foreach中将資料存入例如Map、List等容器,然後再批量寫入到資料庫或其他存儲中間件即可。

foreach方法使用示例:

// 定義流計算過程
static void foreachStream(){

    //配置屬性
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"foreach-app");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // 建構流結構拓撲
    StreamsBuilder builder = new StreamsBuilder();

    KStream<String,String> source = builder.stream("xt-stream-in");

    source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
            .foreach((key,value)-> System.out.println(key + " : " + value));

    final KafkaStreams streams = new KafkaStreams(builder.build(), props);

    streams.start();
}      

更多其他方法可以查閱​​官方文檔​​,或者直接CTRL-B進入KStream源碼檢視

詞頻統計

建立Topic

使用腳本檔案和Admin API 建立Topic均可

腳本檔案建立topic

進入kafka容器

docker exec -it ${CONTAINER ID} /bin/bash      

cd 到腳本檔案的檔案夾

cd /opt/kafka/bin      

使用腳本檔案建立Topic

kafka-1是我使用docker-compose 搭建kafka叢集的時候的容器名

./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-wordcount-in

./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-wordcount-out      

使用Admin API 來建立Topic 可見 ​​kafka用戶端操作之Admin API​​

詞頻統計代碼

代碼示例:

// 定義流計算過程,詞頻統計
static void wordcountStream(){

    //配置屬性
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // 建構流結構拓撲
    StreamsBuilder builder = new StreamsBuilder();
    // 不斷從INPUT_TOPIC上擷取新資料,并且追加到流上的一個抽象對象
    KStream<String,String> source = builder.stream("xt-wordcount-in");


    final Topology topo = builder.build();
    final KafkaStreams streams = new KafkaStreams(topo, props);

    final CountDownLatch latch = new CountDownLatch(1);

    // KTable是資料集合的抽象對象
    final KTable<String, Long> count = source
            //資料扁平化 ,按照空格分割資料
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
            // 合并 -> 按value值合并,分别進行統計
            .groupBy((key, value) -> value)
            // 統計出現的總數
            .count();

    // 将結果輸入到OUT_TOPIC中
    KStream<String, Long> sink = count.toStream();
    sink.to("xt-wordcount-out", Produced.with(Serdes.String(),Serdes.Long()));


    Runtime.getRuntime().addShutdownHook(new Thread("stream-app"){
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });
    try {
        streams.start();
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.exit(0);

}      

KTable與KStream的關系與差別

如下圖:

Kafka Java用戶端Stream API
  • KTable類似于一個時間片段,在一個時間片段内輸入的資料就會update進去,以這樣的形式來維護這張表,相同Key的每條記錄隻儲存最新的一條記錄,類似于資料庫的基于主鍵更新
  • KStream則沒有update這個概念,而是不斷的追加,是一個由鍵值對構成的抽象記錄流,每個鍵值對是一個獨立的單元,即使相同的Key也不會覆寫,類似資料庫的插入操作;

腳本生産消息

運作以上代碼,然後到伺服器中使用kafka-console-producer.sh腳本指令向input-topic生産一些資料,如下:

./kafka-console-producer.sh --bootstrap-server kafka-1:9092 --topic xt-wordcount-in      

向xt-wordcount-in Topic 寫入如下消息,CTRL-C結束

Hello World xt
Hello World Kafka
Hello Java 
Hello xt      

腳本消費消息

然後再運作kafka-console-consumer.sh腳本指令從xt-wordcount-out Topic中消費資料,并進行列印。具體如下:

./kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic xt-wordcount-out --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning      
Kafka Java用戶端Stream API

References:

  • ​​https://blog.51cto.com/zero01/2498111​​
  • ​​https://www.orchome.com/512​​