前幾篇介紹了Flink的入門、架構原理、安裝等,相信你對Flink已經了解入門。接下來開始介紹Flink DataStream API内容,先介紹DataStream API基本概念和使用,然後介紹核心概念,最後再介紹經典案例和代碼實作。本篇内容:Flink DataStream API的概念、模式、作業流程和程式。
1、基本概念
用于處理資料流的API稱之為DataStream API,而DataStream類用于表示Flink程式中的資料集合。你可以将它視為包含重複項的不可變資料集合。這些資料可以是有限的,也可以是無限的,用于處理這些資料的API是相同的。
DataStream資料集都是分布式資料集,分布式資料集是指:一個資料集存儲在不同的伺服器節點上,每個節點存儲資料集的一部分,例如下圖:
在程式設計時,可以把DataStream看作一個資料操作的基本機關,而不必關心資料的分布式特性,Flink會自動将其中的資料分發到叢集的各個節點。
2、執行模式
Flink的執行模式分為3種:
- STREAMING:典型的DataStream執行模式(預設)
- BATCH:在DataStream API上以批處理方式執行
- AUTOMATIC:讓系統根據資料源的有界性來決定
3、作業流程和程式結構
3.1、Flink作業流程
前面我們介紹過Flink JobManager是Flink叢集的主節點,它包含3個不同的元件:Flink Resource Manager、Dispatcher、運作每個Flink Job的JobMaster。JobManager和TaskManager被啟動後,TaskManager會将自己注冊給JobManager中的ResourceManager(資源注冊)。
Flink作業流程如下:
- 使用者編寫應用程式代碼,并通過Flink用戶端送出作業。,調用Flink API建構邏輯資料流圖,然後轉為作業圖JobGraph,并附加到StreamExecutionEnvironment中。代碼和相關配置檔案被編譯打包,被送出到JobManager的Dispatcher,形成一個應用作業。
- Dispatcher(JobManager的一個元件)接收到這個作業,啟動JobManager,JobManager負責本次作業的各項協調工作。
- 接下來JobManager向ResourceManager申請本次作業所需的資源。
- JobManager将使用者作業中的作業圖JobGraph轉化為并行化的實體執行圖,對作業并行處理并将其子任務分發部署到多個TaskManager上執行。每個作業的并行子任務将在Task Slot中執行。至此Flink作業就開始執行了
- TaskManager在執行計算任務的過程中可能會與其他TaskManager交換資料,會使用相應的資料交換政策。同時,TaskManager也會将一些任務狀态資訊回報給JobManager,這些資訊包括任務啟動、運作或終止的狀态、快照的中繼資料等。
Flink作業流程圖見下圖:
3.2、Flink程式結構
前面我們介紹過,Flink的程式是有固定模闆的,具體如下:
- 擷取執行環境
- 加載/建立初始資料
- 對初始資料進行轉換
- 指定計算結果的輸出位置
- 觸發程式執行
所有Flink程式都是**延遲(惰性)**執行的:執行程式的main()方法時,不會直接進行資料加載和轉換,而是将每個操作添加到資料流圖,當在執行環境中調用execute()顯式觸發執行時才會執行這些操作。程式是在本地執行還是在群集上執行取決于執行環境的類型。惰性計算允許建構複雜的程式,Flink将其作為一個整體規劃的單元執行。
Flink的程式模闆見下面的示例。示例采用流計算,讀取socket資料源,對輸入的資料進行統計,最後輸出到控制台。執行main方法前,現在本地開啟netcat,nc -lk 9999,然後輸入任意字元,即可看到統計結果。
public static void main(String[] args) throws Exception {
// 1. 建立執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 讀取資料源
DataStream<String> textStream = env.socketTextStream("localhost", 9999, "\n");
// 3. 資料轉換
DataStream<Tuple2<String, Integer>> wordCountStream = textStream
// 對資料源的單詞進行拆分,每個單詞記為1,然後通過out.collect将資料發射到下遊算子
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : value.split("\\s")) {
out.collect(new Tuple2<>(word, 1));
}
}
}
)
// 對單詞進行分組
.keyBy(value -> value.f0)
// 對某個組裡的單詞的數量進行滾動相加統計
.reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
// 4. 資料輸出。位元組輸出到控制台
wordCountStream.print("WordCountStream=======").setParallelism(1);
// 5. 啟動任務
env.execute(WordCountStream.class.getSimpleName());
}
原文連結:http://www.mangod.top/articles/2023/07/31/1690758123965.html、https://mp.weixin.qq.com/s/r6EQF5s51wuanQOR3QJZWw
感謝你的閱讀,碼字不易,歡迎點贊、關注、收藏 !!!