天天看點

Flink 在有贊的實踐和應用

摘要:今天主要分享的内容是 Flink 在有贊的實踐和應用。内容包括:

  1. Flink 的容器化改造和實踐
  2. Flink SQL 的實踐和應用
  3. 未來規劃

T

一、Flink 的容器化改造和實踐

1. 有贊的叢集演進曆史

  • 2014 年 7 月,第一個 Storm 任務正式上線;
  • 2016 年,引入 Spark Streaming, 運作在 Hadoop Yarn;
  • 2018 年,引入了 Flink,作業模式為 Flink on Yarn Per Job;
  • 2020 年 6 月,實作了 100% Flink Jar 任務 K8s 化, K8s 作為 Flink Jar 預設計算資源,Flink SQL 任務 On Yarn,Flink 統一實時開發;
  • 2020 年 11 月,Storm 叢集正式下線。原先的 storm 任務全部都遷移到了 Flink;
  • 2021 年,我們打算把所有的 Flink 任務 K8s 化。
Flink 在有贊的實踐和應用

2. Flink 在内部支援的業務場景

Flink 支援的業務場景有風控,埋點的實時任務,支付,算法實時特征處理,BI 的實時看闆,以及實時監控等等。目前的實時任務規模有 500+。

Flink 在有贊的實踐和應用

3. 有贊在 Flink on Yarn 的痛點

主要有三部分:

  • 第一,CPU 沒有隔離。Flink On Yarn 模式,CPU 沒有隔離,某個實時任務造成某台機器 CPU 使用過高時, 會對該機器其他實時任務造成影響;
  • 第二,大促擴縮容成本高。Yarn 和 HDFS 服務使用實體機,實體機在大促期間擴縮容不靈活,同時需要投入一定的人力和物力;
  • 第三,需要投入人力運維。公司底層應用資源統一為 K8S,單獨再對 Yarn 叢集運維,會再多一類叢集的人力運維成本。
Flink 在有贊的實踐和應用

4. Flink on k8s 相對于 Yarn 的優勢

可以歸納為 4 點:

  • 第一,統一運維。公司統一化運維,有專門的部門運維 K8S;
  • 第二,CPU 隔離。K8S Pod 之間 CPU 隔離,實時任務不互相影響,更加穩定;
  • 第三,存儲計算分離。Flink 計算資源和狀态存儲分離,計算資源能夠和其他元件資源進行 混部,提升機器使用率;
  • 第四,彈性擴縮容。大促期間能夠彈性擴縮容,更好的節省人力和物力成本。
Flink 在有贊的實踐和應用

5. 實時叢集的部署情況

總體上分為三層。第一層是存儲層;第二層是實時計算資源層;第三層是實時計算引擎層。

  • 存儲層主要分為兩部分:
    • 第一個就是雲盤,它主要存儲 Flink 任務本地的狀态,以及 Flink 任務的日志;
    • 第二部分是實時計算 HDFS 叢集,它主要存儲 Flink 任務的遠端狀态。
  • 第二層是實時計算的資源層,分為兩部分:
    • 一個是 Hadoop Yarn 叢集;
    • 另一個是 Flink k8s 叢集,再往下細分,會有 Flink k8s 和離線的 HDFS 混部叢集的資源,還有 Flink k8s 單獨類型的叢集資源。
  • 最上層有一些實時 Flink Jar,spark streaming 任務,以及 Flink SQL 任務。

我們考慮混部的原因是,離線 HDFS 叢集白天機器使用率不高。把離線 HDFS 叢集計算資源給實時任務,離線使用内部其他元件的彈性計算資源,進而提升機器使用率,更好的達到降本效果。

Flink 在有贊的實踐和應用

6. Flink on k8s 的容器化流程

如下圖所示:

  • 第一步,實時平台的 Flink Jar 任務送出,Flink Jar 任務版本管理,Docker Flink 任務鏡像建構,上傳鏡像到 Docker 鏡像倉庫;
  • 第二步,任務啟動;
  • 第三步,yaml 檔案建立;
  • 第四步,和 k8s Api Server 之間進行指令互動;
  • 第五步,從 Docker 鏡像倉庫拉取 Flink 任務鏡像到 Flink k8s 叢集;
  • 最後,任務運作。這邊有幾個 tips:
    • 作業模式為 Flink Standalone Per Job 模式;
    • 每個 Flink Jar 任務一個鏡像,通過任務名稱 + 時間截作為鏡像的版本;
    • JobManager 需要建立為 Deployment 而不是 Job 類型;
    • Dockerfile 指定 HADOOP_USER_NAME,與線上任務保持一緻。
Flink 在有贊的實踐和應用

