天天看點

TensorFlow On Flink 原了解析

作者:陳戊超(仲卓),阿裡巴巴技術專家

深度學習技術在當代社會發揮的作用越來越大。目前深度學習被廣泛應用于個性化推薦、商品搜尋、人臉識别、機器翻譯、自動駕駛等多個領域,此外還在向社會各個領域迅速滲透。

背景

目前,深度學習的應用越來越多樣化,随之湧現出諸多優秀的計算架構。其中 TensorFlow,PyTorch,MXNeT 作為廣泛使用的架構更是備受矚目。在将深度學習應用于實際業務的過程中,往往需要結合資料處理相關的計算架構如:模型訓練之前需要對訓練資料進行加工生成訓練樣本,模型預測過程中需要對處理資料的一些名額進行監控等。在這樣的情況下,資料處理和模型訓練分别需要使用不同的計算引擎,增加了使用者使用的難度。

本文将分享如何使用一套引擎搞定機器學習全流程的解決方案。先介紹一下典型的機器學習工作流程。如圖所示,整個流程包含特征工程、模型訓練、離線或者是線上預測等環節。

TensorFlow On Flink 原了解析

在此過程中,無論是特征工程、模型訓練還是模型預測,中間都會産生日志。需要先用資料處理引擎比如 Flink 對這些日志進行分析,然後進入特征工程。再使用深度學習的計算引擎 TensorFlow 進行模型訓練和模型預測。當模型訓練好了以後再用 tensor serving 做線上的打分。

上述流程雖然可以跑通,但也存在一定的問題,比如:

  1. 同一個機器學習項目在做特征工程、模型訓練、模型預測時需要用到 Flink 和 TensorFlow 兩個計算引擎,部署相對而言更複雜。
  2. TensorFlow 在分布式的支援上還不夠友好,運作過程中需要指定機器的 IP 位址和端口号;而實際生産過程經常是運作在一個排程系統上比如 Yarn,需要動态配置設定 IP 位址和端口号。
  3. TensorFlow 的分布式運作缺乏自動的 failover 機制。

    針對以上問題,我們通過結合 Flink 和 TensorFlow,将 TensorFlow 的程式跑在 Flink 叢集上的這種方式來解決,整體流程如下:

TensorFlow On Flink 原了解析

特征工程用 Flink 去執行,模型訓練和模型的準實時預測目标使 TensorFlow 計算引擎可以跑在 Flink 叢集上。這樣就可以用 Flink 一套計算引擎去支援模型訓練和模型的預測,部署上更簡單的同時也節約了資源。

Flink 計算簡介

TensorFlow On Flink 原了解析

Flink 是一款開源大資料分布式計算引擎,在 Flink 裡所有的計算都抽象成 operator,如上圖所示,資料讀取的節點叫 source operator,輸出資料的節點叫 sink operator。source 和 sink 中間有多種多樣的 Flink operator 去處理,上圖的計算拓撲包含了三個 source 和兩個 sink。

機器學習分布式拓撲

機器學習分布式運作拓撲如下圖所示:

TensorFlow On Flink 原了解析

在一個機器學習的叢集當中,經常會對一組節點(node)進行分組,如上圖所示,一組節點可以是 worker(運作算法),也可以是 ps(更新參數)。

如何将 Flink 的 operator 結構與 Machine Learning 的 node、Application Manager 角色結合起來?下面将詳細講解 flink-ai-extended 的抽象。

Flink-ai-extended 抽象

首先,對機器學習的 cluster 進行一層抽象,命名為 ML framework,同時機器學習也包含了 ML operator。通過這兩個子產品,可以把 Flink 和 Machine Learning Cluster 結合起來,并且可以支援不同的計算引擎,包括 TensorFlow。

如下圖所示:

TensorFlow On Flink 原了解析

在 Flink 運作環境上,抽象了 ML Framework 和 ML Operator 子產品,負責連接配接 Flink 和其他計算引擎。

ML Framework

TensorFlow On Flink 原了解析

ML Framework 分為 2 個角色。

  1. Application Manager(以下簡稱 am) 角色,負責管理所有 node 的節點的生命周期。
  2. node 角色,負責執行機器學習的算法程式。

在上述過程中,還可以對 Application Manager 和 node 進行進一步的抽象,Application Manager 裡面我們單獨把 state machine 的狀态機做成可擴充的,這樣就可以支援不同類型的作業。

深度學習引擎,可以自己定義其狀态機。從 node 的節點抽象 runner 接口,這樣使用者就可以根據不同的深度學習引擎去自定義運作算法程式。

