天天看點

[Kafka設計解析]--(七)Kafka Stream

Kafka Stream背景

Kafka Stream是什麼

Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲于Kafka内的資料進行流式處理和分析的功能。

Kafka Stream的特點如下:

  • Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常友善地嵌入任意Java應用中,也可以任意方式打包和部署
  • 除了Kafka外,無任何外部依賴
  • 充分利用Kafka分區機制實作水準擴充和順序性保證
  • 通過可容錯的state store實作高效的狀态操作(如windowed join和aggregation)
  • 支援正好一次處理語義
  • 提供記錄級的處理能力,進而實作毫秒級的低延遲
  • 支援基于事件時間的視窗操作,并且可處理晚到的資料(late arrival of records)
  • 同時提供底層的處理原語Processor(類似于Storm的spout和bolt),以及高層抽象的DSL(類似于Spark的map/group/reduce)

什麼是流式計算

一般流式計算會與批量計算相比較。在流式計算模型中,輸入是持續的,可以認為在時間上是無界的,也就意味着,永遠拿不到全量資料去做計算。同時,計算結果是持續輸出的,也即計算結果在時間上也是無界的。流式計算一般對實時性要求較高,同時一般是先定義目标計算,然後資料到來之後将計算邏輯應用于資料。同時為了提高計算效率,往往盡可能采用增量計算代替全量計算。

​​​

[Kafka設計解析]--(七)Kafka Stream

​​

批量處理模型中,一般先有全量資料集,然後定義計算邏輯,并将計算應用于全量資料。特點是全量計算,并且計算結果一次性全量輸出。

​​​

[Kafka設計解析]--(七)Kafka Stream

​​

為什麼要有Kafka Stream

目前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用廣泛,提供記錄級别的處理能力,目前也支援SQL on Stream。而Spark Streaming基于Apache Spark,可以非常友善與圖計算,SQL處理等內建,功能強大,對于熟悉其它Spark應用開發的使用者而言使用門檻低。另外,目前主流的Hadoop發行版,如MapR,Cloudera和Hortonworks,都內建了Apache Storm和Apache Spark,使得部署更容易。

既然Apache Spark與Apache Storm擁用如此多的優勢,那為何還需要Kafka Stream呢?筆者認為主要有如下原因。

第一,Spark和Storm都是流式處理架構,而Kafka Stream提供的是一個基于Kafka的流式處理類庫。架構要求開發者按照特定的方式去開發邏輯部分,供架構調用。開發者很難了解架構的具體運作方式,進而使得調試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運作方式主要由開發者控制,友善使用和調試。

​​​

[Kafka設計解析]--(七)Kafka Stream

​​

第二,雖然Cloudera與Hortonworks友善了Storm和Spark的部署,但是這些架構的部署仍然相對複雜。而Kafka Stream作為類庫,可以非常友善的嵌入應用程式中,它對應用的打包和部署基本沒有任何要求。更為重要的是,Kafka Stream充分利用了​​Kafka的分區機制​​​和​​Consumer的Rebalance機制​​,使得Kafka Stream可以非常友善的水準擴充,并且各個執行個體可以使用不同的部署方式。具體來說,每個運作Kafka Stream的應用程式執行個體都包含了Kafka Consumer執行個體,多個同一應用的執行個體之間并行處理資料集。而不同執行個體之間的部署方式并不要求一緻,比如部分執行個體可以運作在Web容器中,部分執行個體可運作在Docker或Kubernetes中。

第三,就流式處理系統而言,基本都支援Kafka作為資料源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka子產品。事實上,Kafka基本上是主流的流式處理系統的标準資料源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低。

第四,使用Storm或Spark Streaming時,需要為架構本身的程序預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應用執行個體而言,架構本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預留記憶體。

第五,由于Kafka本身提供資料持久化,是以Kafka Stream提供滾動部署和滾動更新以及重新計算的能力。

第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以線上動态調整并行度。

Kafka Stream架構

Kafka Stream整體架構

Kafka Stream的整體架構圖如下所示。

​​​

[Kafka設計解析]--(七)Kafka Stream

​​

目前(Kafka 0.11.0.0)Kafka Stream的資料源隻能如上圖所示是Kafka。但是處理結果并不一定要如上圖所示輸出到Kafka。實際上KStream和Ktable的執行個體化都需要指定Topic。

KStream<String, String> stream = builder.stream("words-stream");
KTable<String, String> table = builder.table("words-table", "words-store");      