7. 在 Flink on k8s 的一些實踐

    • 第一個實踐是解決資源少配任務無法啟動這個問題。

      先來描述一下問題,Flink on k8s 非雲原生,無法做到實時任務資源按需申請。當使用者在平台配置的資源少于實時任務真實使用的資源時(比如使用者代碼寫死并發度,但使用者配置的并發度小于該值),會出現實時任務無法啟動的問題。針對這個問題,我們内部增加了一種 Flink Jar 任務并發度的自動檢測機制。它的主要流程如下圖所示。

      首先,使用者會在我們平台去送出 Flink Jar 作業,當他送出完成之後,在背景會把 Jar 作業以及運作參數,建構 PackagedProgram。通過 PackagedProgram 擷取到任務的預執行計劃。再通過它擷取到任務真實的并發度。

    • 如果使用者在代碼裡配置的并發度小于平台端配置的資源,我們會使用在平台端的配置去申請資源,然後進行啟動;
    • 反之,我們會使用它真實的任務并發度去申請資源,啟動任務。
Flink 在有贊的實踐和應用
    • 第二個實踐是 Flink on k8s 任務的資源分析工具。

      首先來說一下背景,Flink k8s 任務資源是使用者自行配置,當配置的并發度或者記憶體過大時,存在計算資源浪費的問題,進而會增加底層機器成本。怎麼樣去解決這個問題,我們做了一個平台管理者的工具。對于管理者來說,他可以從兩種視角去看這個任務的資源是否進行了一個超配:

      • 第一個是任務記憶體的視角。我們根據任務的 GC 日志,通過一個開源工具 GC Viewer,拿到這一個實時任務的記憶體使用名額;
      • 第二個是消息處理能力的視角。我們在 Flink 源碼層增加了資料源輸入 record/s 和任務消息處理時間 Metric。根據 metric 找到消息處理最慢的 task 或者 operator,進而判斷并發度配置是否合理。

管理者根據記憶體分析名額以及并發度合理性,結合優化規則,預設定 Flink 資源。然後我們會和業務方溝通與調整。右圖是兩種分析結果,上面是 Flink on K8S pod 記憶體分析結果。下面是 Flink K8S 任務處理能力的分析結果。最終,我們根據這些名額就可以對任務進行一個資源的重新調整,降低資源浪費。目前我們打算把它做成一個自動化的分析調整工具。

Flink 在有贊的實踐和應用
  • 接下來是 Flink on K8s 其他的相關實踐:
    • 第一,基于 Ingress Flink Web UI 和 Rest API 的使用。每個任務有一個 Ingress 域名,始終通過域名通路 Flink Web UI 以及 Resti API 使用;
    • 第二,挂載多個 hostpath volume,解決單塊雲盤 IO 限制。單塊雲盤的寫入帶寬以及 IO 能力有瓶頸,使用多塊雲盤,降低雲盤 Checkpoint 狀态和本地寫入的壓力;
    • 第三,Flink 相關通用配置 ConfigMap 化、Flink 鏡像上傳成功的檢測。為 Filebeat、Flink 作業通用配置,建立 configmap,然後挂載到實時任務中,確定每個 Flink 任務鏡像都成功上傳到鏡像倉庫;
    • 第四,HDFS 磁盤 SSD 以及基于 Filebeat 日志采集。SSD 磁盤主要是為了降低磁盤的 IO Wait 時 間,調整 dfs.block.invalidate.limit,降低 HDFS Pending delete block 數。任務日志使用 Filebeat 采集,輸出到 kafka,後面通過自定義 LogServer 和離線公用 LogServer 檢視。
Flink 在有贊的實踐和應用

8. Flink on K8s 目前面臨的痛點

  • 第一,JobManager HA 問題。JobManager Pod 如果挂掉,借助于 k8s Deployment 能力,JobManager 會根據 yaml 檔案重新開機,狀态可能會丢失。而如果 yaml 配置 Savepoint 恢複,則消息可能大量重複。我們希望後續借助于 ZK 或者 etcd 支援 Jobmanager HA;
  • 第二,修改代碼,再次上傳時間久。一旦代碼修改邏輯,Flink Jar 任務上傳時間加上打鏡像時間可能是分鐘級别,對實時性要求比較高的業務或許有影響。我們希望後續可以參考社群的實作方式,從 HDFS 上面拉取任務 Jar 運作;
  • 第三,K8S Node Down 機, JobManager 恢複慢。一旦 K8S Node down 機後, Jobmanager Pod 恢複運作需要 8分鐘左右,主要是 k8s 内部異常發現時間以及作業啟動時間,對部分業務有影響,比如CPS實時任務。如何解決,平台端定時檢測 K8s node 狀态,一旦檢測到 down 機狀态,将 node 上面有 JobManager 所屬的任務停止掉,然後從其之前 checkpoint 恢複;
  • 第四,Flink on k8s 非雲原生。目前通過 Flink Jar 任務并發度自動檢測工具解決資源少配無法啟動問題,但是如果任務的預執行計劃無法擷取,就無法擷取到代碼配置的并發度。我們的思考是:Flink on k8s 雲原生功能以及前面的 1、2 問題,如果社群支援的比較快速的話,後面可能會考慮将 Flink 版本與社群版本對齊。
