天天看點

專家帶你吃透 Flink 架構:一個新版 Connector 的實作

作者:閃念基因

前言

Flink 可以說已經是流計算領域的事實标準,其開源社群發展迅速,提出了很多改進計劃(Flink Improvement Proposals,簡稱 FLIP [1])并不斷疊代,幾乎每個新的版本在功能、性能和使用便捷性上都有所提高。Flink 提供了豐富的資料連接配接器(connecotr)來連接配接各種資料源,内置了 kafka [2]、jdbc [3]、hive [4]、hbase [5]、elasticsearch [6]、file system [7] 等常見的 connector,此外 Flink 還提供了靈活的機制友善開發者開發新的 connector。

對于 source connector 的開發,有基于傳統的 SourceFunction [8] 的方式和基于 Flink 改進計劃 FLIP-27 [9] 的 Source [10] 新架構的方式。本文首先介紹基于 SourceFunction 方式的不足,接着介紹 Source 新架構以及其設計上的深層思考,然後基于 Flink 1.13 ,以從零開發一個簡單的 FileSource connector 為例,介紹開發 source connector 的基本要素,盡量做到理論與實踐相結合加深大家的了解。

流計算 Oceanus 是大資料産品生态體系的實時化分析利器,是基于 Apache Flink 建構的具備一站開發、無縫連接配接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大資料分析平台。流計算 Oceanus 以實作企業資料價值最大化為目标,加速企業實時化數字化的建設程序。流計算 Oceanus 提供了便捷的控制台環境,友善使用者編寫 SQL 分析語句、ETL 作業或者上傳運作自定義 JAR 包,支援作業運維管理。歡迎大家試用,目前還有新使用者1元秒殺活動,機會難得,不容錯過。

Source 舊架構

在 Flink 1.12 之前,開發一個 source connector 通過實作 SourceFunction [8] 接口來完成,官方給出的通用的實作模式如下。當 source 開始發送資料時,run 方法被調用,其參數 SourceContext 用于發送資料。run 方法是一個無限循環,通過一個辨別 isRunning 來跳出循環結束 source。批模式和流模式通常需要不同的處理邏輯,例如示例的批模式通過一個計數器來結束批資料。此外,還需要通過 checkpoint 鎖來保證狀态更新和資料發送的原子性。值得一提的是,Flink 在 SourceFunction 之上抽象出了 InputFormatSourceFunction,開發者隻需要實作 InputFormat,批模式 source connector(如 HBase)通常基于 InputFormat 實作,當然 InputFormat 也可以用于流模式,在一定程度上展現了批流融合的思想,但整體上來看至少在接口層面上流批并沒有完全一緻。

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
      private long count = 0L;
      private volatile boolean isRunning = true;


      private transient ListState<Long> checkpointedCount;


      public void run(SourceContext<T> ctx) {
          while (isRunning && count < 1000) {
              // this synchronized block ensures that state checkpointing,
              // internal state updates and emission of elements are an atomic operation
              synchronized (ctx.getCheckpointLock()) {
                  ctx.collect(count);
                  count++;
              }
          }
      }


      public void cancel() {
          isRunning = false;
      }


      public void initializeState(FunctionInitializationContext context) {
          this.checkpointedCount = context
              .getOperatorStateStore()
              .getListState(new ListStateDescriptor<>("count", Long.class));


          if (context.isRestored()) {
              for (Long count : this.checkpointedCount.get()) {
                  this.count += count;
              }
          }
      }


      public void snapshotState(FunctionSnapshotContext context) {
          this.checkpointedCount.clear();
          this.checkpointedCount.add(count);
      }
 }           

在基于 SourceFunction 的開發模式下,以 Kafka Source 為例,見下圖,FlinkKafkaConsumer 為 SourceFunction 的實作類,該類中集中了 kafka partition 發現邏輯(KafkaPartitionDiscoverer)、資料讀取邏輯(KafkaFetcher)、基于阻塞隊列實作的生産者消費者模型(KafkaConsumerThread -> Handover -> SourceContext)等等。

專家帶你吃透 Flink 架構:一個新版 Connector 的實作