另外,上圖中的Consumer和Producer并不需要開發者在應用中顯示執行個體化,而是由Kafka Stream根據參數隐式執行個體化和管理,進而降低了使用門檻。開發者隻需要專注于開發核心業務邏輯,也即上圖中Task内的部分。

Processor Topology

基于Kafka Stream的流式應用的業務邏輯全部通過一個被稱為Processor Topology的地方執行。它與Storm的Topology和Spark的DAG類似,都定義了資料在各個處理單元(在Kafka Stream中被稱作Processor)間的流動方式,或者說定義了資料的處理邏輯。

下面是一個Processor的示例,它實作了Word Count功能,并且每秒輸出一次結果。

public class WordCountProcessor implements Processor<String, String> {
  private ProcessorContext context;
  private KeyValueStore<String, Integer> kvStore;
  @SuppressWarnings("unchecked")
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(1000);
    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
  }
  @Override
  public void process(String key, String value) {
    Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
      Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
      int count = counts.map(wordcount -> wordcount + 1).orElse(1);
      kvStore.put(word, count);
    });
  }
  @Override
  public void punctuate(long timestamp) {
    KeyValueIterator<String, Integer> iterator = this.kvStore.all();
    iterator.forEachRemaining(entry -> {
      context.forward(entry.key, entry.value);
      this.kvStore.delete(entry.key);
    });
    context.commit();
  }
  @Override
  public void close() {
    this.kvStore.close();
  }
}      

從上述代碼中可見

​process​

  • 定義了對每條記錄的處理邏輯,也印證了Kafka可具有記錄級的資料處理能力。
  • context.scheduler定義了punctuate被執行的周期,進而提供了實作視窗操作的能力。
  • context.getStateStore提供的狀态存儲為有狀态計算(如視窗,聚合)提供了可能。

Kafka Stream并行模型

Kafka Stream的并行模型中,最小粒度為Task,而每個Task包含一個特定子Topology的所有Processor。是以每個Task所執行的代碼完全一樣,唯一的不同在于所處理的資料集互補。這一點跟Storm的Topology完全不一樣。Storm的Topology的每一個Task隻包含一個Spout或Bolt的執行個體。是以Storm的一個Topology内的不同Task之間需要通過網絡通信傳遞資料,而Kafka Stream的Task包含了完整的子Topology,是以Task之間不需要傳遞資料,也就不需要網絡通信。這一點降低了系統複雜度,也提高了處理效率。

如果某個Stream的輸入Topic有多個(比如2個Topic,1個Partition數為4,另一個Partition數為3),則總的Task數等于Partition數最多的那個Topic的Partition數(max(4,3)=4)。這是因為Kafka Stream使用了Consumer的Rebalance機制,每個Partition對應一個Task。

下圖展示了在一個程序(Instance)中以2個Topic(Partition數均為4)為資料源的Kafka Stream應用的并行模型。從圖中可以看到,由于Kafka Stream應用的預設線程數為1,是以4個Task全部在一個線程中運作。

​​​

[Kafka設計解析]--(七)Kafka Stream

​​

為了充分利用多線程的優勢,可以設定Kafka Stream的線程數。下圖展示了線程數為2時的并行模型。

​​​

[Kafka設計解析]--(七)Kafka Stream

​​

前文有提到,Kafka Stream可被嵌入任意Java應用(理論上基于JVM的應用都可以)中,下圖展示了在同一台機器的不同程序中同時啟動同一Kafka Stream應用時的并行模型。注意,這裡要保證兩個程序的​

​StreamsConfig.APPLICATION_ID_CONFIG​

​完全一樣。因為Kafka Stream将APPLICATION_ID_CONFIG作為隐式啟動的Consumer的Group ID。隻有保證APPLICATION_ID_CONFIG相同,才能保證這兩個程序的Consumer屬于同一個Group,進而可以通過Consumer Rebalance機制拿到互補的資料集。

​​​

[Kafka設計解析]--(七)Kafka Stream

​​

既然實作了多程序部署,可以以同樣的方式實作多機器部署。該部署方式也要求所有程序的APPLICATION_ID_CONFIG完全一樣。從圖上也可以看到,每個執行個體中的線程數并不要求一樣。但是無論如何部署,Task總數總會保證一緻。

​​​

[Kafka設計解析]--(七)Kafka Stream

​​

注意:Kafka Stream的并行模型,非常依賴于《​​Kafka設計解析(一)- Kafka背景及架構介紹​​​》一文中介紹的​​Kafka分區機制​​​和《​​Kafka設計解析(四)- Kafka Consumer設計解析​​​》中介紹的​​Consumer的Rebalance機制​​。強烈建議不太熟悉這兩種機制的朋友,先行閱讀這兩篇文章。

