天天看點

Flink(三):Flink 的主力 API -DataStream API

作者:青梅主碼

大家好,我是傑哥

今天,繼續來探索 Flink ,今天我們主要來看看 Flink 為我們所提供的分層 API

一 Flink 的分層 API

Flink 不僅具有高吞吐、低延遲、高可用等優秀特性,而且還提供了易于使用的分層 API,是以它也是一個易于開發的架構,它的 API 分層如圖所示

Flink(三):Flink 的主力 API -DataStream API

Flink 包含三層 API:低級 APIs - 有狀态處理、核心 APIs - DataStream API 以及 進階 APIs - Table API & SQL

1、有狀态處理

最底層級的抽象僅僅提供了有狀态流,它将處理函數(Process Function)嵌入到了DataStream API 中。通過實作 ProcessFunction 接口來進行操作。ProcessFunction 可以處理一或兩條輸入資料流中的單個事件或者歸入一個特定視窗内的多個事件。它提供了對于時間和狀态的細粒度控制。開發者可以在其中任意地修改狀态,也能夠注冊定時器用以在未來的某一時刻觸發回調函數

2、核心 APIs-DataStream API

DataStream?是不是感覺還有點兒熟悉呢?

是的,上一節的例子裡面,它就出現過,我們使用 DataStream 類來存放 Flink 程式中的資料集合,供 Flink 進行處理

// 2-  讀取文本流 (nc -lk 7777)
DataStreamSource<String> lineDataStream = env.socketTextStream("localhost", 7777);
           

其中 DataStreamSource 其實就是內建自 DataStream而這裡所提到的 DataStream API 的名字就來自于 DataStream 類。這個類中,可以存放包含重複項的不可變資料集合。集合中的資料可以是有限的,也可以是無界的

DataStream 在使用方面類似于正常的 Java 中的集合,但在一些關鍵方面有很大的不同,比如它們是不可變的,這意味着一旦建立了它們,就不能添加或删除元素。此外,我們也不能檢視裡面的元素,隻能使用 DataStream API 操作(也稱為轉換)對它們進行一些變換操作

可以通過在 Flink 程式中添加源來建立初始 DataStream。然後,再可以從中派生新的流,并使用映射、過濾器等 API 方法将它們組合在一起

3、Table API & SQL API

Table API 是一個針對 Java、Scala 和 Python 的語言內建查詢 API,它允許以非常直覺的方式組合來自關系操作符(如選擇、篩選和連接配接)的查詢。Flink 的 SQL API 基于 Apache Calcite,它實作了 SQL 标準的文法

Flink 提供的最高層級的抽象是 SQL。這一層抽象在文法與表達能力上與 Table API 類似, 但是是以 SQL 查詢表達式的形式表現程式。SQL 抽象與 Table API 互動密切,同時 SQL 查詢 可以直接在 Table API 定義的表上執行

Flink 的 分層 API 中,最主要、最常用的是 DataStream API 和 Table API & SQL 。我們可以将兩者單獨使用,也可以混合使用,取決于我們具體的應用場景

二 DataStream API 認識

我想,從它的操作步驟來認識它,應該是比較直覺的

類似于我們曾經了解到的 JDBC 會有固定的 7 個步驟,使用 Flink 的 DataStream API 進行資料的處理,實際上也包含以下 5 個固定步驟,而 DataStream API 也可以随之被分為 5 類:

1、設定執行環境
  
2、讀取輸入流
  
3、轉換操作
  
4、輸出到一個或多個資料彙中
  
5、執行程式
           

我們來分别看一下

1、設定執行環境

在 Flink 中,可以使用 StreamExecutionEnvironment 的下列三種方式進行執行環境的建立

getExecutionEnvironment();
createLocalEnvironment();
createRemoteEnvironment(String host, int port, String... jarFiles);
           

其中前兩個表示建立的是本地環境,即表示 Flink 程式運作在本地機器。還可以通過指定遠端機器的主機名、端口号以及程式本身所生成的 jar 包最終拷貝到的路徑,建立一個遠端執行環境,使得 Flink 程式運作在所指定的主機上

2、讀取輸入流

StreamExecutionEnvironment 為我們提供了一系列建立流式資料源的方法,使得我們可以将資料流讀取到應用中。這些資料流可以來自于檔案、資料庫、消息隊列等 讀取的資料,就可以統一放入 DataStream 對象中

1)從集合中讀取資料

ArrayList<User> user = new ArrayList<>();
user.add(new User("Mary","./home",1000L));
user.add(new User("Bob","./cart",2000L));
DataStream<User> stream = env.fromCollection(user);
           

2)從檔案中讀取資料

DataStream<String> stream = env.readTextFile("words.txt");
           

3)從 Socket 讀取資料

DataStream<String> stream = env.socketTextStream("localhost", 7777);
           

4)從消息中心讀取資料

DataStreamSource<String> stream = env.addSource(new 
FlinkKafkaConsumer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
           

5)自定義 Source如果我們需要讀取資料的資料源,Flink 沒有為其提供讀取資料源的方法,我們就可以通過 Flink 提供的自定義 Source 的方式進行了 那就隻好自定義實作 SourceFunction 了。接下來我們建立一個自定義的資料源,實作 SourceFunction 接口。主要重寫兩個關鍵方法:run()和 cancel()

  • run()方法:使用運作時上下文對象(SourceContext)向下遊發送資料;
  • cancel()方法:通過辨別位控制退出循環,來達到中斷資料源的效果。

3、轉換操作