我們可以發現,這種開發模式存在如下不足:

  1. 首先對于批模式和流模式需要不同的處理邏輯,不符合批流融合的業界趨勢。
  2. 資料分片(例如 kafka partition、file source 的檔案 split)和實際資料讀取邏輯混合在 SourceFunction 中,導緻複雜的實作。
  3. 資料分片在接口中并不明确,這使得很難以獨立于 source 的方式實作某些功能,例如事件時間對齊(event-time alignment)、分區 watermarks(per-partition watermarks)、動态資料分片配置設定、工作竊取(work stealing)。
  4. 沒有更好的方式來優化 Checkpoint 鎖,在鎖争用下,一些線程(例如 checkpoint 線程)可能無法獲得鎖。
  5. 沒有通用的構模組化式,每個源都需要實作自行實作複雜的線程模型,這使得開發和測試一個新的 source 變得困難,也提高了開發者對現有 source 的作出貢獻的門檻。

有鑒于此,Flink 社群提出了 FLIP-27 [9] 的改進計劃,并在 Flink 1.12 實作了基礎架構,在 Flink 1.13 中 kafka、hive 和 file source 已移植到新架構,開源社群的 Flink CDC connector 2.0 [11] 也基于新架構實作。

Source 新架構

專家帶你吃透 Flink 架構:一個新版 Connector 的實作

基于 FLIP-27 的 Source 新架構如上圖所示,由兩個主要部件組成:SplitEnumerator 和 SourceReader。SplitEnumerator 負責資料分片和配置設定,SourceReader 則負責具體分片資料的讀取。當一個新的分片被 SplitEnumerator 添加到 SourceReader,首先初始化分片狀态并放入狀态哈希表中,然後分片被配置設定給 SplitReader 讀取資料。讀取的資料以小批量模式封裝于 RecordsWithSplitIds 并放置于中間隊列 Queue,這種批量資料模式可以提高性能。SourceReader 從 Queue 中擷取一批資料,周遊每一條資料,并查找資料相應的分片狀态,資料和分片狀态一并傳遞給 RecordEmitter,RecordEmitter 先把資料傳遞給 SourceOutput,然後更新分片狀态。狀态哈希表中的狀态在 checkpoint 時持久化到狀态存儲。

Source 新架構具有以下特點。

資料分片與資料讀取分離。例如在 FileSource 中,SplitEnumerator 負責列出所有的檔案,并有可能把檔案按塊或者範圍進行切分,SourceReader 則負責具體的檔案/塊的資料讀取。又例如在 KafkaSource 中,SplitEnumerator 負責發現需要讀取的 kafka partition,SourceReader 則負責具體 partition 資料的讀取。

批流融合。基于新架構開發的 Source 既可以工作于批模式也可以工作于流模式,批僅僅是有界的流。大多數情況下,隻有 SplitEnumerator 需要感覺資料源是否有界。例如對于 FileSource,批模式下 SplitEnumerator 隻需要一次性的列出目錄下的所有檔案,流模式下則需要周期性的列出所有檔案,并為新增的檔案生成資料分片。對于 KafkaSource,批模式下 SplitEnumerator 列出所有的 partition,并把每個 partition 的目前最新的資料偏移作為資料分片的結束點,流模式下 SplitEnumerator 則把無窮大作為 partition 資料分片的結束點,即會持續的讀取每個 partition 的新增資料,流模式下還可以周期性的監測 partition 的變化并為新增的 partitition 生成資料分片。

雙向通信。SplitEnumerator 運作在 JobManager,SourceReader 運作在 TaskManager,SplitEnumerator 和 SourceReader 之間可以雙向通信,SourceReader 可以主動向 SplitEnumerator 請求資料分片實作 pull 模式的資料分片配置設定(例如 FileSource),SplitEnumerator 也可以把資料分片直接配置設定給 SourceReader 實作 push 模式的配置設定(例如 KafkaSource)。此外,根據需要還可以定制化一些消息實作 SplitEnumerator 和 SourceReader 之間的互動需求。基于雙向通信的能力,比較容易實作事件時間對齊(event-time alignment)的功能,實作資料分片之間事件時間的均衡推進。