這裡對比一下Kafka Stream的Processor Topology與Storm的Topology。

  • Storm的Topology由Spout和Bolt組成,Spout提供資料源,而Bolt提供計算和資料導出。Kafka Stream的Processor Topology完全由Processor組成,因為它的資料固定由Kafka的Topic提供。
  • Storm的不同Bolt運作在不同的Executor中,很可能位于不同的機器,需要通過網絡通信傳輸資料。而Kafka Stream的Processor Topology的不同Processor完全運作于同一個Task中,也就完全處于同一個線程,無需網絡通信。
  • Storm的Topology可以同時包含Shuffle部分和非Shuffle部分,并且往往一個Topology就是一個完整的應用。而Kafka Stream的一個實體Topology隻包含非Shuffle部分,而Shuffle部分需要通過

​through​

  • 操作顯示完成,該操作将一個大的Topology分成了2個子Topology。
  • Storm的Topology内,不同Bolt/Spout的并行度可以不一樣,而Kafka Stream的子Topology内,所有Processor的并行度完全一樣。
  • Storm的一個Task隻包含一個Spout或者Bolt的執行個體,而Kafka Stream的一個Task包含了一個子Topology的所有Processor。

KTable vs. KStream

KTable和KStream是Kafka Stream中非常重要的兩個概念,它們是Kafka實作各種語義的基礎。是以這裡有必要分析下二者的差別。

KStream是一個資料流,可以認為所有記錄都通過Insert only的方式插入進這個資料流裡。而KTable代表一個完整的資料集,可以了解為資料庫中的表。由于每條記錄都是Key-Value對,這裡可以将Key了解為資料庫中的Primary Key,而Value可以了解為一行記錄。可以認為KTable中的資料都是通過Update only的方式進入的。也就意味着,如果KTable對應的Topic中新進入的資料的Key已經存在,那麼從KTable隻會取出同一Key對應的最後一條資料,相當于新的資料更新了舊的資料。

以下圖為例,假設有一個KStream和KTable,基于同一個Topic建立,并且該Topic中包含如下圖所示5條資料。此時周遊KStream将得到與Topic内資料完全一樣的所有5條資料,且順序不變。而此時周遊KTable時,因為這5條記錄中有3個不同的Key,是以将得到3條記錄,每個Key對應最新的值,并且這三條資料之間的順序與原來在Topic中的順序保持一緻。這一點與Kafka的日志compact相同。

​​​

[Kafka設計解析]--(七)Kafka Stream

​​

此時如果對該KStream和KTable分别基于key做Group,對Value進行Sum,得到的結果将會不同。對KStream的計算結果是​

​<Jack,4>​

​,​

​<Lily,7>​

​,​

​<Mike,4>​

​。而對Ktable的計算結果是​

​<Mike,4>​

​,​

​<Jack,3>​

​,​

​<Lily,5>​

​。

State store

流式進行中,部分操作是無狀态的,例如過濾操作(Kafka Stream DSL中用​

​filer​

​方法實作)。而部分操作是有狀态的,需要記錄中間狀态,如Window操作和聚合計算。State store被用來存儲中間狀态。它可以是一個持久化的Key-Value存儲,也可以是記憶體中的HashMap,或者是資料庫。Kafka提供了基于Topic的狀态存儲。

Topic中存儲的資料記錄本身是Key-Value形式的,同時Kafka的log compaction機制可對曆史資料做compact操作,保留每個Key對應的最後一個Value,進而在保證Key不丢失的前提下,減少總資料量,進而提高查詢效率。

構造KTable時,需要指定其state store name。預設情況下,該名字也即用于存儲該KTable的狀态的Topic的名字,周遊KTable的過程,實際就是周遊它對應的state store,或者說周遊Topic的所有key,并取每個Key最新值的過程。為了使得該過程更加高效,預設情況下會對該Topic進行compact操作。

另外,除了KTable,所有狀态計算,都需要指定state store name,進而記錄中間狀态。

Kafka Stream如何解決流式系統中關鍵問題

時間

在流式資料進行中,時間是資料的一個非常重要的屬性。從Kafka 0.10開始,每條記錄除了Key和Value外,還增加了​

​timestamp​

​屬性。目前Kafka Stream支援三種時間

  • 事件發生時間。事件發生的時間,包含在資料記錄中。發生時間由Producer在構造ProducerRecord時指定。并且需要Broker或者Topic将

