天天看點

網易遊戲 Flink SQL 平台化實踐

摘要:本文整理自網易遊戲資深開發工程師林小鉑在 Flink Forward Asia 2021 平台建設專場的演講。主要内容包括:
  1. 網易遊戲 Flink SQL 發展曆程
  2. 基于模闆 jar 的 StreamflySQL v1
  3. 基于 SQL Gateway 的 StreamflySQL v2
  4. 未來工作

一、網易遊戲 Flink SQL 發展曆程

網易遊戲 Flink SQL 平台化實踐

網易遊戲實時計算平台叫做 Streamfly,這個名字取名自電影《馴龍高手》中的 Stormfly。由于我們已經在從 Storm 遷移到 Flink,是以将 Stormfly 中的 Storm 替換成了更為通用的 Stream。

Streamfly 前身是離線作業平台 Omega 下的名為 Lambda 的子系統,它負責了所有實時作業的排程,最開始開始支援 Storm 和 Spark Streaming,後來改為隻支援 Flink。在 2019 年的時候我們将 Lambda 獨立出來以此為基礎建立了 Streamfly 計算平台。随後,我們在 2019 年底開發并上線了第一個版本 Flink SQL 平台 StreamflySQL。這個版本基于模闆 jar 提供了基本 Flink SQL 的功能,但是使用者體驗還有待提升,是以我們在 2021 年年初從零開始重建立設了第二個版本的 StreamflySQL,而第二個版本是基于 SQL Gateway。

要了解這兩個版本的不同,我們需要先回顧下 Flink SQL 的基本工作流程。

網易遊戲 Flink SQL 平台化實踐

使用者送出的 SQL 首先會被 Parser 解析為邏輯執行計劃;邏輯執行計劃經過 Planner Optimizer 優化,會生成實體執行計劃;實體執行計劃再通過 Planner CodeGen 代碼生成,翻譯為 DataStream API 常見的 Transformation;最後 StreamGraphGenerator 會将這些 Transformation 轉換為 Flink 作業的最終表示 JobGraph 送出到 Flink 叢集。

上述一系列過程都發生在 TableEnvironment 裡面。取決于部署模式的不同,TableEnvironment 可能運作在 Flink Client 或者 JobManager 裡。Flink 現在支援 3 種叢集部署模式,包括 Application、 Per-Job 和 Session 模式。在 Application 模式下,TableEnvironment 會在 JobManager 端運作,而在其餘兩種模式下,TableEnvironment 都運作在 Client 端。不過這三種模式都有一個共同的特點,TableEnvironment 都是一次性的,會在送出 JobGraph 之後自動退出。

網易遊戲 Flink SQL 平台化實踐

為了更好地複用 TableEnvironment 提高效率和提供有狀态的操作,有的項目會将 TableEnvironment 放到一個新的獨立 Server 端程序裡面去運作,由此産生了一種新的架構,我們稱之為 Server 端 SQL 編譯。相對地,還有 Client 端 SQL 編譯。

有同學可能會問,為什麼沒有 JobManager 端 SQL 編譯,這是因為 JobManager 是相對封閉的元件,不适合拓展,而且即使做了達到的效果跟 Client 端編譯效果基本一樣。是以總體來看,一般就有 Client 和 Server 兩種常見的 Flink SQL 平台架構。

Client 端 SQL 編譯,顧名思義就是 SQL 的解析翻譯優化都在 Client 端裡進行(這裡的 Client 是廣義的 Client,并不一定是 Flink Client)。典型的案例就是通用模闆 jar 和 Flink 的 SQL Client。這種架構的優點是開箱即用,開發成本低,而且使用的是 Flink public 的 API,版本更新比較容易;缺點是難以支援進階的功能,而且每次都要先啟動一個比較重的 TableEnvironment 是以性能比較差。

然後是 Server 端 SQL 編輯。這種架構将 SQL 解析翻譯優化邏輯放到一個獨立的 Server 程序去進行,讓 Client 變得非常輕,比較接近于傳統資料庫的架構。典型的案例是 Ververica 的 SQL Gateway。這種架構的優點是可拓展性好,可以支援很多定制化功能,而且性能好;缺點則是現在開源界沒有成熟的解決方案,像上面提到 SQL Gateway 隻是一個比較初期的原型系統,缺乏很多企業級特性,如果用到生産環境需要經過一定的改造,而且這些改造涉及比較多 Flink 内部 API,需要比較多 Flink 的背景知識,總體來說開發成本比較高,而且後續版本更新工作量也比較大。