Flink 在有贊的實踐和應用

9. Flink on K8s的一些方案推薦

    • 第一種方案,是平台自己去建構和管理任務的鏡像。
      • 優點是:平台方對于建構鏡像,以及運作實時任務整體流程自我掌控,具體問題能夠及時修正。
      • 缺點是:需要對 Docker 以及 K8S 相關技術要有一定了解,門檻使用比較高,同時需要考慮非雲原生相關問題。它的适用版本為 Flink 1.6 以上。
    • 第二種方案,Flink k8s Operator。
      • 優點是:對使用者整體封裝了很多底層細節,使用門檻相對降低一些。
      • 缺點是:整體使用沒有第一種方案那麼靈活,一旦有問題,由于底層使用的是其封裝的功能,底層不好修改。它的适用版本為Flink 1.7 以上。
    • 最後一種方案是,基于社群 Flink K8s 功能。
    • 優點是:雲原生,對于資源的申請方面更加友好。同時,使用者使用會更加友善,屏蔽很多底層實作。
    • 缺點是:K8s 雲原生功能還是實驗中的功能,相關功能還在開發中,比如 k8s Per job 模式。它的适用版本為Flink 1.10 以上。
Flink 在有贊的實踐和應用

二、Flink SQL 實踐和應用

1. 有贊 Flink SQL 的發展曆程

  • 2019 年 9 月,我們對 Flink 1.9 、1.10 SQL 方面的能力進行研究和嘗試,同時增強了一些 Flink SQL 功能。
  • 2019 年 10 月,我們進行了 SQL 功能驗證,基于埋點實時需求,驗證 Flink SQL Hbase 維表關聯功能,結果符合預期。
  • 2020 年 2 月,我們對 SQL 的功能進行了擴充,以 Flink 1.10 作為 SQL 計算引擎,進行 Flink SQL 功能擴充開發和優化,實時平台支援全 SQL 化開發。
  • 2020 年 4 月,開始支援實時數倉、有贊教育、美業、零售等相關實時需求。
  • 2020 年 8 月,新版的實時平台才開始正式上線,目前主推 Flink SQL 開發我們的實時任務。
Flink 在有贊的實踐和應用

2. 在 Flink SQL 方面的一些實踐

主要分為三個方面:

  • 第一,Flink Connector 的實踐包括:Flink SQL 支援 Flink NSQ Connector、Flink SQL 支援 Flink HA Hbase Sink 和維表、Flink SQL 支援無密 Mysql Connector、Flink SQL 支援标準輸出(社群已經支援)、Flink SQL 支援 Clickhouse Sink;
  • 第二,平台層的實踐包括:Flink SQL 支援 UDF 以及 UDF 管理、支援任務從 Checkpoint 恢複、支援幂等函數、支援 Json 相關函數等、支援 Flink 運作相關參數配置,比如狀态時間設定,聚合優化參數等等、Flink 實時任務血緣資料自動化采集、Flink 文法正确性檢測功能;
  • 第三,Flink Runtime的實踐包括:Flink 源碼增加單個Task 以及 Operator 單條記錄處理時間名額;修複 Flink SQL 可撤回流 TOP N 的BUG。
Flink 在有贊的實踐和應用

3. 業務實踐

    • 第一個實踐是我們内部的客服機器人實時看闆。流程分為三層:
      • 第一層是實時資料源,首先是線上的 MySQL 業務表,我們會把它的 Binlog 通過 DTS 服務同步到相應的 Kafka Topic;
      • 實時任務的 ODS 層有三個 Kafka Topic;
      • 在實時 DWD 層,有兩個 Flink SQL 任務:
        • Flink SQL A 消費兩個 topic,然後把這兩個 topic 裡面的資料去通過 Interval Join,根據一些視窗的作用關聯到對應的資料。同時,會對這個實時任務設定狀态的保留時間。Join 之後,會去進行一些 ETL 的加工處理,最終會把它的資料輸入到一個 topic C。
        • 另外一個實時任務 Flink SQL B 消費一個 topic,然後會對 topic 裡面的資料進行清洗,然後到 HBase 裡面去進行一個維表的關聯,去關聯它所需要的一些額外的資料,關聯的資料最終會輸入到 topic D。

在上遊,Druid 會消費這兩個 topic 的資料,去進行一些名額的查詢,最終提供給業務方使用。