​message.timestamp.type​

  • 設定為

​CreateTime​

  • (預設值)才能生效。
  • 消息接收時間,也即消息存入Broker的時間。當Broker或Topic将

​message.timestamp.type​

  • 設定為

​LogAppendTime​

  • 時生效。此時Broker會在接收到消息後,存入磁盤前,将其

​timestamp​

  • 屬性值設定為目前機器時間。一般消息接收時間比較接近于事件發生時間,部分場景下可代替事件發生時間。
  • 消息處理時間,也即Kafka Stream處理消息時的時間。

注:Kafka Stream允許通過實作​

​org.apache.kafka.streams.processor.TimestampExtractor​

​接口自定義記錄時間。

視窗

前文提到,流式資料是在時間上無界的資料。而聚合操作隻能作用在特定的資料集,也即有界的資料集上。是以需要通過某種方式從無界的資料集上按特定的語義選取出有界的資料。視窗是一種非常常用的設定計算邊界的方式。不同的流式處理系統支援的視窗類似,但不盡相同。

Kafka Stream支援的視窗如下。

​Hopping Time Window​

  1. ​​​
    [Kafka設計解析]--(七)Kafka Stream
    ​​

​Tumbling Time Window​

  1. 該視窗定義如下圖所示。可以認為它是Hopping Time Window的一種特例,也即Window size和Advance interval相等。它的特點是各個Window之間完全不相交。

    ​​​

    [Kafka設計解析]--(七)Kafka Stream
    ​​

​Sliding Window​

  1. 該視窗隻用于2個KStream進行Join計算時。該視窗的大小定義了Join兩側KStream的資料記錄被認為在同一個視窗的最大時間差。假設該視窗的大小為5秒,則參與Join的2個KStream中,記錄時間差小于5的記錄被認為在同一個視窗中,可以進行Join計算。

​Session Window​

  1. 該視窗用于對Key做Group後的聚合操作中。它需要對Key做分組,然後對組内的資料根據業務需求定義一個視窗的起始點和結束點。一個典型的案例是,希望通過Session Window計算某個使用者通路網站的時間。對于一個特定的使用者(用Key表示)而言,當發生登入操作時,該使用者(Key)的視窗即開始,當發生退出操作或者逾時時,該使用者(Key)的視窗即結束。視窗結束時,可計算該使用者的通路時間或者點選次數等。

Join

Kafka Stream由于包含KStream和Ktable兩種資料集,是以提供如下Join計算

​KTable Join KTable​

​​

​KStream Join KStream​

​​

​KStream Join KTable / GlobalKTable​

對于Join操作,如果要得到正确的計算結果,需要保證參與Join的KTable或KStream中Key相同的資料被配置設定到同一個Task。具體方法是

  • 參與Join的KTable或KStream的Key類型相同(實際上,業務含意也應該相同)
  • 參與Join的KTable或KStream對應的Topic的Partition數相同
  • Partitioner政策的最終結果等效(實作不需要完全一樣,隻要效果一樣即可),也即Key相同的情況下,被配置設定到ID相同的Partition内

如果上述條件不滿足,可通過調用如下方法使得它滿足上述條件。

KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)      

聚合與亂序處理

聚合操作可應用于KStream和KTable。當聚合發生在KStream上時必須指定視窗,進而限定計算的目标資料集。

需要說明的是,聚合操作的結果肯定是KTable。因為KTable是可更新的,可以在晚到的資料到來時(也即發生資料亂序時)更新結果KTable。

這裡舉例說明。假設對KStream以5秒為視窗大小,進行Tumbling Time Window上的Count操作。并且KStream先後出現時間為1秒, 3秒, 5秒的資料,此時5秒的視窗已達上限,Kafka Stream關閉該視窗,觸發Count操作并将結果3輸出到KTable中(假設該結果表示為<1-5,3>)。若1秒後,又收到了時間為2秒的記錄,由于1-5秒的視窗已關閉,若直接抛棄該資料,則可認為之前的結果<1-5,3>不準确。而如果直接将完整的結果<1-5,4>輸出到KStream中,則KStream中将會包含該視窗的2條記錄,<1-5,3>, <1-5,4>,也會存在肮資料。是以Kafka Stream選擇将聚合結果存于KTable中,此時新的結果<1-5,4>會替代舊的結果<1-5,3>。使用者可得到完整的正确的結果。

這種方式保證了資料準确性,同時也提高了容錯性。

但需要說明的是,Kafka Stream并不會對所有晚到的資料都重新計算并更新結果集,而是讓使用者設定一個​