編者按:Apache Flink 社群目前正在開發 SQL Gateway 元件,将原生提供 Flink SQL 服務化的能力,并相容 HiveServer2 協定,計劃于 1.16 版本中釋出,敬請期待。感興趣的同學可以關注 FLIP-91 ​​1​​​ 和 FLIP-223 ​​2​​

回到我們 Flink SQL 平台,我們 StreamflySQL v1 是基于 Client 端 SQL 編譯,而 v2 是基于 Server 端的 SQL 編譯。下面就讓我逐個介紹一下。

二、基于模闆 jar 的 StreamflySQL v1

StreamflySQL v1 選擇 Client 端 SQL 編譯的主要原因有三個:

網易遊戲 Flink SQL 平台化實踐
  • 首先是平台內建。不同于很多公司的作業排程器用大資料中比較主流的 Java 編寫,我們的 Lambda 排程器是用 Go 開發的。這是因為 Lambda 在設計之初支援了多種實時計算架構,出于松耦合和公司技術棧的考慮,Lambda 以 Go 作為開發語言,會采用與 YARN 類似的動态生成 Shell 腳本的方式來調用不同架構的指令行接口。這樣松耦合的接口方式給我們帶來很大的靈活性,比如我們可以輕松支援多個版本的 Flink,不需要強制使用者随着系統版本更新,但同時也導緻沒辦法直接去調用 Flink 原生的 Java API。
  • 第二個原因是松耦合。開發的時候 Flink 版本是1.9,當時 Client API 比較複雜,不太适合平台內建,并且當時社群也在推動 Client 的重構,是以我們盡量避免依賴 Client API去開發 Flink SQL 平台。
  • 第三個原因是實踐經驗。因為模闆 jar + 配置中心模式在網易遊戲内部已經有了比較多的應用,是以我們在這方面積累了很多實踐經驗。綜合之下我們很自然地采用了模闆 jar + 配置中心的架構來實作 v1 版本。
網易遊戲 Flink SQL 平台化實踐

上圖是 v1 版本的整體架構圖。我們在主要在 Lambda 作業平台的基礎上新增了 StreamflySQL 後端作為配置中心,負責根據使用者送出的 SQL 和作業運作配置加上通用的模闆 jar 來生成一個 Lambda 作業。

總體的作業送出流程如下:

  1. 使用者在前端的 SQL 編輯器送出 SQL 和運作配置。
  2. StreamflySQL 後端收到請求後生成一個 Lambda 作業并傳遞配置 ID。
  3. 然後 Lambda 啟動作業,背後是執行 Flink CLI run 指令來送出作業。
  4. Flink CLI run 指令會啟動 Flink Client 來加載并執行模版 jar 的 main 函數,這時會讀取 SQL 和配置,并初始化 TableEnvironment。
  5. TableEnvironment 會從 Catalog 讀取必要的 Database/Table 等元資訊。這裡順帶一提是,在網易遊戲我們沒有使用統一的 Catalog 來維護不同元件的元資訊,而是不同元件有自己的中繼資料中心,對應不同的 Catalog。
  6. 最後 TableEnvironment 編譯好 JobGraph,以 Per-Job Cluster 的方式部署作業。

StreamflySQL v1 實作了 Flink SQL 平台從零到一的建設,滿足了部分業務需求,但仍有不少痛點。

第一個痛點是響應慢。

網易遊戲 Flink SQL 平台化實踐

以一個比較典型的 SQL 來說,以模闆 jar 的方式啟動作業需要準備 TableEnviroment,這可能會花費 5 秒鐘,然後執行 SQL 的編譯優化包括與 Catalog 互動去擷取中繼資料,也可能會花費 5 秒鐘;編譯得到jobgraph之後還需要準備 per-job cluster,一般來說也會花費 20 秒以上;最後還需要等待 Flink job的排程,也就是作業從 scheduled 變成 running 的狀态,這個可能也需要 10 秒鐘。

總體來說,v1 版本啟動一個 Flink SQL 作業至少需要 40 秒的時間,這樣的耗時相對來說是比較長的。但是仔細分析這些步驟,隻有 SQL的編譯優化和 job 排程是不可避免的,其他的比如 TableEnvironment 和 Flink cluster 其實都可以提前準備,這裡的慢就慢在資源是懶初始化的,而且幾乎沒有複用。

第二個痛點是調試難。

網易遊戲 Flink SQL 平台化實踐

