天天看點

Kafka Streams 剖析1.概述2.内容3.示例4.總結5.結束語

1.概述

  Kafka Streams 是一個用來處理流式資料的庫,屬于Java類庫,它并不是一個流處理架構,和Storm,Spark Streaming這類流處理架構是明顯不一樣的。那這樣一個庫是做什麼的,能應用到哪些場合,如何使用。筆者今天就給大家來一一剖析這些内容。

2.内容

  首先,我們研究這樣一個庫,需要知道它是做什麼的。Kafka Streams是一個用來建構流處理應用的庫,和Java的那些内置庫一樣,以一種分布式的容錯方式來處理一些事情。目前,業界用于流處理的計算架構包含有:Flink,Spark,Storm等等。Kafka Streams處理完後的結果可以回寫到Topic中,也可以外接其他系統進行落地。包含以下特性:

  • 事件區分:記錄資料發生的時刻
  • 時間處理:記錄資料被流處理應用開始處理的時刻,如記錄被消費的時刻
  • 開窗
  • 狀态管理:本身應用不需要管理狀态,如若需要處理複雜的流處理應用(分組,聚合,連接配接等)

  Kafka Streams使用是很簡單的,這一點通過閱讀官方的示例代碼就能發現,另外它利用Kafka的并發模型來完成負載均衡。

2.1 優勢

  在Kafka叢集上,能夠很便捷的使用,亮點如下圖所示:

Kafka Streams 剖析1.概述2.内容3.示例4.總結5.結束語
  • 能夠設計一些輕量級的Client類庫,和現有的Java程式整合
  • 不需要額外的Kafka叢集,利用現有的Kafka叢集的分區實作水準擴充
  • 容錯率,高可用性
  • 多平台部署,支援Mac,Linux和Windows系統
  • 權限安全控制

2.2 Sample

  Kafka Streams是直接建構與Kafka的基礎之上的,沒有了額外的流處理叢集,Table和一些有狀态的處理完全整合到了流處理本身。其核心代碼非常的簡介。簡而言之,就和你寫Consumer或Producer一樣,但是Kafka Streams更加的簡潔。

2.3 屬性

名稱 描述 類型 預設值 級别
application.id 流處理辨別,對應一個應用需要保持一緻,用作消費的group.id string
bootstrap.servers 用來發現Kafka的叢集節點,不需要配置所有的Broker list
replication.factor 複制因子 int 1
state.dir 本地狀态存儲目錄 /tmp/kafka-streams
cache.max.bytes.buffering 所有線程的最大緩沖記憶體 long 10485760
client.id 用戶端邏輯名稱,用于辨別請求位置 ""
default.key.serde 對Key序列化或反序列化類,實作于Serde接口 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.value.serde 對Value序列化或反序列化類,實作與Serde接口
...

  這裡隻是列舉了部分Kafka Streams的屬性值,更多的詳情可參考

Kafka Streams Configs

3.示例

  下面,我們可以通過一個示例代碼,來熟悉Kafka Streams的運作流程,如下所示:

import org.apache.kafka.common.serialization.Serdes;     import org.apache.kafka.streams.KafkaStreams;     import org.apache.kafka.streams.StreamsConfig;     import org.apache.kafka.streams.kstream.KStream;     import org.apache.kafka.streams.kstream.KStreamBuilder;     import org.apache.kafka.streams.kstream.KTable;     import java.util.Arrays;     import java.util.Properties;     public class WordCountApplication {         public static void main(final String[] args) throws Exception {             Properties config = new Properties();             config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount_topic_appid");             config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");             config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());             config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());             KStreamBuilder builder = new KStreamBuilder();             KStream<String, String> textLines = builder.stream("TextLinesTopic");             KTable<String, Long> wordCounts = textLines                 .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))                 .groupBy((key, word) -> word)                 .count("Counts");             wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");             KafkaStreams streams = new KafkaStreams(builder, config);             streams.start();         }     }      

  從代碼中,我們可以看出Kafka Streams為上層流定義了兩種基本抽象:

  • KStream:可以從一個或者多個Topic源來建立
  • KTable:從一個Topic源來建立

  這兩者的差別是,前者比較像傳統意義上的流,可以把每一個K/V看成獨立的,後者的思想更加接近與Map的概念。同一個Key輸入多次,後者是會覆寫前者的。而且,KStream和KTable都提供了一系列的轉換操作,每個操作可以産生一個或者多個KStream和KTable對象,所有這些轉換的方法連接配接在一起,就形成了一個複雜的Topology。由于KStream和KTable是強類型,這些轉換都被定義為通用函數,這樣在使用的時候讓使用者指定輸入和輸出資料類型。

  另外,無狀态的轉換不依賴于處理的狀态,是以不需要狀态倉庫。有狀态的轉換則需要進行存儲相應的狀态用于處理和生成結果。例如,在進行聚合操作的時候,一個視窗狀态用于儲存目前預定義收到的值,然後轉換擷取累計的值,再做計算。

  在處理完後,對于結果集使用者可以持續的将結果回寫到Topic,也可以通過KStream.to() 或者 KTable.to() 方法來實作。

4.總結

  通過對Kafka Streams的研究,它的優勢可以總結為以下幾點。首先,它提供了輕量級并且易用的API來有效的降低流資料的開發成本,之前要實作這類處理,需要使用Spark Streaming,Storm,Flink,或者自己編寫Consumer。其次,它開發的應用程式可以支援在YARN,Mesos這類資源排程中,使用方式靈活。而對于異步操作,不是很友好,需要謹慎處理;另外,對SQL文法的支援有限,需要額外開發。

5.結束語

  這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行讨論或發送郵件給我,我會盡我所能為您解答,與君共勉。

聯系方式:

郵箱:[email protected]

Twitter:

https://twitter.com/smartloli

QQ群(Hadoop - 交流社群1):

424769183

溫馨提示:請大家加群的時候寫上加群理由(姓名+公司/學校),友善管理者稽核,謝謝!

熱愛生活,享受程式設計,與君共勉!

作者:哥不是小蘿莉 [ 關于我 ][ 犒賞

出處: http://www.cnblogs.com/smartloli/

轉載請注明出處,謝謝合作!