通用線程模型。考慮到外部資料源系統的用戶端 API 調用方式的差異(阻塞、非阻塞、異步),SourceReader 在設計上支援單分片串行讀取、多分片多路複用、多分片多線程三種模式。Flink 1.13 核心的 SingleThreadMultiplexSourceReaderBase/SingleThreadFetcherManager 抽象出的架構支援前兩種線程模型,開發者基于此開發 source connector 變得容易。例如 FileSource 采用了單分片串行讀取模式,在一個資料分片讀取後,再向 SplitEnumerator 請求新的資料分片。KafkaSource 采用了多分片多路複用模式,SplitEnumerator 把啟動時讀取的 partition 清單和定期監測時發現的新的 partition 清單批量配置設定給 SourceReader,SourceReader 使用 KafkaConsumer API 讀取所有配置設定到的 partition 的資料。

專家帶你吃透 Flink 架構:一個新版 Connector 的實作

容錯。SplitEnumerator 和 SourceReader 通過 Flink 的分布式快照機制持久化狀态,發生異常時從狀态恢複。通常 SplitEnumerator 狀态儲存了未配置設定的資料分片,SourceReader 狀态儲存了配置設定的資料分片以及分片讀取狀态(例如 kafka offset,檔案 offset)。例如流模式下 FileSource 的 SplitEnumerator 狀态儲存了未配置設定的分片以及處理過的檔案清單,并定期監測檔案清單的變化,為新增檔案生成資料分片;SourceReader 狀态儲存了目前讀取的分片資訊和檔案讀取 offset。

FileSource 開發實踐

下面我們進入實際操作階段,基于新架構開一個簡單的 FileSource connector,該 connector 工作于流模式,讀取指定目錄下的檔案,并定期監測新增檔案。

初始化項目

  1. 我們使用 IntelliJ IDEA 作為開發工具,并按照 Flink 編碼規範配置 IntelliJ IDEA [12],在菜單欄選擇 "File -> New -> Project...",填寫必要的資訊,按照提示操作完成項目建立。
專家帶你吃透 Flink 架構:一個新版 Connector 的實作
專家帶你吃透 Flink 架構:一個新版 Connector 的實作
  1. 在 pom.xml [13] 檔案添加必要的 Flink 依賴。
  2. Flink 基于 Java SPI 機制l發現和加載自定義 connector,我們在 resources 目錄下建立目錄 META-INF/services,并在該目錄下建立檔案 org.apache.flink.table.factories.Factory,檔案内容為:
com.tencent.cloud.oceanus.connector.file.table.FileDynamicTableFactory           
  1. 建立 Java 類 com.tencent.cloud.oceanus.connector.file.table.FileDynamicTableFactory 實作 DynamicTableSourceFactory。
  2. 現在,我們項目初始化已經完成,可以在 IntelliJ IDEA 項目右側選擇 "Maven -> flink-connector-files -> LifeCyle -> package" 建構項目,能夠在 target 目錄下正确建構出名為 flink-connector-files-1.0.0.jar 的二進制包。

Connector 開發

我們按照 Flink 官方的自定義 connector 開發文檔 [14] 來一步步完成 FileSource connector 的開發。

Metadata 層

簡單起見,我們的 connector 隻支援按行讀取指定目錄的檔案,在 SQL 語句中按如下方式使用 connector。

CREATE TABLE test (
  `line` STRING
) WITH (
  'connector' = 'file',
  'path' = 'file:///path/to/files'
);           

Planning 層

  1. 建立類 FileDynamicTableFactory [15],添加自定義 connector 辨別 file和參數 path。校驗參數并建立 FileDynamicSource [16]。
  2. FileDynamicSource [16] 建立 Runtime 層的 FileSource [17]。

Runtime 層

FileSource [17]

實作 Source [10] 接口,需要三個類型參數:第一類型參數為 Source 輸出資料類型,由于我們的 connector 用于 SQL 作業場景,這裡設定為 RowData 類型。第二個類型參數為資料分片類型 SourceSplit [18]。第三個類型參數為 SplitEnumerator checkpoint 資料類型。

FileSource 是一個工廠類,用于建立 SplitEnumerator、SourceReader、資料分片序列化器、SplitEnumerator checkpoint 序列化器。

FileSourceSplit [19]

實作 SourceSplit [18]。該類儲存了資料分片 id、檔案路徑、資料分片起始位置的檔案偏移(我們這裡整個檔案作為一個資料分片,不再細分,是以偏移始終為 0)、檔案長度、檔案讀取進度(恢複時從該位置繼續資料讀取)。

FileSourceSplitSerializer [20]

資料分片序列化器,對 FileSourceSplit [19] 序列化和反序列化。資料分片在從 SplitEnumerator 傳輸到 SourceReader,以及被 SourceReader checkpoint 持久化時都需要序列化。

