天天看點

PyFlink 在聚美優品的應用實踐

大家好,我是來自聚美優品刷寶大資料部門的吳攀剛,本文将跟大家分享 PyFlink 在刷寶的應用,包括:背景介紹、架構演進、技術選型以及一個問題的解決思路分享。

刷寶是一款短視訊 APP,涵蓋短視訊、直播視訊等内容,為使用者提供快樂視訊和優質的主播。在來到聚美之前,我主要做離線數倉開發和資料開發,來到刷寶之後,部門也并沒有現成的實時架構,需要自行搭建。是以,當實時的需求來到我面前的時候,内心是忐忑的。

下面我将分享下,我與 PyFlink 的緣分。

1.背景介紹

業務場景

刷寶有許多重要的業務場景,其中之一是為使用者實時推薦短視訊。其中推薦的實時性,決定了使用者在視訊上的停留時長、觀看視訊時長、留存等名額,進而影響到廣告位的收益,比如廣告的單價等。

刷寶從 2019 年開始,業務飛速發展,截止到 2020 年 5 月份,使用者行為資料峰值每秒過百萬,每天有 200 億資料。這個業務量,對我們現有的技術架構、資料計算的實時性提出了挑戰。

實時化挑戰

我們的資料流程整個環節完成需要1小時左右時間,遠達不到實時的要求。如何更快速的根據使用者浏覽習慣實時推薦相關視訊會對使用者觀看視訊時長、停留時長、留存等有重大的影響,比如在現有基礎上提升10-20%。

我們更期望資料的計算實時化,也就是将原有技術架構中的批量計算(hive)變成實時計算(Flink SQL),架構圖如下。

2.架構演進

架構演進

PyFlink 在聚美優品的應用實踐
  • 第一層:最開始是離線計算,完成一次計算需要30分鐘,還不包括後續的模型處理;
  • 第二層:考慮實時計算後,我們打算采取 Flink 架構來處理,整體主件過程如圖;
  • 第三層:考慮到人力和時間等成本,還有技術人員技能比對度,最終選擇第三層;

我們成員更多的是對 Python 和 SQL 熟悉,是以 PyFlink 更加适合我們。我們用 PyFlink 開發了 20 個業務作業,目前每秒過百萬,每天有 200 億,業務平穩運作(PyFlink 1.10)。

3.技術選型

面對實時化的業務和架構更新需求,我們團隊本身沒有 Spark、Flink 等架構的背景積累,但是一個偶然的機會,我們觀看了金竹老師的直播,了解到了 PyFlink 是 Flink 的 Python API 和我團隊現有的開發人員語言技能比較吻合。是以就想利用 PyFlink 進行業務的實時化更新。

PyFlink 在聚美優品的應用實踐

看完金竹老師的分享,我對 PyFlink 有了一個簡單的了解,就和團隊同學一起規劃了解 PyFlink,進行技術選型。

初識與困難

雖然 PyFlink 和團隊的語言技能比較 match,但是其中還是涉及到很多 Flink 的環境、文檔、算子等的使用問題,遇到了很多困難:

  • PyFlink 的知識文檔、示例、答疑等都非常少,除了官網和阿裡雲,基本無其他參考。
  • PyFlink 官方文檔缺少很多細節,比如:給了方法不給參數格式。
  • PyFlink 的内容不明确,官網上沒有明确具體寫出哪些 PyFlink 沒有,哪些有。沒法将 Flink 和 PyFlink 清晰的區分開。
  • PyFlink 本身等局限性,比如:left/rigint Join 産生 retraction 無法寫入 Kafka,要寫入需要改寫 Flink SQL 讓流改為 append 模式,或者修改 kafka-connector 源碼支援 retraction。

是以一時感覺利用 PyFlink 的學習時間也比較漫長。大家比較擔心短時間内很難滿足業務開發。

機遇

在我和團隊擔心開發進度時候,我也一直關注 Flink 社群的動态,恰巧發現 Flink 社群在進行 “

PyFlink 扶持計劃

”,是以我和團隊都眼前一亮,填寫了 PyFlink 調查問卷。也和金竹老師進行了幾次郵件溝通。最終有幸參與了 PyFlink 社群扶持計劃。

4. OOM 報錯解決思路分享

其實了解下來 PyFlink 的開發是非常便捷的,在完成了第一個作業的開發之後,大家逐漸熟悉 PyFlink 的使用,3周左右就完成了 20 個業務邏輯的開發,進入了測試階段。這個快速一方面是團隊成員不斷的熟悉 PyFlink,一方面是由社群 PyFlink 團隊金竹/付典等老師的幫助和支援。這裡,不一一為大家分享全部内容,我這裡列舉一個具體的例子。

■ 背景:

從接觸到 Flink 開始,有個别 job,一直有 running beyond physical memory limits 問題。多次調整 tm 記憶體,修改 tm 和 slos 的比例,都沒用,最終還是會挂。最後妥協的方案是,增加自動重新開機次數,定期重新開機任務

■ 現象:

Flink job 通常會穩定運作5-6天,然後就報出這個錯誤。一直持續和反複。

■ 詳細資訊:

Closing TaskExecutor connection container_e36_1586139242205_122975_01_000011 because: Container [pid=45659,containerID=container_e36_1586139242205_122975_01_000011] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.8 GB of 32 GB virtual memory used. Killing container.
    Dump of the process-tree for container_e36_1586139242205_122975_01_000011 :
    |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
    |- 45659 45657 45659 45659 (bash) 0 0 115814400 297 /bin/bash -c /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.out 2> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.err 
    |- 45705 45659 45659 45659 (java) 13117928 609539 6161567744 1048471 /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .

    Container killed on request. Exit code is 143
    Container exited with a non-zero exit code 143           

我們的解決思路:

1. 從内容上看是 oom 問題,是以一開始調整了 tm 大小,直接到最大記憶體,2調整 tm 和 slot 的比例,盡量做到 1v1.
    2. dump heap 的記憶體,分析占用情況。
    3. 調整 backend state 的類型
           

結果:以上手段都失敗了,在持續一段時間後,依然一定報錯。

PyFlink 團隊處理思路:

1.分析目前作業的 state 情況,作業情況,作業環境參數情況。通過 flink-conf 可以看 backend state 情況,通過 flinkdashboard 可以知道作業圖和環境參數。

  1. 由于 1.10 中,rocksdb statebackend 占用的記憶體預設為非 managed memory,通過在 PyFlink 作業中增加如下代碼,可以将其設定為 managed memory:env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)
  2. 為了分析 OOM 是否是由于 rocksdb statebackend 占用的記憶體持續增長導緻的,開啟了關于 rocksdb 的監控,因為我們使用的是 rocksdb,這裡需要在 flink-conf 中增加如下配置:
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-usage: true
                            state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.num-running-flushes: true
state.backend.rocksdb.metrics.size-all-mem-tables: true           

然後通過自建的 metrics 系統來收集展示和分析,我們使用的 grafana。

  1. 通過前面的步驟,觀察到 rocksdb 的記憶體基本是穩定的,記憶體占用符合預期,懷疑是“rocksdb 超用了一點點,或者是 jvm overhead 不夠大”導緻的。這兩種問題,都可以通過調整 jvm overhead 的相關參數來解決。于是在 flink-conf 中添加了配置:
taskmanager.memory.jvm-overhead.min: 1024m

taskmanager.memory.jvm-overhead.max: 2048m           

用大佬的原話:rocksdb 超用了一點點,或者是 jvm overhead 不夠大,這兩種情況調大 jvm overhead 應該都能解決。

  1. 調整 flink.size 的大小,讓 flink 自動計算出 process.size,這部分在 flink-conf:
taskmanager.memory.flink.size: 1024m           

完成所有調整後,經曆了14天的等待,job 運作正常,這裡充分說明了問題被解決了。同時開始觀察 rocksdb 的 metrics 情況,發現 native 記憶體會超用一些,但是 rocksdb 整體保持穩定的。目前能判斷出某個地方用到的 native 記憶體比 flink 預留的多,大機率是使用者代碼或者第三方依賴,是以加大下 jvm-overhead 大數值,能解決問題。

  1. 最終需要修改的參數有:

1) 在 pyflink 作業中增加如下代碼:

env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)           

2) flink-conf 修改或增加:

taskmanager.memory.jvm-overhead.min: 1024m
taskmanager.memory.jvm-overhead.max: 2048m
taskmanager.memory.process.size: 6144m           

其實針對這個業務更新,老闆為了不影響最終的業務上線,起初我們準備了2套方案同時進行:

  • 基于某個雲平台進行平台搭建和開發;
  • 基于開源 PyFlink 進行代碼開發;

兩個方案同時進行,最終我們團隊基于 PyFlink 開發快速的完成了業務開發和測試。最終達到了我前面所說的每秒百萬/每天200億的穩定業務支撐。

重點,重點,重點,參與這個業務更新的開發隻有2個人。

5.總結和展望

通過 PyFlink 的學習,刷寶大資料團隊,在短時間能有了實時資料開發的能力。目前穩定運作了 20+PyFlink 任務,我們對接了多個需求部門,如推薦部門、營運、廣告等;在多種場景下,模型畫像計算、AB 測試系統、廣告推薦、使用者召回系統等,使用了 PyFlink。為我們的業務提供了堅實穩定的實時資料。

此外,我們将搭建 Flink on Zeppelin 這樣的實時計算平台,擴大 Flink 開發使用者群體,進一步簡化 Flink 開發成本。Flink 1.11 版本也準備上線,Python UDF 功能會有進一步的優化,Pandas 子產品也會被引入。假如讀者和我們一樣,期望能快速擁有實時的能力,以 Python 語言為主,并且還有資料開發/數倉的能力,PyFlink 将是不二之選。

如果您也對 PyFlink 社群扶持計劃感興趣,可以填寫下方問卷,與我們一起共建 PyFlink 生态。

PyFlink 在聚美優品的應用實踐