摘要:本文由網易雲音樂資料智能部資深資料平台開發工程師蔣文偉分享,主要介紹 Flink SQL 在雲音樂的産品化實踐。分享内容如下:
- 簡介
- 産品功能
- 性能優化
- 運維完善
- 未來規劃
一、背景簡介
1.Flink in Music
先簡單的介紹下雲音樂的現狀,目前音樂這邊的用戶端日志,服務端日志大概在每日大千億條左右,次元表資料源像 Redis,MySQL 這些大概有上百個。而服務的實時計算任務開發的人員有上百名,其中不僅包擴資料開發工程師,分析師,也包括算法,背景業務等同學。這些同學們也累積開發了上千個實時計算任務,這些任務不僅有統計任務,還有些一線業務,比如排行榜,實時熱度等。

2.應用場景
這裡我稍微列舉了一些業務場景,比如我們的内容分發、實時數倉、算法推薦,還有索引任務、實時監控,AB test 等,幾乎涵蓋了整個資料鍊路中的大部分業務場景,可以說我們現在的業務已經離不開 Flink 的體系,那後面我們會以其中幾個場景為例,看看我們在這些場景中使用原生的 Flink 會遇到那些問題。
■ 内容分發
第一個介紹的場景是分發,分發是個非常典型的場景。它會根據一定條件對資料流進行劃分,把輸入資料流切分成多個子流。
一般情況下分發會是整個資料鍊路的上遊,是以相對來說這類任務非常重要,那麼我們在這個場景中會遇到什麼問題呢?
- 問題1:開發效率低,業務标準流程難以複用
首先是開發效率低下,這類業務邏輯非常簡單,核心開發工作其實就是個 where 篩選,但是傳統的開發方式需要使用者了解很多額外的東西,比如 HDFS 的定時清理功能,如果這個元件交由使用者開發,勢必要開放權限,那麼就可能會導緻 HDFS 倉庫檔案被誤删等安全事故,是以我們需要建立一套統一架構,且可以提供一系列标準化的元件,使用者僅需要關心其核心業務邏輯即可。
- 問題2:學習成本高
第二個問題是學習成本較高,SQL 是一種非常優秀的資料處理語言,很多同學也都會,但是 Flink SQL 的配置卻沒普通 SQL 那麼簡單。Flink SQL 要求使用者對每個元件的配置都非常熟悉,這是一個 HDFS 的 sink 操作,需要在 SQL 中配置輸出目錄,分區字段,檔案大小,keytab,壓縮格式等一系列的參數,而這些參數需要使用者通過文檔來學習。
針對這個,我們需要一種前端上的優化,通過給使用者可視化的提示,來簡化配置,降低學習成本。
- 問題3:外部環境混亂
第三,對一個穩定性,以及性能要求比較高的任務來說,所有的這些監控、報警的配套體系建設也都是必不可少的。
■ 特征快照
第二個例子,特征快照,先簡單的說下什麼是特征快照,特征快照簡單的可以了解成把特征的曆史版本進行存儲,為什麼要這麼做呢,因為特征是動态變化的,每個事件未必能保準順序到達,一個使用者先點喜歡 DJ,在播放歌曲,再點選喜歡了動漫,我們最終這次播放關聯的應該是 DJ 而不是動漫,但按照時間序可能就是錯的,是以,對一每個版本的特征和 tag,都會有其唯一的 traceid 來進行管理,也就是一個 traceid 一個特征版本,這塊在實時機器學習場景使用的非常廣泛。
這邊可以看到任務的流程圖,包括資料清洗,收集,抽樣,去重,join 等流程,這些流程也有很多業務邏輯,像 join 這個流程如果 join 不上怎麼辦,是放在記憶體裡等,還是再次回流到 kafka 等待下一輪比對,亦或是使用降級方案。相對來說,對穩定性、兜底方案等都有較多要求。
我們來看特征快照場景下會遇到哪些問題。
- 第一,開發成本較高,調試也比較複雜。上述提到我們有很多種子產品共同組成這個任務,如果通過傳統的日志列印,很難進行調試和維護。
- 第二,特殊的需求沒有辦法被快速的滿足。像抽樣功能完全取決于業務,比較難以抽象和複用。
其它的一些問題,包括調優問題、運維問題、監控的問題,我歸納為技術賦能。這些問題不能推給使用者來做,隻能由平台來進行統一的管理和維護。
綜上所述,我們的産品化目标也出來了,我們希望打造一套能降低使用者學習成本、運維成本,提升開發效率,并在全方位進行一個賦能的一站式的實時計算平台。也就是我們的主要介紹的産品,音樂的實時計算 Notebook 服務。
二、雲音樂的實時計算 Notebook 服務
1.NoteBook with block
我們的 notebook 服務是由多個 block 快組成的,每個 block 可以自由組合,比如 SQL 類型的塊之後加 2 個 sink 的元件,再加一個 source 元件,都可以。
同樣,每個 block 塊擁有子類型,也就是二級類型,比如一個 sink,會有一些類似 MySQL sink,Redis sink 的子類型,他們都以插件的形式進行加載,是以主類型可以被快速擴充,子類型也同樣可以被快速擴充,元件開發就變得非常友善。
而整個 notebook 執行的過程,也會按照順序由上至下的執行每一個 block,同時我們的 block 中除了 sink 之外都支援在頁面的 debug 服務。
詳細分享一下目前支援的 block 類型。
- 第一種就是 SQL block,支援 Flink SQL,當然我們也在中間做了一些優化。比如建立自己的 catalog 以及 function 管理中心,自定義的一些參數 set 文法等。
- 第二種是 custom block,我們會提供給使用者一套标準的 API,通過 API 就可以進行個性化的開發,類似一個有限條件下的 Flink 的 jar 包任務。
source 和 sink 類型,他們比純 SQL 類型有更多的優勢,比如同樣的功能在 SQL 中也可以實作,但是需要加很多 set config 配置。而現在,不僅 catalog DB 支援搜尋,并且有些配置也将更加直覺的被呈現。就像 HDFS sink,他的歸檔大小,序列化類型,字首,等等正常的配置就直接可以讓使用者進行選擇。
第二個優勢,我們可以通過可視化 block 來提供元件,比如 source 端的動态限流,可以開啟一個 source 的 slot 來做全局 config 的輪訓,比如 sink 有類似 HDFS 檔案過期清理或者小檔案合并的功能。類似的标準元件可以通過 block 塊更直覺的提供給使用者。
整個平台的頁面大概介紹到這裡,這裡是些小的歸納總結,良好的互動也就是跟簡單直覺的操作我們剛剛介紹了一些,良好的擴充,也就是我們的 block 的插件化,中繼資料中心以及監控配套等,我們先看如何用這套 notebook 來解決實際例子。
■ Snapshot 場景的應用
首先是 snapshot 場景的應用,我們可以看到這個場景中,鍊路上的 4 種任務分别通過不同的 block 類型實作。
- Clollect,通過 SQL 就能完成;
- ETL,會有一些業務上的抽樣方法和去重邏輯,是以會通過自定義 source 來實作;
- Snapshut join,除了 join 操作之外,可能會有業務降級方案等一系列非标需求,通過 SQL 無法完成,需要通過自定義的 transform 元件來實作 ;
- Extrect,通過可視化 sink來進行最後樣本檔案的落盤。
2.功能實作
說完前端功能,我們再來說下實作。
整套 block notebook 在執行的時候,都會找到對應類型的 Interpreter,每個 Interpreter 再通過子類型去發現真正的實作類。像第一個 source 指向 MySQL 的 blog 日志訂閱,最終會建立一個 MySQL 的 blog 日志訂閱 source 實作,每個 Interpreter 都會接收到所有上遊執行過的所有結果,這個結果統一以 table 的方式進行流轉,在沒有指定的情況下,預設使用離自己最近的一個 table 作為輸入。
■ Block Structure
這是 notebook 的資料結構,屬于 JobContext。作為一個全局的共享的内容,裡面包含着一個可執行環境和一些全局共享的配置。blockList 數組裡每一個都是加載一個 interpreter 具體事件類。如果業務上有需求,擴充一個 block 也非常簡單,隻要實作一個接口即可,其他的部分都交給架構來完成。
■ 送出執行
以上是 block 的内部執行邏輯,我們再來單獨講一下送出的服務。
首先,服務會判斷任務中的所用到的插件和依賴的檔案,然後通過一些機制來确定主程式版本,這麼做的原因很簡單,我們經常會做更新,而更新之後可能執行計劃可能會變動,有些使用者停止任務後再通過 checkpoint 可能就起不來了,通過這個服務來實作多版本就非常簡單了。我們還可以根據一些條件,比如任務是否是 checkpoint 啟動的等來判斷主程式版本。最後送出服務會通過一些邏輯對叢集和隊列的進行智能選擇。
■ 整體架構
通過以上對整體架構的分享,想必大家也就比較清晰了。上層會有一層 notebook 的 server 的服務,下層會有一個 submit 的送出服務和一個 debug server。外圍會有中繼資料管理中心來管理我們的 catalog,然後還有權限或者一些其他的外部系統來進行輔助。
整個體系最大的好處在于,使用者的代碼都在架構的範圍内進行執行,這樣的話我們可以快速的進行性能優化和功能調整。
三、性能優化
團隊對整個平台有非常多的性能優化,這裡隻能抛磚引玉的說幾點。
1.Table source 優化插件
第一個就是 Table source 的優化。生産過程中的遇到的性能問題,如:
- 原生的 KafkaTableSource 由于需要解析 Schema 無法将反序列化過程提取出來,而 Kafka 的并發受制于業務的 partition 數量,如果反序列化計算量要求較大,會造成性能瓶頸。
- 維表 Join 如果先進行 keyBy,可以提升緩存命中率。
我們的解決方案包括 3 個大的步驟。
1、動态代理方式擷取 byte 流。
2、将 byte 流進行二次處理:
- 動态的加載所需執行的插件。
- 配置設定插件到 Map 和 Filter 算子 中去(保證插件變動時 ckp 啟動)。
- 調整并發。
- 反序列化成目标 Row。
3、傳回 ProxyTableSource。這樣就可以無限的擴充功能。
這是一個優化執行個體:降低由于序列化計算量過大而導緻的 source 端性能瓶頸我們把有 5 個 kafka partition 的一個任務拆分成 5 個 source 跟 10 個 Deserialization 的并發,進而提升了整個任務的吞吐。
2.多 Sink 合并
第二個是多 sink 合并,這塊高版本已經有了,因為我們還在使用 1.10,是以才需要優化,這塊就不詳細說了。
3.流量降級方案
第三個是我們的流量降級方案,這塊優化涉及到一些體系的建設。一般來說,我們的分發任務會有 2 個 sink,會分發到主消息隊列和備份消息隊列,這 2 個隊列的 catalog 完全一緻,隻會通過 tag 進行區分,是以相關代碼開發也完全一緻,下遊讀取的時候,會根據任務的 tag 分析說我們讀主還是備。舉例,歸檔任務,測試任務,等會從備份流來讀取,這樣我們就對 kafka 叢集進行了壓力分流。
四、運維監控增強
介紹完性能優化,我們來聊下運維和監控。Flink 自己已經帶有不少監控,比如failover 次數、QPS、checkpoint 成功率,但是在生産過程中,這可能還不夠,是以會增加很多内置監控,并且對血緣進行收集。
此外我們做了 4 個智能診斷功能,包括記憶體診斷、性能診斷、checkpoint 診斷和日志診斷,進而來判斷我們的任務運作情況。
這是我們之前看過架構圖,紅色就是監控診斷部分,由一個統一的資料收集服務來擷取資訊,并提供給監控報警等服務使用。
監控增強這塊其實就是在在架構中增加很多内置的名額,比如 sink 平均寫入時間,序列化錯誤率,維表 join 命中率,sink 寫入時間等等一系列的名額,這些參數也有助于我們更多元度的了解任務實際情況,當然我們可以選擇是否開啟這些監控,以保證高峰期任務性能。
第二塊是血緣,它是資料治理中非常重要的一塊。我們收集資料血緣比較簡單,通過Block 的參數進行一個靜态解析來看所有的輸入源和輸出表,然後上報給資料中心,最終組成一個血緣的關系圖。
我們的智能診斷其實是利用所有監控資料來綜合判斷一個任務是不是處于正常狀态,包括四塊。
- 第一,記憶體診斷
- 第二,性能診斷
- 第三,Checkpoint 診斷
- 第四,日志診斷,舉例說明,記憶體診斷,它主要通過 GC 如何觸發報警,會有很多條判斷标準,比如 young gc 頻率,full gc 頻率, gc 耗時等等是否大于或者小于一定值,通過這些條件綜合來判斷一個任務是否存在 gc 異常,以及異常的等級。而日志會通過一定條件對錯誤日志進行篩選,這個條件可以是使用者自定義的,比如統計某個具體的 exception 的次數,當達到統計次數時候進行報警的觸發。這樣的話,某些業務不想被 failover,又想收到異常報警,就通過簡單的 try catch 就能完成。
這是我們智能診斷的一個記憶體診斷的前端的頁面,展示了一個實際任務的記憶體的使用情況分布。
五、未來規劃
最後講一下對未來的一些規劃。
第一,體系建設:權限體系将更加靈活,資料安全/使用者管理等配套将更新完善。
第二,Notebook 體系将引入其他資料計算引擎(如 druid 等)。
第三,擁抱新版本,現在的 Flink 1.12 有非常多的新特性,有些也非常吸引人。我們希望能積極的跟上社群版本。
活動推薦:
僅需99元即可體驗阿裡雲基于 Apache Flink 建構的企業級産品-實時計算 Flink 版!點選下方連結了解活動詳情:
https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506