天天看點

Flink DataStream API-概念、模式、作業流程和程式

作者:不焦躁的程式員

前幾篇介紹了Flink的入門、架構原理、安裝等,相信你對Flink已經了解入門。接下來開始介紹Flink DataStream API内容,先介紹DataStream API基本概念和使用,然後介紹核心概念,最後再介紹經典案例和代碼實作。本篇内容:Flink DataStream API的概念、模式、作業流程和程式。

1、基本概念

用于處理資料流的API稱之為DataStream API,而DataStream類用于表示Flink程式中的資料集合。你可以将它視為包含重複項的不可變資料集合。這些資料可以是有限的,也可以是無限的,用于處理這些資料的API是相同的。

DataStream資料集都是分布式資料集,分布式資料集是指:一個資料集存儲在不同的伺服器節點上,每個節點存儲資料集的一部分,例如下圖:

Flink DataStream API-概念、模式、作業流程和程式

在程式設計時,可以把DataStream看作一個資料操作的基本機關,而不必關心資料的分布式特性,Flink會自動将其中的資料分發到叢集的各個節點。

2、執行模式

Flink的執行模式分為3種:

  1. STREAMING:典型的DataStream執行模式(預設)
  2. BATCH:在DataStream API上以批處理方式執行
  3. AUTOMATIC:讓系統根據資料源的有界性來決定

3、作業流程和程式結構

3.1、Flink作業流程

前面我們介紹過Flink JobManager是Flink叢集的主節點,它包含3個不同的元件:Flink Resource Manager、Dispatcher、運作每個Flink Job的JobMaster。JobManager和TaskManager被啟動後,TaskManager會将自己注冊給JobManager中的ResourceManager(資源注冊)。

Flink作業流程如下:

  1. 使用者編寫應用程式代碼,并通過Flink用戶端送出作業。,調用Flink API建構邏輯資料流圖,然後轉為作業圖JobGraph,并附加到StreamExecutionEnvironment中。代碼和相關配置檔案被編譯打包,被送出到JobManager的Dispatcher,形成一個應用作業。
  2. Dispatcher(JobManager的一個元件)接收到這個作業,啟動JobManager,JobManager負責本次作業的各項協調工作。
  3. 接下來JobManager向ResourceManager申請本次作業所需的資源。
  4. JobManager将使用者作業中的作業圖JobGraph轉化為并行化的實體執行圖,對作業并行處理并将其子任務分發部署到多個TaskManager上執行。每個作業的并行子任務将在Task Slot中執行。至此Flink作業就開始執行了
  5. TaskManager在執行計算任務的過程中可能會與其他TaskManager交換資料,會使用相應的資料交換政策。同時,TaskManager也會将一些任務狀态資訊回報給JobManager,這些資訊包括任務啟動、運作或終止的狀态、快照的中繼資料等。

Flink作業流程圖見下圖:

Flink DataStream API-概念、模式、作業流程和程式

3.2、Flink程式結構

前面我們介紹過,Flink的程式是有固定模闆的,具體如下:

  1. 擷取執行環境
  2. 加載/建立初始資料
  3. 對初始資料進行轉換
  4. 指定計算結果的輸出位置
  5. 觸發程式執行

所有Flink程式都是**延遲(惰性)**執行的:執行程式的main()方法時,不會直接進行資料加載和轉換,而是将每個操作添加到資料流圖,當在執行環境中調用execute()顯式觸發執行時才會執行這些操作。程式是在本地執行還是在群集上執行取決于執行環境的類型。惰性計算允許建構複雜的程式,Flink将其作為一個整體規劃的單元執行。

Flink的程式模闆見下面的示例。示例采用流計算,讀取socket資料源,對輸入的資料進行統計,最後輸出到控制台。執行main方法前,現在本地開啟netcat,nc -lk 9999,然後輸入任意字元,即可看到統計結果。

public static void main(String[] args) throws Exception {
    // 1. 建立執行環境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 2. 讀取資料源
    DataStream<String> textStream = env.socketTextStream("localhost", 9999, "\n");
    // 3. 資料轉換
    DataStream<Tuple2<String, Integer>> wordCountStream = textStream
            // 對資料源的單詞進行拆分,每個單詞記為1,然後通過out.collect将資料發射到下遊算子
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                         @Override
                         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                             for (String word : value.split("\\s")) {
                                 out.collect(new Tuple2<>(word, 1));
                             }
                         }
                     }
            )
            // 對單詞進行分組
            .keyBy(value -> value.f0)
            // 對某個組裡的單詞的數量進行滾動相加統計
            .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
    // 4. 資料輸出。位元組輸出到控制台
    wordCountStream.print("WordCountStream=======").setParallelism(1);
    // 5. 啟動任務
    env.execute(WordCountStream.class.getSimpleName());
}
           

原文連結:http://www.mangod.top/articles/2023/07/31/1690758123965.html、https://mp.weixin.qq.com/s/r6EQF5s51wuanQOR3QJZWw

感謝你的閱讀,碼字不易,歡迎點贊、關注、收藏 !!!

繼續閱讀