PendingSplitsCheckpoint [21]

SplitEnumerator checkpoint 資料,儲存了未配置設定的分片以及處理過的檔案清單。

PendingSplitsCheckpointSerializer [22]

SplitEnumerator checkpoint 序列化器,對 PendingSplitsCheckpoint [21] 序列化和反序列化。

FileSourceEnumerator [23]

定期監測目錄下的檔案,生成資料分片,并配置設定給 SourceReader。Flink 核心提供了定時回調接口

SplitEnumeratorContext#callAsync [24] 友善我們使用。這裡我們采用 pull 模式的資料分片配置設定政策。

FileSourceReader [25]

繼承自 SingleThreadMultiplexSourceReaderBase,在讀取完一個資料分片(檔案)後再向 FileSourceEnumerator [23] 請求下一個分片。我們需要實作資料分片狀态初始化接口 initializedState [26],當新的資料分片加入時會調用該接口。實作接口 toSplitType [27],把可變的資料分片狀态 FileSourceSplitState [28] 轉換為不可變的資料分片 FileSourceSplit [19],checkpoint 時會調用該接口得到最新狀态的 FileSourceSplit 并持久化。FileSourceRecordEmitter [29] 發送資料到下遊,并更新 FileSourceSplitState 的分片讀取進度。具體分片資料讀取邏輯在 FileSourceSplitReader [30] 實作,這裡我們簡單的每次讀取一行資料。

Connector 測試

基本功能

1.從 Flink 官網下載下傳已經編譯好的二進制包 Apache Flink 1.13.3 for Scala 2.11 [31] 并解壓,進入解壓後的目錄。拷貝我們開發的 connector 二進制包 flink-connector-files-1.0.0.jar [32] 到 lib 目錄。

tar -zxvf flink-1.13.3-bin-scala_2.11.tgz
cd flink-1.13.3
cp flink-connector-files-1.0.0.jar lib/ -avi           

2.啟動本地叢集。可在本地浏覽器裡打開 http://localhost:8081 進入 Flink UI 驗證叢集是否啟動成功。

./bin/start-cluster.sh           

3.建立測試資料目錄,我們的 connector 從該目錄下讀取檔案。然後進入 sql client 指令行。

mkdir -p /tmp/file-connector
./bin/sql-client.sh           

4.在 sql client 指令行輸入。

create table `source` (
    `line` STRING
) with (
    'connector' = 'file',
    'path' = '/tmp/file-connector'
);


select * from `source`;           

5.我們往目錄 /tmp/file-connector 寫入幾個檔案測試一下,可見我們的 connector 能正常的讀取檔案資料。

cd /tmp/file-connector
echo "Hello World" > 1.txt
echo "tencent" > 2.txt
echo "oceanus" > 3.txt  
echo "我愛我的祖國" > 4.txt           
專家帶你吃透 Flink 架構:一個新版 Connector 的實作

狀态和容錯

1.在 Flink 配置 conf/flink-conf.yaml 添加狀态存儲配置,設定 checkpoint 和 savepoint 目錄,checkpoint 時間間隔,以及 Flink 重新開機政策。

state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints
state.savepoints.dir: file:///tmp/flink-savepoints
execution.checkpointing.interval: 30s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 30
restart-strategy.fixed-delay.delay: 5s           

2.重新開機叢集,并重新進入 sql client 指令行。

./bin/stop-cluster.sh
./bin/start-cluster.sh
./bin/sql-client.sh           

3.重新送出作業。

create table `source` (
    `line` STRING
) with (
    'connector' = 'file',
    'path' = '/tmp/file-connector'
);


select * from `source`;           

4.這次我們準備一個大的資料檔案 bigfile.txt,例如有上千萬行,拷貝檔案到目錄 /tmp/file-connector 。

5.在浏覽器輸入 http://localhost:8081 進入 Flink UI 界面,點選 'Running Jobs',可看到我們正在運作的作業,點選作業進入作業詳情,選擇 Checkpoints 标簽頁,然後選擇 History 的子标簽頁,點選右側的 Refresh 按鈕。

專家帶你吃透 Flink 架構:一個新版 Connector 的實作
專家帶你吃透 Flink 架構:一個新版 Connector 的實作