我們對 SQL 調試的需求有以下幾點:

  • 第一點是調試的 SQL 與線上的 SQL 要基本一緻。
  • 第二點是調試 SQL 不能對線上的資料産生影響,它可以去讀線上的資料,但不能去寫。
  • 第三點,因為調試的 SQL 通常隻需要抽取少量的資料樣本就可以驗證 SQL 的正确性,是以我們希望限制調試 SQL 的資源,一方面是出于成本的考慮,另外一方面也是為了防止調試的 SQL 與線上作業産生資源競争。
  • 第四點,因為調試 SQL 處理的資料量比較少,我們希望以更快更便捷的方式擷取到結果。

在 v1 版本中,我們對上述需求設計了如下解決方案:

  1. 首先對于調試的 SQL,系統會在 SQL 翻譯的時候将原來的一個 Sink 替換為專用的 PrintSink,這解決了需求中的前兩點。
  2. 然後對 PrintSink 進行限流,通過 Flink 的反壓機制達到總體的限流,并且會限制作業的最長執行時間,逾時之後系統會自動把作業結束掉,這解決了需求中的資源限制這點。
  3. 最後為了更快地響應,調試的作業并不會送出到 YARN 叢集上去運作,而是會在 Lamdba 伺服器本地開啟開啟一個 MiniCluster 去執行,同時也友善我們從标準輸出去提取 PrintSink 的結果,這點解決了需求中的最後一點。
網易遊戲 Flink SQL 平台化實踐

調試模式的架構如上圖所示,比起一般的 SQL 送出流程,主要差別在于作業不會送出到 YARN 上,而是在 Lambda 伺服器的本地執行,進而節省了準備 Flink 叢集的開銷,并且更容易管控資源和擷取結果。

上述調試解決方案基本可用,但是實際使用過程中依然存在不少問題。

  • 第一,如果使用者送出的 SQL 比較複雜,那麼 SQL 的編譯優化可能會耗費比較久的時間,這會導緻作業很容易逾時,在有結果輸出之前可能就被系統結束掉,同時這樣的 SQL 也會給伺服器造成比較大的壓力。
  • 第二,該架構沒法去調試時間視窗比較長的作業或者需要 Bootstrap State 的作業。
  • 第三,因為執行結果是在作業結束之後才批量傳回的,不是在作業執行過程中就流式傳回,是以使用者需要等到作業結束——通常是 10 分鐘以上才可以看到結果。
  • 第四,在 SQL 的翻譯階段把調試 SQL 的 Sink 替換掉,這個功能是通過改造 Flink 的 Planner 來實作的,相當于業務邏輯入侵到了 Planner 裡面,這樣并不優雅。

第三個痛點是 v1 版本隻允許單條 DML。

網易遊戲 Flink SQL 平台化實踐

相比傳統的資料庫,我們支援的 SQL 語句是很有限的,比如,MySQL 的 SQL 可以分成 DML、DQL、DDL 和 DCL。

  • DML 用于操控資料,常見的語句有 INSERT / UPDATE / DELETE。StreamflySQL v1 隻支援了 INSERT,這和 Flink SQL 是保持一緻的。Flink SQL 用 Retract 模式 — 也就是類似 Changelog 的方式 — 來表示 UPDATE/DELETE,是以隻支援 INSERT,這點其實沒有問題。
  • DQL 用于查詢資料,常見語句是 SELECT。這在 Flink SQL 是支援的,但因為缺乏 Sink 不能生成一個有意義的 Flink 作業,是以 StreamflySQL v1 不支援 DQL。
  • DDL 用于定義中繼資料,常見語句是 CREATE / ALTER /DROP 等。這在 StreamflySQL v1 版本是不支援的,因為模闆 jar 調用 SQL 的入口是 sqlUpdate,不支援純中繼資料的操作,而且為純中繼資料的操作單獨啟動一個 TableEnvironment 來執行也是完全不劃算。
  • 最後是 DCL,用于管理資料權限,比如 GRANT 跟 REVOKE 語句。這個 Flink SQL 是不支援的,原因是 Flink 目前隻是資料的使用者而不是管理者,DCL 并沒有意義。

綜合來看,v1 版本隻支援了單條 DML,這讓我們很漂亮的 SQL 編輯器變得空有其表。基于以上這些痛點,我們在今年調研并開發了 StreamflySQL v2。v2 采用的是 Server 端 SQL 編譯的架構。

三、基于 SQL Gateway 的 StreamflySQL v2

網易遊戲 Flink SQL 平台化實踐

