天天看點

Flink1.4 資料流類型與轉換關系

Flink 為流處理和批處理分别提供了 DataStream API 和 DataSet API。正是這種高層的抽象和 flunent API 極大地便利了使用者編寫大資料應用。不過很多初學者在看到官方文檔中那一大坨的轉換時,常常會蒙了圈,文檔中那些隻言片語也很難講清它們之間的關系。是以本文将介紹幾種關鍵的資料流類型,它們之間是如何通過轉換關聯起來的。下圖展示了 Flink 中目前支援的主要幾種流的類型,以及它們之間的轉換關系。

Flink1.4 資料流類型與轉換關系

1. DataStream

DataStream

Flink

流處理 API 中最核心的資料結構。它代表了一個運作在多個分區上的并行流。一個

DataStream

可以從

StreamExecutionEnvironment

通過

env.addSource(SourceFunction)

獲得。

DataStream

上的轉換操作都是逐條的,比如

map()

flatMap()

filter()

DataStream

也可以執行

rebalance

(再平衡,用來減輕資料傾斜)和

broadcaseted

(廣播)等分區轉換。

val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer08[String](...))
val str1: DataStream[(String, MyType)] = stream.flatMap { ... }
val str2: DataStream[(String, MyType)] = stream.rebalance()
val str3: DataStream[AnotherType] = stream.map { ... }           

複制

上述 DataStream 上的轉換在運作時會轉換成如下的執行圖:

Flink1.4 資料流類型與轉換關系

如上圖的執行圖所示,

DataStream

各個算子會并行運作,算子之間是資料流分區。如

Source

的第一個并行執行個體(S1)和

flatMap()

的第一個并行執行個體(m1)之間就是一個資料流分區。而在

flatMap()

map()

之間由于加了

rebalance()

,它們之間的資料流分區就有3個子分區(m1的資料流向3個map()執行個體)。這與

Apache Kafka

是很類似的,把流想象成

Kafka Topic

,而一個流分區就表示一個

Topic Partition

,流的目标并行算子執行個體就是

Kafka Consumers

2. KeyedStream

KeyedStream

用來表示根據指定的

key

進行分組的資料流。一個

KeyedStream

可以通過調用

DataStream.keyBy()

來獲得。而在

KeyedStream

上進行任何

transformation

都将轉變回

DataStream

。在實作中,

KeyedStream

是把

key

的資訊寫入到了

transformation

中。每條記錄隻能通路所屬

key

的狀态,其上的聚合函數可以友善地操作和儲存對應

key

的狀态。

3. WindowedStream & AllWindowedStream

WindowedStream

代表了根據

key

分組,并且基于

WindowAssigner

切分視窗的資料流。是以

WindowedStream

都是從

KeyedStream

衍生而來的。而在

WindowedStream

上進行任何

transformation

也都将轉變回

DataStream

val stream: DataStream[MyType] = ...
val windowed: WindowedDataStream[MyType] = stream
        .keyBy("userId")
        .window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
val result: DataStream[ResultType] = windowed.reduce(myReducer)           

複制

上述

WindowedStream

的樣例代碼在運作時會轉換成如下的執行圖:

Flink1.4 資料流類型與轉換關系

Flink

的視窗實作中會将到達的資料緩存在對應的視窗

buffer

中(一個資料可能會對應多個視窗)。當到達視窗發送的條件時(由

Trigger

控制),

Flink

會對整個視窗中的資料進行處理。

Flink

在聚合類視窗有一定的優化,即不會儲存視窗中的所有值,而是每到一個元素執行一次聚合函數,最終隻儲存一份資料即可。

key

分組的流上進行視窗切分是比較常用的場景,也能夠很好地并行化(不同的

key

上的視窗聚合可以配置設定到不同的

task

去處理)。不過有時候我們也需要在普通流上進行視窗的操作,這就是

AllWindowedStream

AllWindowedStream

是直接在

DataStream

上進行

windowAll(...)

操作。

AllWindowedStream

的實作是基于

WindowedStream

的(Flink 1.1.x 開始)。

Flink

不推薦使用

AllWindowedStream

,因為在普通流上進行視窗操作,就勢必需要将所有分區的流都彙集到單個的

