天天看點

Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

1 基本的 API 概念

Flink程式是實作分布式集合轉換的正常程式(例如,過濾,映射,更新狀态,加入,分組,定義視窗,聚合)。最初從源建立集合(例如,通過從檔案,kafka主題或從本地的記憶體集合中讀取)。結果通過接收器傳回,接收器可以例如将資料寫入(分布式)檔案或标準輸出(例如,指令行終端)。 Flink程式可以在各種環境中運作,獨立運作或嵌入其他程式中。執行可以在本地JVM中執行,也可以在許多計算機的叢集上執行。

根據資料源的類型,即有界或無界源,您可以編寫批處理程式或流程式,其中

  • DataSet API用于批處理
  • DataStream API用于流式處理。
注意:在顯示如何使用API的實際示例時,我們将使用StreamingExecutionEnvironment和DataStream API。 DataSet API中的概念完全相同,隻需用ExecutionEnvironment和DataSet替換即可。
  • 大資料的處理流程
    Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

2 DataSet & DataStream

Flink具有特殊類DataSet和DataStream來表示程式中的資料。 可以将它們視為可以包含重複項的不可變資料集合。

  • 在DataSet的情況下,資料是有限的
  • 而對于DataStream,元素的數量可以是無限的

這些集合在某些關鍵方面與正常Java集合不同。 首先,它們是

不可變的

,這意味着

一旦建立它們,就無法添加或删除元素。 也不能簡單地檢查裡面的元素

最初通過在Flink程式中添加源來建立集合,并通過使用諸如map,filter等API方法對它們進行轉換來從這些集合中派生新集合。

Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

可以看出底層使用了資料源

3 Flink 項目流程剖析

Flink程式看起來像是轉換資料集合的正常程式。 每個程式包含相同的基本部分:

  • 獲得執行環境,
  • 加載/建立初始資料,
  • 指定此資料的轉換,
  • 指定放置計算結果的位置,
  • 觸發程式執行

Scala版本

我們現在将概述每個步驟

Scala DataSet API的所有核心類都可以在org.apache.flink.api.scala包中找到

而Scala DataStream API的類可以在org.apache.flink.streaming.api.scala中找到

StreamExecutionEnvironment是所有Flink程式的基礎

可以在StreamExecutionEnvironment上使用這些靜态方法擷取一個:

1:getExecutionEnvironment()

2:createLocalEnvironment()

3:createRemoteEnvironment(host: String, port: Int, jarFiles: String*)           
  • 法1示例代碼
    Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
  • 法2示例代碼
    Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
此方法将環境的預設并行度設定為給定參數,預設為通過[[setDefaultLocalParallelism(Int)]]設定的值。
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

通常,隻需要使用

getExecutionEnvironment()

,因為這将根據上下文執行正确的操作:

  • 如果在IDE中執行程式或作為正常Java程式,它将建立一個本地環境,将執行在本地機器上的程式。
  • 如果從程式中建立了一個JAR檔案,并通過指令行調用它,則Flink叢集管理器将執行您的main方法,

    getExecutionEnvironment()

    将傳回一個執行環境,用于在叢集上執行程式。

對于指定資料源,執行環境可以通過各種途徑從檔案中讀取

  • 逐行讀取它們
  • CSV檔案
  • 使用完全自定義資料輸入格式

要将文本檔案作為一系列行讀取,可以使用:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val text: DataStream[String] = env.readTextFile("file:///path/to/file")           

這将提供一個

DataStream

,然後就可以在其上應用轉換來建立新的派生

DataStream

也可以通過使用轉換函數調用

DataSet

上的方法來應用轉換。 例如,map轉換如下所示:

val input: DataSet[String] = ...

val mapped = input.map { x => x.toInt }           

這将通過将原始集合中的每個String轉換為Integer來建立新的

DataStream

一旦有了包含最終結果的DataStream,就可以通過建立接收器将其寫入外部系統。 這些隻是建立接收器的一些示例方法:

writeAsText(path: String)

print()           

一旦指定了完整的程式,就需要通過調用

StreamExecutionEnvironment

上的

execute()

根據

ExecutionEnvironment

的類型,将在本地計算機上觸發執行或送出程式以在叢集上執行。

execute()方法傳回一個JobExecutionResult,它包含執行時間和累加器結果。

Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

觸發程式執行。環境将執行導緻"sink"操作運作程式的所有部分

Sink操作例如是列印結果或将它們轉發到消息隊列。

該法将記錄程式執行并使用提供的名稱顯示。

Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

4 延遲執行

所有Flink程式都是延遲執行:當執行程式的main方法時,資料加載和轉換不會立即執行。而是建立每個操作并将其添加到程式的計劃中。

當執行環境上的

execute()

調用顯式觸發執行時,實際執行操作。

程式是在本地執行還是在叢集上執行取決于執行環境的類型

延遲執行使我們可以建構Flink作為一個整體計劃單元執行的複雜程式,進行内部的優化。

5 指定keys

Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

上述程式中的這些資料如何确定呢?

某些轉換(join,coGroup,keyBy,groupBy)要求在元素集合上定義key