我們的核心需求是解決 v1 版本的幾個痛點,包括改善使用者體驗和提供更完整的 SQL 支援。總體的思路是采用 Server 端的 SQL 編譯的架構,提高可拓展性和性能。此外,我們的叢集部署模式也改成 Session Cluster,預先準備好叢集資源,省去啟動 YARN application 的時間。

這裡會有兩個關鍵問題。

  • 首先是我們要完全自研還是基于開源項目?在調研期間我們發現 Ververica 的 SQL Gateway 項目很符合我們需求,容易拓展而且是 Flink 社群 FLIP-91 SQL Gateway 的一個基礎實作,後續也容易與社群的發展方向融合。
  • 第二個問題是,SQL Gateway 本身有送出作業的能力,這點跟我們已有的 Lambda 平台是重合的,會造成重複建設和難以統一管理的問題,比如認證授權、資源管理、監控告警等都會有兩個入口。那麼兩者應當如何進行分工?我們最終的解決方案是,利用 Session Cluster 的兩階段排程,即資源初始化和作業執行是分離的,是以我們可以讓 Lambda 負責 Session Cluster 的管理,而 StreamflySQL 負責 SQL 作業的管理,這樣能複用 Lambda 大部分的基礎能力。
網易遊戲 Flink SQL 平台化實踐

這是 StreamflySQL v2 的架構圖。我們将 SQL Gateway 内嵌到 SpringBoot 應用中,開發了新的後端。總體看起來比 v1 版本要複雜,原因是原本的一級排程變成了會話和作業的兩級排程。

首先使用者需要建立一個 SQL 會話,StreamflySQL 後端會生成一個會話作業。在 Lambda 看來會話作業是一種特殊作業,啟動時會使用 yarn-session 的腳本來啟動一個 Flink Session Cluster。在 Session Cluster 初始化之後,使用者就可以在會話内去送出 SQL。StreamflySQL 後端會給每個會話開啟一個 TableEnvironment,負責執行 SQL 語句。如果是隻涉及中繼資料的 SQL,會直接調用 Catalog 接口完成,如果是作業類型的 SQL,會編譯成 JobGraph 送出到 Session Cluster 去執行。

網易遊戲 Flink SQL 平台化實踐

v2 版本很大程度上解決了 v1 版本的幾個痛點:

  • 在響應時間方面,v1 常常會需要 1 分鐘左右,而 v2 版本通常在 10 秒内完成。
  • 在調試預覽方面,v2 不需要等作業結束,而是在作業運作時,将結果通過 socket 流式地傳回。這點是依賴了 SQL gateway 比較巧妙的設計。對于 select 語句,SQL Gateway 會自動注冊一個基于 socket 的臨時表,并将 select 結果寫入到這個表。
  • 在 SQL 支援方面,v1 隻支援 DML,而 v2 借助于 SQL Gateway 可以支援 DML/DQL/DDL。

不過 SQL Gateway 雖然有不錯的核心功能,但我們使用起來并不是一帆風順,也遇到一些挑戰。

首先最為重要的是中繼資料的持久化。

網易遊戲 Flink SQL 平台化實踐

SQL Gateway 本身的中繼資料隻儲存在記憶體中,如果程序重新開機或是遇到異常崩潰,就會導緻中繼資料丢失,這在企業的生産環境裡面是不可接受的。是以我們将 SQL Gateway 內建到 SpringBoot 程式之後,很自然地就将中繼資料儲存到了資料庫。

中繼資料主要是會話中繼資料,包括會話的 Catalog、Function、Table 和作業等等。這些中繼資料按照作用範圍可以分為 4 層。底下的兩層是全局的配置,以配置檔案的形式存在;上面兩層是運作時動态生成的中繼資料,存在資料庫中。上層的配置項優先級更高,可以用于覆寫下層的配置。

我們從下往上看這些中繼資料:

  • 最底層是全局的預設 Flink Configuration,也就是我們在 Flink Home 下的 flink-conf yaml 配置。
  • 再上面一層是 Gateway 自身的配置,比如部署模式(比如是 YARN 還是 K8S),比如預設要出冊的 Catalog 和 Function 等等。
  • 第三層是 Session 會話級别的 Session Configuraion,比如會話對應的 Session Cluster 的叢集 ID 或者 TaskManager 的資源配置等等。
  • 最上面一層是 Job 級别的配置,包括作業動态生成的中繼資料,比如作業 ID、使用者設定 checkpoint 周期等等。

這樣比較靈活的設計除了解決了中繼資料持久化的問題,也為我們的多租戶特性奠定了基礎。

