1 基本的 API 概念
Flink程式是實作分布式集合轉換的正常程式(例如,過濾,映射,更新狀态,加入,分組,定義視窗,聚合)。最初從源建立集合(例如,通過從檔案,kafka主題或從本地的記憶體集合中讀取)。結果通過接收器傳回,接收器可以例如将資料寫入(分布式)檔案或标準輸出(例如,指令行終端)。 Flink程式可以在各種環境中運作,獨立運作或嵌入其他程式中。執行可以在本地JVM中執行,也可以在許多計算機的叢集上執行。
根據資料源的類型,即有界或無界源,您可以編寫批處理程式或流程式,其中
- DataSet API用于批處理
- DataStream API用于流式處理。
注意:在顯示如何使用API的實際示例時,我們将使用StreamingExecutionEnvironment和DataStream API。 DataSet API中的概念完全相同,隻需用ExecutionEnvironment和DataSet替換即可。
- 大資料的處理流程
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
2 DataSet & DataStream
Flink具有特殊類DataSet和DataStream來表示程式中的資料。 可以将它們視為可以包含重複項的不可變資料集合。
- 在DataSet的情況下,資料是有限的
- 而對于DataStream,元素的數量可以是無限的
這些集合在某些關鍵方面與正常Java集合不同。 首先,它們是
不可變的
,這意味着
一旦建立它們,就無法添加或删除元素。 也不能簡單地檢查裡面的元素
。
最初通過在Flink程式中添加源來建立集合,并通過使用諸如map,filter等API方法對它們進行轉換來從這些集合中派生新集合。
可以看出底層使用了資料源
3 Flink 項目流程剖析
Flink程式看起來像是轉換資料集合的正常程式。 每個程式包含相同的基本部分:
- 獲得執行環境,
- 加載/建立初始資料,
- 指定此資料的轉換,
- 指定放置計算結果的位置,
- 觸發程式執行
Scala版本
我們現在将概述每個步驟
Scala DataSet API的所有核心類都可以在org.apache.flink.api.scala包中找到
而Scala DataStream API的類可以在org.apache.flink.streaming.api.scala中找到
StreamExecutionEnvironment是所有Flink程式的基礎
可以在StreamExecutionEnvironment上使用這些靜态方法擷取一個:
1:getExecutionEnvironment()
2:createLocalEnvironment()
3:createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
- 法1示例代碼
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考 - 法2示例代碼
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
此方法将環境的預設并行度設定為給定參數,預設為通過[[setDefaultLocalParallelism(Int)]]設定的值。
通常,隻需要使用
getExecutionEnvironment()
,因為這将根據上下文執行正确的操作:
- 如果在IDE中執行程式或作為正常Java程式,它将建立一個本地環境,将執行在本地機器上的程式。
- 如果從程式中建立了一個JAR檔案,并通過指令行調用它,則Flink叢集管理器将執行您的main方法,
将傳回一個執行環境,用于在叢集上執行程式。getExecutionEnvironment()
對于指定資料源,執行環境可以通過各種途徑從檔案中讀取
- 逐行讀取它們
- CSV檔案
- 使用完全自定義資料輸入格式
要将文本檔案作為一系列行讀取,可以使用:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
這将提供一個
DataStream
,然後就可以在其上應用轉換來建立新的派生
DataStream
也可以通過使用轉換函數調用
DataSet
上的方法來應用轉換。 例如,map轉換如下所示:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
這将通過将原始集合中的每個String轉換為Integer來建立新的
DataStream
一旦有了包含最終結果的DataStream,就可以通過建立接收器将其寫入外部系統。 這些隻是建立接收器的一些示例方法:
writeAsText(path: String)
print()
一旦指定了完整的程式,就需要通過調用
StreamExecutionEnvironment
上的
execute()
根據
ExecutionEnvironment
的類型,将在本地計算機上觸發執行或送出程式以在叢集上執行。
execute()方法傳回一個JobExecutionResult,它包含執行時間和累加器結果。
觸發程式執行。環境将執行導緻"sink"操作運作程式的所有部分
Sink操作例如是列印結果或将它們轉發到消息隊列。
該法将記錄程式執行并使用提供的名稱顯示。
4 延遲執行
所有Flink程式都是延遲執行:當執行程式的main方法時,資料加載和轉換不會立即執行。而是建立每個操作并将其添加到程式的計劃中。
當執行環境上的
execute()
調用顯式觸發執行時,實際執行操作。
程式是在本地執行還是在叢集上執行取決于執行環境的類型
延遲執行使我們可以建構Flink作為一個整體計劃單元執行的複雜程式,進行内部的優化。
5 指定keys
上述程式中的這些資料如何确定呢?
某些轉換(join,coGroup,keyBy,groupBy)要求在元素集合上定義key
其他轉換(Reduce,GroupReduce,Aggregate,Windows)允許資料在應用之前在key上分組。
- DataSet分組為
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
雖然可以使用DataStream指定key
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink的資料模型不基于鍵值對。 是以,無需将資料集類型實體打包到鍵和值中。 鍵是“虛拟的”:它們被定義為實際資料上的函數,以指導分組操作符。
注意:在下面的讨論中,将使用DataStream API和keyBy。 對于DataSet API,隻需要用DataSet和groupBy替換。
5.1 定義元組的鍵
- 源碼
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
即 :按給定的鍵位置(對于元組/數組類型)對DataStream的元素進行分組,以與分組運算符(如分組縮減或分組聚合)一起使用。
最簡單的情況是在元組的一個或多個字段上對元組進行分組:
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)
元組在第一個字段(整數類型)上分組。
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)
在這裡,我們将元組分組在由第一個和第二個字段組成的複合鍵上。
關于嵌套元組的注釋:如果你有一個帶有嵌套元組的DataStream,例如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)将使系統使用完整的Tuple2作為鍵(以Integer和Float為鍵)。 如果要“導航”到嵌套的Tuple2中,則必須使用下面解釋的字段表達式鍵。
5.2 指定key的字段表達式
可以使用基于字元串的字段表達式來引用嵌套字段,并定義用于分組,排序,連接配接或coGrouping的鍵。
字段表達式可以非常輕松地選擇(嵌套)複合類型中的字段,例如Tuple和POJO類型。
我們有一個WC POJO,其中包含兩個字段“word”和“count”。
- Java版本代碼
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考 - Scala版本代碼
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
要按字段分組,我們隻需将其名稱傳遞給keyBy()函數。
// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
5.2.1 字段表達式文法:
-
按字段名稱選擇POJO字段
例如,“user”指的是POJO類型的“user”字段
-
通過1偏移字段名稱或0偏移字段索引選擇元組字段
例如,“_ 1”和“5”分别表示Scala Tuple類型的第一個和第六個字段。
-
可以在POJO和Tuples中選擇嵌套字段
例如,“user.zip”指的是POJO的“zip”字段,其存儲在POJO類型的“user”字段中。 支援任意嵌套和混合POJO和元組,例如“_2.user.zip”或“user._4.1.zip”。
-
可以使用“_”通配符表達式選擇完整類型
這也适用于非Tuple或POJO類型的類型。
5.2.2 字段表達示例
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
這些是上面示例代碼的有效字段表達式:
- “count”:WC類中的count字段。
- “complex”:遞歸選擇POJO類型ComplexNestedClass的字段複合體的所有字段。
- “complex.word._3”:選擇嵌套Tuple3的最後一個字段。
- “complex.hadoopCitizen”:選擇Hadoop IntWritable類型。
5.3 指定key的key選擇器函數
定義鍵的另一種方法是“鍵選擇器”功能。 鍵選擇器函數将單個元素作為輸入并傳回元素的鍵。 key可以是任何類型,并且可以從确定性計算中導出。
以下示例顯示了一個鍵選擇器函數,它隻傳回一個對象的字段:
- Java
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考 - Scala
Flink實戰(三) - 程式設計範式及核心概念1 基本的 API 概念2 DataSet & DataStream3 Flink 項目流程剖析4 延遲執行5 指定keys6 指定轉換函數7 支援的資料類型參考
6 指定轉換函數
大多數轉換都需要使用者自定義的函數。 本節列出了如何指定它們的不同方法
6.1 Java版本
6.1.1 實作接口
最基本的方法是實作一個提供的接口:
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
6.1.2 匿名類
可以将函數作為匿名類傳遞:
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
6.1.3 Java 8 Lambdas
Flink還支援Java API中的Java 8 Lambdas。
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
6.1.4 增強函數
所有需要使用者定義函數的轉換都可以将增強函數作為參數。 例如,與其寫成
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
不如寫成
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
并像往常一樣将函數傳遞給map轉換:
data.map(new MyMapFunction());
也可以定義為匿名類:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
除了使用者定義的函數(map,reduce等)之外,Rich函數還提供了四種方法:open,close,getRuntimeContext和setRuntimeContext。
這些用于參數化函數(請參閱将參數傳遞給函數),建立和完成本地狀态,通路廣播變量以及通路運作時資訊(如累加器和計數器)
7 支援的資料類型
Flink對DataSet或DataStream中可以包含的元素類型設定了一些限制。 原因是系統分析類型以确定有效的執行政策。
有六種不同類别的資料類型:
- Java 元組 and Scala Case 類
- Java POJOs
- 原生類型
- Regular Classes
- Values
- Hadoop Writables
- Special Types
7.1 元組 and Case 類
7.1.1 Java版本
元組是包含固定數量的具有各種類型的字段的複合類型。 Java API提供從Tuple0到Tuple25的類。
元組的每個字段都可以是包含更多元組的任意的Flink的類型,進而産生嵌套元組。 可以使用字段名稱tuple.f4直接通路元組的字段,也可以使用通用getter方法tuple.getField(int position)。 字段索引從0開始。
這與Scala的元組形成對比,但Java的正常索引更為一緻。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(0); // also valid .keyBy("f0")
7.1.2 Scala版本
Scala case類(和Scala元組是case類的特例)是包含固定數量的具有各種類型的字段的複合類型。 元組字段由它們的1偏移名稱尋址,例如第一個字段的_1。 字段按名稱通路。
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
7.2 POJOs
如果滿足以下要求,則Flink将Java和Scala類視為特殊的POJO資料類型:
- public限定
- 它必須有一個沒有參數的公共構造函數(預設構造函數)。
- 所有字段都是public的,或者必須通過getter和setter函數通路。 對于名為foo的字段,getter和setter方法必須命名為getFoo()和setFoo()。
- Flink必須支援字段的類型。 目前,Flink使用Avro序列化任意對象(例如Date)。
Flink分析POJO類型的結構,即它了解POJO的字段。 是以,POJO類型比一般類型更容易使用。 此外,Flink可以比一般類型更有效地處理POJO。
以下示例顯示了一個包含兩個公共字段的簡單POJO。
7.2.1 Java版本
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"
7.2.2 Scala 版本
class WordWithCount(var word: String, var count: Int) {
def this() {
this(null, -1)
}
}
val input = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
7.3 原生類型
Flink支援所有Java和Scala原生類型,如Integer,String和Double。
7.4 General Class Types
Flink支援大多數Java和Scala類(API和自定義)。 限制适用于包含無法序列化的字段的類,如檔案指針,I / O流或其他本機資源。 遵循Java Beans約定的類通常可以很好地工作。
所有未辨別為POJO類型的類都由Flink作為正常類類型處理。 Flink将這些資料類型視為黑盒子,并且無法通路其内容(即,用于有效排序)。 使用序列化架構Kryo對正常類型進行反序列化。
7.5 Values
值類型手動描述其序列化和反序列化。
它們不是通過通用序列化架構,而是通過使用讀取和寫入方法實作org.apache.flinktypes.Value接口來為這些操作提供自定義代碼。當通用序列化效率非常低時,使用值類型是合理的。
一個示例是将元素的稀疏向量實作為數組的資料類型。知道數組大部分為零,可以對非零元素使用特殊編碼,而通用序列化隻需編寫所有數組元素。
org.apache.flinktypes.CopyableValue接口以類似的方式支援手動内部克隆邏輯。
Flink帶有與基本資料類型對應的預定義值類型。 (ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。這些值類型充當基本資料類型的可變變體:它們的值可以被更改,允許程式員重用對象并從垃圾收集器中消除壓力。
7.6 Hadoop Writables
可以使用實作org.apache.hadoop.Writable接口的類型。 write()和readFields()方法中定義的序列化邏輯将用于序列化。
7.7 Special Types
可以使用特殊類型,包括Scala的Either,Option和Try
Java API有自己的自定義Either實作。 與Scala的Either類似,它代表兩種可能類型的值,左或右。 兩者都可用于錯誤處理或需要輸出兩種不同類型記錄的運算符。
7.8 Type Erasure & Type Inference
僅适用于Java
Java編譯器在編譯後抛棄了大部分泛型類型資訊。這在Java中稱為類型擦除。這意味着在運作時,對象的執行個體不再知道其泛型類型。例如,DataStream 和DataStream 的執行個體于JVM看起來相同。
Flink在準備執行程式時(當調用程式的主要方法時)需要類型資訊。 Flink Java API嘗試重建以各種方式丢棄的類型資訊,并将其顯式存儲在資料集和運算符中。您可以通過DataStream.getType()檢索類型。該方法傳回TypeInformation的一個執行個體,這是Flink表示類型的内部方式。
類型推斷有其局限性,在某些情況下需要程式員的“合作”。這方面的示例是從集合建立資料集的方法,例如
ExecutionEnvironment.fromCollection()
可以在其中傳遞描述類型的參數。但是像MapFunction 這樣的通用函數也可能需要額外的類型資訊。
ResultTypeQueryable接口可以通過輸入格式和函數實作,以明确告知API其傳回類型。調用函數的輸入類型通常可以通過先前操作的結果類型來推斷。
參考
Apache Flink更多内容請關注JavaEdge 公-号