其他轉換(Reduce,GroupReduce,Aggregate,Windows)允許資料在應用之前在key上分組。

  • DataSet分組為
DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);           

雖然可以使用DataStream指定key

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);           

Flink的資料模型不基于鍵值對。 是以,無需将資料集類型實體打包到鍵和值中。 鍵是“虛拟的”:它們被定義為實際資料上的函數,以指導分組操作符。

注意:在下面的讨論中,将使用DataStream API和keyBy。 對于DataSet API,隻需要用DataSet和groupBy替換。

5.1 定義元組的鍵

  • 源碼
    Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet &amp; DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

即 :按給定的鍵位置(對于元組/數組類型)對DataStream的元素進行分組,以與分組運算符(如分組縮減或分組聚合)一起使用。

最簡單的情況是在元組的一個或多個字段上對元組進行分組:

val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)           

元組在第一個字段(整數類型)上分組。

val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)           

在這裡,我們将元組分組在由第一個和第二個字段組成的複合鍵上。

關于嵌套元組的注釋:如果你有一個帶有嵌套元組的DataStream,例如:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;           

指定keyBy(0)将使系統使用完整的Tuple2作為鍵(以Integer和Float為鍵)。 如果要“導航”到嵌套的Tuple2中,則必須使用下面解釋的字段表達式鍵。

5.2 指定key的字段表達式

可以使用基于字元串的字段表達式來引用嵌套字段,并定義用于分組,排序,連接配接或coGrouping的鍵。

字段表達式可以非常輕松地選擇(嵌套)複合類型中的字段,例如Tuple和POJO類型。

我們有一個WC POJO,其中包含兩個字段“word”和“count”。

  • Java版本代碼
    Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet &amp; DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
  • Scala版本代碼
    Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet &amp; DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

要按字段分組,我們隻需将其名稱傳遞給keyBy()函數。

// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
  def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)           

5.2.1 字段表達式文法:

  • 按字段名稱選擇POJO字段

    例如,“user”指的是POJO類型的“user”字段

  • 通過1偏移字段名稱或0偏移字段索引選擇元組字段

    例如,“_ 1”和“5”分别表示Scala Tuple類型的第一個和第六個字段。

  • 可以在POJO和Tuples中選擇嵌套字段

    例如,“user.zip”指的是POJO的“zip”字段,其存儲在POJO類型的“user”字段中。 支援任意嵌套和混合POJO和元組,例如“_2.user.zip”或“user._4.1.zip”。

  • 可以使用“_”通配符表達式選擇完整類型

    這也适用于非Tuple或POJO類型的類型。

5.2.2 字段表達示例

class WC(var complex: ComplexNestedClass, var count: Int) {
  def this() { this(null, 0) }
}

class ComplexNestedClass(
    var someNumber: Int,
    someFloat: Float,
    word: (Long, Long, String),
    hadoopCitizen: IntWritable) {
  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}           

這些是上面示例代碼的有效字段表達式:

  • “count”:WC類中的count字段。
  • “complex”:遞歸選擇POJO類型ComplexNestedClass的字段複合體的所有字段。
  • “complex.word._3”:選擇嵌套Tuple3的最後一個字段。
  • “complex.hadoopCitizen”:選擇Hadoop IntWritable類型。

5.3 指定key的key選擇器函數

定義鍵的另一種方法是“鍵選擇器”功能。 鍵選擇器函數将單個元素作為輸入并傳回元素的鍵。 key可以是任何類型,并且可以從确定性計算中導出。

以下示例顯示了一個鍵選擇器函數,它隻傳回一個對象的字段:

  • Java
    Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet &amp; DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
  • Scala
    Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet &amp; DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

6 指定轉換函數

大多數轉換都需要使用者自定義的函數。 本節列出了如何指定它們的不同方法

6.1 Java版本

6.1.1 實作接口

最基本的方法是實作一個提供的接口:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());           
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet &amp; DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

6.1.2 匿名類

可以将函數作為匿名類傳遞:

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});           

6.1.3 Java 8 Lambdas

Flink還支援Java API中的Java 8 Lambdas。

data.filter(s -> s.startsWith("http://"));

data.reduce((i1,i2) -> i1 + i2);           

6.1.4 增強函數

所有需要使用者定義函數的轉換都可以将增強函數作為參數。 例如,與其寫成

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};           
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet &amp; DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

不如寫成

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};           
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet &amp; DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

并像往常一樣将函數傳遞給map轉換:

data.map(new MyMapFunction());           

也可以定義為匿名類:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});           

除了使用者定義的函數(map,reduce等)之外,Rich函數還提供了四種方法:open,close,getRuntimeContext和setRuntimeContext。

這些用于參數化函數(請參閱将參數傳遞給函數),建立和完成本地狀态,通路廣播變量以及通路運作時資訊(如累加器和計數器)

7 支援的資料類型

Flink對DataSet或DataStream中可以包含的元素類型設定了一些限制。 原因是系統分析類型以确定有效的執行政策。