Flink 在有贊的實踐和應用
  • 第二個實踐是實時使用者行為中間層。使用者在我們平台上面會去搜尋、浏覽、加入購物車等等,都會産生相應的事件。原先的方案是基于離線來做的。我們會把資料落庫到 Hive 表,然後算法那邊的同學會結合使用者特征、機器學習的模型、離線的資料去生成一些使用者評分預估,再把它輸入到 HBase。

    在這樣的背景下面,會有如下訴求:目前的使用者評分主要是基于離線任務,而算法同學希望結合實時的使用者特征,更加及時、準确的提高推薦精準度。這其實就需要建構一個實時的使用者行為中間層,把使用者産生的事件輸入到 Kafka 裡面,通過 Flink SQL 作業對這些資料進行處理,然後把相應的結果輸出到 HBase 裡面。算法的同學再結合算法模型,實時的更新模型裡面的一些參數,最終實時的進行使用者的評分預估,也會落庫到 HBase,然後到線上使用。

Flink 在有贊的實踐和應用

        使用者行為中間層的建構流程分為三個步驟:

    • 第一層,我們的資料源在 Kafka 裡面;
    • 第二層是 ODS 層,在 Flink SQL 作業裡面會有一些流表的定義,一些 ETL 邏輯的處理。然後去定義相關的 sink 表、維表等等。這裡面也會有一些聚合的操作,然後輸入到 Kafka;
    • 在 DWS 層,同樣有使用者的 Flink SQL 作業,會涉及到使用者自己的 UDF Jar,多流 Join,UDF 的使用。然後去讀取 ODS 層的一些資料,落庫到 HBase 裡面,最終給算法團隊使用。

這裡有幾個實踐經驗:

  • 第一,Kafka Topic、Flink 任務名稱,Flink SQL Table 名稱,按照數倉命名規範。
  • 第二,名額聚合類計算,Flink SQL 任務要設定空閑狀态保留時間,防止任務狀态無限增大。
  • 第三,如果存在資料傾斜或者讀狀态壓力較大等情況,需要配置 Flink SQL 優化參數。
Flink 在有贊的實踐和應用

4. 在 HAHBase Connector 的實踐

社群 HBase Connector 資料關聯或者寫入是單 HBase 叢集使用,當 HBase 叢集不可用時,實時任務資料的寫入或者關聯會受到影響,進而可能會影響到業務使用。至于怎麼樣去解決這個問題。首先,在 HBase 方面有兩個叢集,主叢集和備叢集。它們之間通過 WAL 進行主從的複制。Flink SQL 作業先寫入主叢集,當主叢集不可用的時候,自動降級到備叢集,不會影響到線上業務的使用。

Flink 在有贊的實踐和應用

5. 無密 Mysql Connector 和名額擴充實踐

左圖是 Flink 無密 Mysql Sink 文法,解決的問題包括三點:

  • 第一,Mysql 資料庫使用者名和密碼不以明文方式向外進行暴露和存儲;
  • 第二,支援 Mysql 使用者名和密碼周期性更新;
  • 第三,内部自動根據使用者名鑒定表權限使用。這樣做最主要的目的還是保證明時任務資料庫使用更安全。

然後是左下圖,我們在 Flink 源碼層面增加 Task 和 Operator 單條消息處理時間 Metric。目的是幫助業務方,根據消息處理時間的監控名額,排查和優化 Flink 實時任務。

Flink 在有贊的實踐和應用

6. Flink 任務血緣中繼資料自動化采集的實踐

Flink 任務血緣中繼資料采集的流程如下圖所示,平台啟動實時任務後,根據目前任務是 Flink Jar 任務,還是 Flink SQL 任務,分别走兩條不同的路徑,來擷取任務的血緣資料,再把血緣資料上報中繼資料系統。這樣做的價值有兩點:

  • 第一,幫助業務方了解實時任務加工鍊路。業務方能夠更清晰的認知實時任務之間的關系和影響,當操作任務時,能夠及時通知下遊其他業務方;
  • 第二,更好的建構實時數倉。結合實時任務血緣圖,提煉實時資料公共層,提升複用性,更好的建構實時數倉。
Flink 在有贊的實踐和應用

三、未來規劃

最後是未來的規劃,包括四點:

  • 第一,推廣 Flink 實時任務 SQL 化。推廣 Flink SQL 開發實時任務,提升 Flink SQL 任務比例。
  • 第二,Flink 任務計算資源自動優化配置。從記憶體、任務處理能力、輸入速率等,對任務資源進行分析,對資源配置不合理任務自動化配置,進而降低機器成本。
  • 第三,Flink SQL 任務 k8s 化以及 K8s 雲原生。Flink 底層計算資源統一為 k8s,降低運維成本,Flink k8s 雲原生,更合理使用 K8s 資源。
  • 第四,Flink 與資料湖以及 CDC 功能技術的調研。新技術的調研儲備,為未來其他實時需求奠定技術基礎。
Flink 在有贊的實踐和應用