第二個挑戰是多租戶。

網易遊戲 Flink SQL 平台化實踐

多租戶分為資源和認證兩個方面:

  • 在資源方面,StreamflySQL 利用 Lambda 作業平台可以在不同的隊列啟動 Session Cluster,它們的 Master 節點和資源很自然就是隔離的,是以沒有像 Spark Thrift Server 那樣不同使用者共用一個 Master 節點和混用資源的問題。
  • 在認證方面,因為 Session Cluster 屬于不同使用者,是以 StreamflySQL 後端需要實作多租戶的僞裝。在網易遊戲,元件一般會使用 Kerberos 認證。我們采用多租戶實作的方式是使用 Hadoop 的 Proxy User,先登入為超級使用者,然後僞裝成項目使用者來向不同元件擷取 delegation token,這裡的元件主要是 Hive MetaStore 跟 HDFS,最後把這些 token 存到 UGI 裡面并用 doAS 的方式來送出作業。

第三個挑戰是水準拓展。

網易遊戲 Flink SQL 平台化實踐

為了高可用和拓展服務能力,StreamflySQL 很自然需要以多執行個體的架構部署。因為我們已經将主要的狀态中繼資料存到資料庫,我們可以随時從資料庫建構出一個新的 TableEnvironment,是以 StreamflySQL 執行個體類似普通 Web 服務一樣非常輕,可以很容易地擴容縮容。

但是并不是所有狀态都可以持久化的,另外有些狀态我們故意會不持久化。比如使用者使用 SET 指令來改變 TableEnvironment 的屬性,比如開啟 Table Hints,這些屬于臨時屬性,會在重建 TableEnvironment 後被重置。這是符合預期的。再比如使用者送出 select 查詢做調試預覽時,TaskManager 會與 StreamflySQL 後端建立 socket 連結,而 socket 連結顯然也是不可持久化的。是以我們在 StreamflySQL 的多執行個體前加了親和性的負載均衡,按照 Session ID 來排程流量,讓在正常情況下同一個使用者的請求都落到同一個執行個體上,確定使用者使用體驗的連續性。

第四個挑戰是作業狀态管理。

網易遊戲 Flink SQL 平台化實踐
  • 第一個含義是作業的運作狀态。SQL gateway 目前隻是送出 SQL 并不監控後續的運作狀态。是以,StreamflySQL 設定了監控線程池來定時輪詢并更新作業狀态。因為 StreamflySQL 是多執行個體的,它們的監控線程同時操作同一個作業的話,可能會有更新丢失的問題,是以我們這裡使用了 CAS 樂觀鎖來保證過時的更新不會生效。然後我們會在作業異常退出或者無法擷取狀态時進行告警,比如 JobManager 進行 failover 的情況下,我們無法得知 Flink 作業的狀态,這時系統就會發出 disconnected 的異常狀态告警。
  • 第二個含義是 Flink 的持久化狀态,即 Flink State。原生的 SQL gateway 并沒有管理 Flink 的 Savepoint 和 Checkpoint,是以我們加上了 stop 和 stop-with-savepoint 的功能,并強制開啟 retained checkpoint。這使得在作業遇到異常終止或者簡單 stop 之後,再次重新開機時系統可以自動查找到最新的 checkpoint。

四、未來工作

  • 未來我們首先要解決的一個問題是 State 遷移的問題,即使用者對 SQL 進行變更後,如何從原先的 Savepoint 進行恢複。目前隻能通過變更類型來告知使用者風險,比如通常而言加減字段不會造成 Savepoint 的不相容,但如果新增一個 join 表,造成的影響就很難說了。是以後續我們計劃通過分析 SQL 變更前後的執行計劃,來預先告知使用者變更前後的狀态相容性。
  • 第二個問題是細粒度的資源管理。目前我們并不能在作業編譯時去指定 SQL 的資源,比如 TaskManager 的 CPU 和記憶體在 Session Cluster 啟動之後就确定了,是會話級别的。目前調整資源隻能通過作業并行度調整,很不靈活并且容易造成浪費。現在 Flink 1.14 已經支援了 DataStream API 的細粒度資源管理,可以在算子級别設定資源,但 SQL API 現在還沒有計劃,後續我們可能參與進去推動相關議案的進展。
  • 最後是社群貢獻。我們對 SQL Gateway 有一定使用經驗,而且也對其進行了不少的改進,後續希望這些改進能回饋給 Flink 社群,推動 FLIP-91 SQL Gateway 的進展。