有六種不同類别的資料類型:

  • Java 元組 and Scala Case 類
  • Java POJOs
  • 原生類型
  • Regular Classes
  • Values
  • Hadoop Writables
  • Special Types

7.1 元組 and Case 類

7.1.1 Java版本

元組是包含固定數量的具有各種類型的字段的複合類型。 Java API提供從Tuple0到Tuple25的類。

Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet &amp; DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考

元組的每個字段都可以是包含更多元組的任意的Flink的類型,進而産生嵌套元組。 可以使用字段名稱tuple.f4直接通路元組的字段,也可以使用通用getter方法tuple.getField(int position)。 字段索引從0開始。

這與Scala的元組形成對比,但Java的正常索引更為一緻。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

wordCounts.keyBy(0); // also valid .keyBy("f0")           

7.1.2 Scala版本

Scala case類(和Scala元組是case類的特例)是包含固定數量的具有各種類型的字段的複合類型。 元組字段由它們的1偏移名稱尋址,例如第一個字段的_1。 字段按名稱通路。

case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(0, 1) // key by field positions 0 and 1           

7.2 POJOs

如果滿足以下要求,則Flink将Java和Scala類視為特殊的POJO資料類型:

  • public限定
  • 它必須有一個沒有參數的公共構造函數(預設構造函數)。
  • 所有字段都是public的,或者必須通過getter和setter函數通路。 對于名為foo的字段,getter和setter方法必須命名為getFoo()和setFoo()。
  • Flink必須支援字段的類型。 目前,Flink使用Avro序列化任意對象(例如Date)。

Flink分析POJO類型的結構,即它了解POJO的字段。 是以,POJO類型比一般類型更容易使用。 此外,Flink可以比一般類型更有效地處理POJO。

以下示例顯示了一個包含兩個公共字段的簡單POJO。

7.2.1 Java版本

public class WordWithCount {

    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));

wordCounts.keyBy("word"); // key by field expression "word"           

7.2.2 Scala 版本

class WordWithCount(var word: String, var count: Int) {
    def this() {
      this(null, -1)
    }
}

val input = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"           

7.3 原生類型

Flink支援所有Java和Scala原生類型,如Integer,String和Double。

7.4 General Class Types

Flink支援大多數Java和Scala類(API和自定義)。 限制适用于包含無法序列化的字段的類,如檔案指針,I / O流或其他本機資源。 遵循Java Beans約定的類通常可以很好地工作。

所有未辨別為POJO類型的類都由Flink作為正常類類型處理。 Flink将這些資料類型視為黑盒子,并且無法通路其内容(即,用于有效排序)。 使用序列化架構Kryo對正常類型進行反序列化。

7.5 Values

值類型手動描述其序列化和反序列化。

它們不是通過通用序列化架構,而是通過使用讀取和寫入方法實作org.apache.flinktypes.Value接口來為這些操作提供自定義代碼。當通用序列化效率非常低時,使用值類型是合理的。

一個示例是将元素的稀疏向量實作為數組的資料類型。知道數組大部分為零,可以對非零元素使用特殊編碼,而通用序列化隻需編寫所有數組元素。

org.apache.flinktypes.CopyableValue接口以類似的方式支援手動内部克隆邏輯。

Flink帶有與基本資料類型對應的預定義值類型。 (ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。這些值類型充當基本資料類型的可變變體:它們的值可以被更改,允許程式員重用對象并從垃圾收集器中消除壓力。

7.6 Hadoop Writables

可以使用實作org.apache.hadoop.Writable接口的類型。 write()和readFields()方法中定義的序列化邏輯将用于序列化。

7.7 Special Types

可以使用特殊類型,包括Scala的Either,Option和Try

Java API有自己的自定義Either實作。 與Scala的Either類似,它代表兩種可能類型的值,左或右。 兩者都可用于錯誤處理或需要輸出兩種不同類型記錄的運算符。

7.8 Type Erasure & Type Inference

僅适用于Java

Java編譯器在編譯後抛棄了大部分泛型類型資訊。這在Java中稱為類型擦除。這意味着在運作時,對象的執行個體不再知道其泛型類型。例如,DataStream 和DataStream 的執行個體于JVM看起來相同。

Flink在準備執行程式時(當調用程式的主要方法時)需要類型資訊。 Flink Java API嘗試重建以各種方式丢棄的類型資訊,并将其顯式存儲在資料集和運算符中。您可以通過DataStream.getType()檢索類型。該方法傳回TypeInformation的一個執行個體,這是Flink表示類型的内部方式。

類型推斷有其局限性,在某些情況下需要程式員的“合作”。這方面的示例是從集合建立資料集的方法,例如

ExecutionEnvironment.fromCollection()           

可以在其中傳遞描述類型的參數。但是像MapFunction 這樣的通用函數也可能需要額外的類型資訊。

ResultTypeQueryable接口可以通過輸入格式和函數實作,以明确告知API其傳回類型。調用函數的輸入類型通常可以通過先前操作的結果類型來推斷。

參考

Apache Flink
更多内容請關注JavaEdge 公-号