Task

中,而這個單個的

Task

很顯然就會成為整個Job的瓶頸。

4. JoinedStreams & CoGroupedStreams

雙流

Join

也是一個非常常見的應用場景。深入源碼你可以發現,

JoinedStreams

CoGroupedStreams

的代碼實作有80%是一模一樣的,

JoinedStreams

在底層又調用了

CoGroupedStreams

來實作

Join

功能。除了名字不一樣,一開始很難将它們區分開來,而且為什麼要提供兩個功能類似的接口呢?

實際上這兩者還是很點差別的。首先

co-group

側重的是

group

,是對同一個

key

上的兩組集合進行操作,而

join

側重的是

pair

,是對同一個

key

上的每對元素進行操作。

co-group

join

更通用一些,因為

join

隻是

co-group

的一個特例,是以

join

是可以基于

co-group

來實作的(當然有優化的空間)。而在

co-group

之外又提供了

join

接口是因為使用者更熟悉

join

(源于資料庫吧),而且能夠跟

DataSet API

保持一緻,降低使用者的學習成本。

JoinedStreams

CoGroupedStreams

是基于

Window

上實作的,是以

CoGroupedStreams

最終又調用了

WindowedStream

來實作。

val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...

val result: DataStream[(MyType, AnotherType)] = firstInput.join(secondInput)
    .where("userId").equalTo("id")
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})           

複制

上述

JoinedStreams

的樣例代碼在運作時會轉換成如下的執行圖:

Flink1.4 資料流類型與轉換關系

雙流上的資料在同一個

key

的會被分别配置設定到同一個

window

視窗的左右兩個籃子裡,當

window

結束的時候,會對左右籃子進行笛卡爾積進而得到每一對

pair

,對每一對

pair

應用

JoinFunction

。不過目前(Flink 1.1.x)

JoinedStreams

隻是簡單地實作了流上的

join

操作而已,距離真正的生産使用還是有些距離。因為目前

join

視窗的雙流資料都是被緩存在記憶體中的,也就是說如果某個

key

上的視窗資料太多就會導緻

JVM OOM

(然而資料傾斜是常态)。雙流

join

的難點也正是在這裡,這也是社群後面對

join

操作的優化方向,例如可以借鑒

Flink

在批處理

join

中的優化方案,也可以用

ManagedMemory

來管理視窗中的資料,并當資料超過門檻值時能spill到硬碟。

5. ConnectedStreams

DataStream

上有一個

union

的轉換

dataStream.union(otherStream1, otherStream2, ...)

,用來合并多個流,新的流會包含所有流中的資料。

union

有一個限制,就是所有合并的流的類型必須是一緻的。

ConnectedStreams

提供了和

union

類似的功能,用來連接配接兩個流,但是與

union

轉換有以下幾個差別:

  • ConnectedStreams

    隻能連接配接兩個流,而

    union

    可以連接配接多于兩個流。
  • ConnectedStreams

    連接配接的兩個流類型可以不一緻,而

    union

    連接配接的流的類型必須一緻。
  • ConnectedStreams

    會對兩個流的資料應用不同的處理方法,并且雙流之間可以共享狀态。這在第一個流的輸入會影響第二個流時, 會非常有用。

如下

ConnectedStreams

的樣例,連接配接

input

other

流,并在

input

流上應用

map1

方法,在

other

上應用

map2

方法,雙流可以共享狀态(比如計數)。

val input: DataStream[MyType] = ...
val other: DataStream[AnotherType] = ...

val connected: ConnectedStreams[MyType, AnotherType] = input.connect(other)

val result: DataStream[ResultType] =
        connected.map(new CoMapFunction[MyType, AnotherType, ResultType]() {
            override def map1(value: MyType): ResultType = { ... }
            override def map2(value: AnotherType): ResultType = { ... }
        })           

複制

當并行度為2時,其執行圖如下所示:

Flink1.4 資料流類型與轉換關系

6. 總結

本文介紹通過不同資料流類型的轉換圖來解釋每一種資料流的含義、轉換關系。後面的文章會深入講解 Window 機制的實作,雙流 Join 的實作等。

原文:http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/