TensorFlow On Flink 原了解析

ML Operator

ML Operator 子產品提供了兩個接口:

  1. addAMRole,這個接口的作用是在 Flink 的作業裡添加一個 Application Manager 的角色。Application Manager 角色如上圖所示就是機器學習叢集的管理節點。
  2. addRole,增加的是機器學習的一組節點。

利用 ML Operator 提供的接口,可以實作 Flink Operator 中包含一個Application Manager 及 3 組 node 的角色,這三組 node 分别叫 role a、 role b,、role c,三個不同角色組成機器學習的一個 cluster。如上圖代碼所示。Flink 的 operator 與機器學習作業的 node 一一對應。

機器學習的 node 節點運作在 Flink 的 operator 裡,需要進行資料交換,原理如下圖所示:

TensorFlow On Flink 原了解析

Flink operator 是 java 程序,機器學習的 node 節點一般是 python 程序,java 和 python 程序通過共享記憶體交換資料。

TensorFlow On Flink

TensorFlow 分布式運作

TensorFlow On Flink 原了解析

TensorFlow 分布式訓練一般分為 worker 和 ps 角色。worker 負責機器學習計算,ps 負責參數更新。下面将講解 TensorFlow 如何運作在 Flink 叢集中。

TensorFlow Batch 訓練運作模式

TensorFlow On Flink 原了解析

Batch 模式下,樣本資料可以是放在 HDFS 上的,對于 Flink 作業而言,它會起一個source 的 operator,然後 TensorFlow 的 work 角色就會啟動。如上圖所示,如果 worker 的角色有三個節點,那麼 source 的并行度就會設為 3。同理下面 ps 角色有 2 個,是以 ps source 節點就會設為 2。而 Application Manager 和别的角色并沒有資料交換,是以 Application Manager 是單獨的一個節點,是以它的 source 節點并行度始終為 1。這樣 Flink 作業上啟動了三個 worker 和兩個 ps 節點,worker 和 ps 之間的通訊是通過原始的 TensorFlow 的 GRPC 通訊來實作的,并不是走 Flink 的通信機制。

TensorFlow stream 訓練運作模式

TensorFlow On Flink 原了解析

如上圖所示,前面有兩個 source operator,然後接 join operator,把兩份資料合并為一份資料,再加自定義處理的節點,生成樣本資料。在 stream 模式下,worker 的角色是通過 UDTF 或者 flatmap 來實作的。

同時,TensorFlow worker node 有3 個,是以 flatmap 和 UDTF 相對應的 operator 的并行度也為 3, 由于ps 角色并不去讀取資料,是以是通過 flink source operator 來實作。

下面我們再講一下,如果已經訓練好的模型,如何去支援實時的預測。

使用 Python 進行預測

TensorFlow On Flink 原了解析

使用 Python 進行預測流程如圖所示,如果 TensorFlow 的模型是分布式訓練出來的模型,并且這個模型非常大,比如說單機放不下的情況,一般出現在推薦和搜尋的場景下。那麼實時預測和實時訓練原理相同,唯一不同的地方是多了一個加載模型的過程。

在預測的情況下,通過讀取模型,将所有的參數加載到 ps 裡面去,然後上遊的資料還是經過和訓練時候一樣的處理形式,資料流入到 worker 這樣一個角色中去進行處理,将預測的分數再寫回到 flink operator,并且發送到下遊 operator。

使用 Java 進行預測

TensorFlow On Flink 原了解析

如圖所示,模型單機進行預測時就沒必要再去起 ps 節點,單個 worker 就可以裝下整個模型進行預測,尤其是使用 TensorFlow 導出 save model。同時,因為 saved model 格式包含了整個深度學習預測的全部計算邏輯和輸入輸出,是以不需要運作 Python 的代碼就可以進行預測。

此外,還有一種方式可以進行預測。前面 source、join、UDTF 都是對資料進行加工處理變成預測模型可以識别的資料格式,在這種情況下,可以直接在 Java 程序裡面通過 TensorFlow Java API,将訓練好的模型 load 到記憶體裡,這時會發現并不需要 ps 角色, worker 角色也都是 Java 程序,并不是 Python 的程序,是以我們可以直接在 Java 程序内進行預測,并且可以将預測結果繼續發給 Flink 的下遊。

總結

在本文中,我們講解了 flink-ai-extended 原理,以及Flink 結合 TensorFlow 如何進行模型訓練和預測。希望通過本文大分享,大家能夠使用 flink-ai-extended, 通過 Flink 作業去支援模型訓練和模型的預測。