當資料被存入 DataStream 對象中後,我們就可以對它進行轉換操作了 轉換的類型有多種多樣。有些會重新組織一下 DataStream 中的資料流,比如對這些資料進行一次分組或者分區;有些呢,就會生成一個新的 DataStream (類型可能會發生變化)

比如,使用 map()可以将一個輸入流中的所有正方形全部轉換為圓形

今天在這裡就不細說了,下次文章會專門為大家介紹 DataStream API 的轉換操作

4、輸出結果

一般場景下,我們将資料處理的結果總是會發送到某些外部系統,比如檔案、資料庫,以及消息中心中等,當然測試時,可以直接輸出至标準輸出中

比如,輸出到檔案的方式如下:

StreamingFileSink<String> fileSink = StreamingFileSink
.<String>forRowFormat(new Path("./output"),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15)
) 
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5
))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
// 将 Event 轉換成 String 寫入檔案
stream.map(Event::toString).addSink(fileSink);
           

此外,還可以輸出到 Kafka,mysql,pulsar 等

最新版本的 Flink 所支援的所有輸入輸出如下:

Flink(三):Flink 的主力 API -DataStream API

5、執行程式

在應用定義完成之後,我們就可以通過 StreamExecutionEnvironment 類中的 execute() 方法進行執行

Flink 被設計為了延遲計算的方式執行,在 執行 execute() 方法之前,前面的所有程式,隻是在執行環境中建構了一個執行計劃。也就是說,隻有執行了 execute() 方法之後,前面定義的 Flink 應用程式才會被真正執行

這 5 個步驟看完了,我們再将那個簡單的執行個體拿過來看看,确認一下是否是這樣

public class StreamWordCount {
    public static void main(String[] args) throws Exception{
//     1-建立執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//     2-讀取文本流 (nc -lk 7777)
        DataStreamSource<String> lineDataStream = env.socketTextStream("localhost", 7777);
//       3-轉換操作
//       3.1 收集各個單詞,定義為二進制組
        SingleOutputStreamOperator<Tuple2<String, Long>> streamOperator = lineDataStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String words[] = line.split(" ");
            for (String word :words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
//       3.2 分組
        KeyedStream<Tuple2<String, Long>, String> keyedStream = streamOperator.keyBy(data -> data.f0);
//       3.3-統計
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyedStream.sum(1);
//      4-列印
        sum.print();
//      5-啟動執行
        env.execute();
    }
}
           

這個其實就是嚴格按照我們上述所提到的步驟來走的

其中第 2 步驟,是通過一行一行地讀取 socket 文本流内容,進行資料源的輸入的,将所輸入的資料流存入 lineDataStream 中

而第 3 步驟,則是對 lineDataStream 中的資料進行轉換處理操作。首先将各個資料通過 flatMap()方法轉換為二進制組,然後使用 keyBy() 方法,對其元素進行分組,接着再使用 sum()方法,對分組之後的元素進行求和操作

随着資料流的不斷輸入,所輸入的每個單詞出現的次數便會通過程式被源源不斷地輸出

總結

Flink 提供了分層 API,便于我們進行開發。在實際開發中,我們使用最多的就是 DataStream API ,而怎樣認識 DataStream API 呢?從它的操作步驟來看,可以分為 5 個步驟,也就是說 DataStream API 可以分為 5 大類:

1、設定執行環境

2、讀取輸入流

3、轉換操作

4、輸出到一個或多個資料彙中

5、執行程式

嗯,就這樣。每天學習一點,時間會見證你的強大

歡迎大家關注我們的公衆号【青梅主碼】,一起持續性學習吧~

往期精彩回顧

總結複盤

架構設計讀書筆記與感悟總結

帶領新人團隊的沉澱總結

複盤篇:問題解決經驗總結複盤

網絡篇

網絡篇(四):《圖解 TCP/IP》讀書筆記

網絡篇(一):《趣談網絡協定》讀書筆記(一)

事務篇章

事務篇(四):Spring事務并發問題解決

事務篇(三):分享一個隐性事務失效場景

事務篇(一):畢業三年,你真的學會事務了嗎?

Docker篇章

Docker篇(六):Docker Compose如何管理多個容器?

Docker篇(二):Docker實戰,指令解析

Docker篇(一):為什麼要用Docker?

..........

SpringCloud篇章

Spring Cloud(十三):Feign居然這麼強大?

Spring Cloud(十):消息中心篇-Kafka經典面試題,你都會嗎?

Spring Cloud(九):注冊中心選型篇-四種注冊中心特點超全總結

Spring Cloud(四):公司内部,關于Eureka和zookeeper的一場辯論賽

..........

Spring Boot篇章

Spring Boot(十二):陌生又熟悉的 OAuth2.0 協定,實際上每個人都在用

Spring Boot(七):你不能不知道的Mybatis緩存機制!

Spring Boot(六):那些好用的資料庫連接配接池們

Spring Boot(四):讓人又愛又恨的JPA

SpringBoot(一):特性概覽

..........

翻譯

[譯]用 Mint 這門強大的語言來建立一個 Web 應用

【譯】基于 50 萬個浏覽器指紋的新發現

使用 CSS 提升頁面渲染速度

WebTransport 會在不久的将來取代 WebRTC 嗎?

.........

職業、生活感悟

你有沒有想過,旅行的意義是什麼?

程式員的職業規劃

靈魂拷問:人生最重要的是什麼?

如何高效學習一個新技術?

如何讓自己更坦然地度過一天?

..........

繼續閱讀