​retention period​

​,将每個視窗的結果集在記憶體中保留一定時間,該視窗内的資料晚到時,直接合并計算,并更新結果KTable。超過​

​retention period​

​後,該視窗結果将從記憶體中删除,并且晚到的資料即使落入視窗,也會被直接丢棄。

容錯

Kafka Stream從如下幾個方面進行容錯

  • 高可用的Partition保證無資料丢失。每個Task計算一個Partition,而Kafka資料複制機制保證了Partition内資料的高可用性,故無資料丢失風險。同時由于資料是持久化的,即使任務失敗,依然可以重新計算。
  • 狀态存儲實作快速故障恢複和從故障點繼續處理。對于Join和聚合及視窗等有狀态計算,狀态存儲可儲存中間狀态。即使發生Failover或Consumer Rebalance,仍然可以通過狀态存儲恢複中間狀态,進而可以繼續從Failover或Consumer Rebalance前的點繼續計算。
  • KTable與

​retention period​

  • 提供了對亂序資料的處理能力。

Kafka Stream應用示例

下面結合一個案例來講解如何開發Kafka Stream應用。本例完整代碼可從​​作者Github​​擷取。

訂單KStream(名為orderStream),底層Topic的Partition數為3,Key為使用者名,Value包含使用者名,商品名,訂單時間,數量。使用者KTable(名為userTable),底層Topic的Partition數為3,Key為使用者名,Value包含性别,位址和年齡。商品KTable(名為itemTable),底層Topic的Partition數為6,Key為商品名,價格,種類和産地。現在希望計算每小時購買産地與自己所在地相同的使用者總數。

首先由于希望使用訂單時間,而它包含在orderStream的Value中,需要通過提供一個實作TimestampExtractor接口的類從orderStream對應的Topic中抽取出訂單時間。

public class OrderTimestampExtractor implements TimestampExtractor {
  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    if(record instanceof Order) {
      return ((Order)record).getTS();
    } else {
      return 0;
    }
  }
}      

接着通過将orderStream與userTable進行Join,來擷取訂單使用者所在地。由于二者對應的Topic的Partition數相同,且Key都為使用者名,再假設Producer往這兩個Topic寫資料時所用的Partitioner實作相同,則此時上文所述Join條件滿足,可直接進行Join。

orderUserStream = orderStream
    .leftJoin(userTable, 
         // 該lamda表達式定義了如何從orderStream與userTable生成結果集的Value
        (Order order, User user) -> OrderUser.fromOrderUser(order, user), 
         // 結果集Key序列化方式
        Serdes.String(),
         // 結果集Value序列化方式
         SerdesFactory.serdFrom(Order.class))
    .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)      

從上述代碼中,可以看到,Join時需要指定如何從參與Join雙方的記錄生成結果記錄的Value。Key不需要指定,因為結果記錄的Key與Join Key相同,故無須指定。Join結果存于名為orderUserStream的KStream中。

接下來需要将orderUserStream與itemTable進行Join,進而擷取商品産地。此時orderUserStream的Key仍為使用者名,而itemTable對應的Topic的Key為産品名,并且二者的Partition數不一樣,是以無法直接Join。此時需要通過through方法,對其中一方或雙方進行重新分區,使得二者滿足Join條件。這一過程相當于Spark的Shuffle過程和Storm的FieldGrouping。

orderUserStrea
    .through(
        // Key的序列化方式
        Serdes.String(),
        // Value的序列化方式 
        SerdesFactory.serdFrom(OrderUser.class), 
        // 重新按照商品名進行分區,具體取商品名的哈希值,然後對分區數取模
        (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, 
        "orderuser-repartition-by-item")
    .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))      

總結

  • Kafka Stream的并行模型完全基于Kafka的分區機制和Rebalance機制,實作了線上動态調整并行度
  • 同一Task包含了一個子Topology的所有Processor,使得所有處理邏輯都在同一線程内完成,避免了不必的網絡通信開銷,進而提高了效率。
  • 方法提供了類似Spark的Shuffle機制,為使用不同分區政策的資料提供了Join的可能
  • log compact提高了基于Kafka的state store的加載效率
  • state store為狀态計算提供了可能
  • 基于offset的計算進度管理以及基于state store的中間狀态管理為發生Consumer rebalance或Failover時從斷點處繼續處理提供了可能,并為系統容錯性提供了保障
  • KTable的引入,使得聚合計算擁用了處理亂序問題的能力

繼續閱讀