6.在觀察到一次成功的 checkpoint 之後,我們重新開機 taskmanager。

./bin/taskmanager.sh stop
./bin/taskmanager.sh start           

7.稍等一會兒,我們可在 Flink UI 裡觀察到作業從 checkpoint 恢複的資訊。

專家帶你吃透 Flink 架構:一個新版 Connector 的實作

8.同時我們在 taskmanager 日志裡可觀察到作業恢複時的資料分片資訊包含 checkpoint 時儲存的檔案讀取 offset 資訊。

專家帶你吃透 Flink 架構:一個新版 Connector 的實作

FileSource 開發實踐總結

作為流計算領域的事實标準,Flink 有着優秀的架構設計,其強大的可擴充能力讓我們開發一個自定義 connector 變得簡單。Flink 社群的文檔也非常豐富和詳細,這裡我們按照 Flink 自定義 connector 開發文檔,基于 FLIP-27 的 Source 新架構開發了一個簡單 FileSource connector,并示範了其基本功能和錯誤恢複功能。我們在開發新的 connector 時可以多參考社群已有 connector 的設計和開發模式,甚至可以對現有 connector 進行功能增強來滿足需求。我們這裡的 FileSource connector 就是參考核心的 filesystem connector [7],進行了簡化以友善大家了解。核心的 filesystem connector 不支援同一個檔案的增量資料讀取,如果我們實際業務場景一定需要這種能力,我們可以給 filesystem connector 添加該功能并貢獻到社群。

總結

本文首先介紹了 Flink Source Connector 開發時基于傳統的 SourceFunction 方式的不足,接着介紹了 FLIP-27 的 Source 新架構特點及其優勢,然後基于新架構從零開發了一個簡單的 FileSource connector,介紹開發 source connector 的基本要素。

參考連結

[1] Flink Improvement Proposals:https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

[2] kafka:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/

[3] jdbc:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/

[4] hive:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/

[5] hbase:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/

[6] elasticsearch:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/elasticsearch/

[7] file system:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/

[8] SourceFunction:https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html

[9] FLIP-27:https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

[10] Source:https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Source.html

[11] Flink CDC connector 2.0:https://ververica.github.io/flink-cdc-connectors/master/

[12] 配置 IntelliJ IDEA:https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/#intellij-idea

[13] pom.xml:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/pom.xml

[14] 自定義 connector 開發:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/

[15] FileDynamicTableFactory:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/table/FileDynamicTableFactory.java

[16] FileDynamicSource:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/table/FileDynamicSource.java

[17] FileSource:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/source/FileSource.java

[18] SourceSplit:https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceSplit.html

[19] FileSourceSplit:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/split/FileSourceSplit.java

[20] FileSourceSplitSerializer:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/split/FileSourceSplitSerializer.java

[21] PendingSplitsCheckpoint:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/enumerator/PendingSplitsCheckpoint.java

[22] PendingSplitsCheckpointSerializer:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/enumerator/PendingSplitsCheckpointSerializer.java

[23] FileSourceEnumerator:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/enumerator/FileSourceEnumerator.java

[24] SplitEnumeratorContext#callAsync:https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#callAsync-java.util.concurrent.Callable-java.util.function.BiConsumer-long-long-

[25] FileSourceReader:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/reader/FileSourceReader.java

[26] initializedState:https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#initializedState-SplitT-

[27] toSplitType:https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#toSplitType-java.lang.String-SplitStateT-

[28] FileSourceSplitState:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/reader/FileSourceSplitState.java

[29] FileSourceRecordEmitter:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/reader/FileSourceRecordEmitter.java

[30] FileSourceSplitReader:https://github.com/lzshlzsh/tflink-playgrounds/blob/master/flink-connector-files/src/main/java/com/tencent/cloud/oceanus/connector/file/reader/FileSourceSplitReader.java

[31] Apache Flink 1.13.3 for Scala 2.11:https://www.apache.org/dyn/closer.lua/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz

[32] flink-connector-files-1.0.0.jar:https://github.com/lzshlzsh/tflink-playgrounds/releases/download/v0.1.0/flink-connector-files-1.0.0.jar

作者:劉澤善,騰訊CSIG專家工程師

來源:微信公衆号:騰訊雲大資料

出處:https://mp.weixin.qq.com/s/azGu5-kFzhG6qCHIeepIMw