導讀:Flink ML 是一個基于 DataStream 的疊代引擎和機器學習算法庫。本文将對 Flink ML 庫進行介紹,主要包括以下幾部分内容:
- 概況
- Flink ML 疊代執行引擎
- Flink ML 高效算法庫
分享嘉賓|高赟/趙偉波 阿裡巴巴 技術專家/算法專家
編輯整理|劉浩平 東北大學
出品平台|DataFunTalk
01
概況
1. Flink ML現狀
Flink ML 是 Flink 生态的子項目,目标是為使用者提供高效的離線和線上算法庫。Flink ML 實作了端到端的性能測試架構,是保障整個算法性能的基礎。它提供了完整的 Python 支援,使用者可以通過 Python 送出任務。并且提供了完善的幫助文檔和網站,在 Flink 官網的左側導航欄可以檢視對應文檔。它也在補充更多的離線和線上算法。
圖1 Flink Table Store v0.2 的架構
2. 基于 Flink 的機器學習算法庫發展曆史
Alink 是最早基于 Flink 的機器學習生态項目,始于 2017 年(阿裡巴巴),開源于 2019 年,包含豐富的機器學習算法,并且已經在服務大量雲上的客戶。現在希望将 Alink 的設計思路、代碼貢獻回 Flink 社群。
--
02
Flink ML 疊代執行引擎
1. 疊代執行引擎:場景
Flink 是基于 DAG 描述流批一體的處理引擎,但在許多場景下,如機器學習或圖計算場景,以及線上預測希望根據預測結果實時調整參數,這些場景下需要用到資料疊代處理的能力。比如某些算法,在機器學習中進行離線或線上的訓練,這種情況下需要對訓練資料和模型不斷地疊代更新直到模型收斂,此時需要在 Flink 中支援後續節點将結果傳回給前面節點,即支援帶環的資料處理能力。預設 Flink 是基于 DAG 的處理引擎,是以就需要在 Flink 之上提供資料疊代處理的執行引擎,同時希望可以支援離線和線上場景(如圖 2 所示)。
圖2 支援場景
接下來以邏輯回歸為例了解一下算法如何在這幾種場景下執行。前面提到了離線訓練、線上訓練、線上動态調整模型參數三個場景(如圖 3 所示)。這三種場景之間既有共性又有差別。
首先看離線訓練場景。以邏輯回歸為例,在邏輯回歸當中需要一個初始的模型參數将結果發送到疊代中的節點,比如用來做模型緩存的節點。簡單起見假設該節點的并發度為 1,儲存了目前最新的模型。
另外有一個并發為 N 的訓練節點。由于是離線訓練,是以訓練節點在疊代開始之前可以預先把所有資料讀取到訓練節點當中。然後在每一輪疊代中,當訓練節點收到從模型節點發送的最新的模型之後,就會從預先緩存的全量訓練資料中選擇一小部分資料(Mini-batch)。基于目前最新的模型計算一個模型更新,模型更新通過某種方式發揮到模型緩存的節點。模型緩存節點把這個模型更新應用到之前儲存的最新模型上就可以完成一輪疊代,最後得到下一輪疊代開始的模型,模型緩存節點再把模型發送到訓練節點并開始下一輪疊代。
對于線上訓練,訓練資料就無法進行預先緩存,訓練資料是實時的。在這種情況下,每次從模型緩存收到下一個模型之後,從訓練資料源讀取下一個 Mini-batch 資料,也就是讀取下一部分固定條數的資料。基于這部分資料和擷取的最新模型計算一個模型更新,然後把模型更新發回到訓練節點實作模型的實際更新。在這個過程中訓練節點要能夠在讀取模型和讀取下一個 Mini-batch 之間進行動态切換。
對于離線和線上訓練,有同步和異步兩種實作方式。如果是同步,模型緩存節點需要收齊所有訓練節點的更新,模型緩存節點應用全部更新之後再發送給訓練節點進行下輪疊代。相當于每個訓練節點的更新次數都是同步的。
另一種是異步更新,對于模型緩存節點,它從一部分或者一個訓練節點收到模型更新之後,會立即應用模型的更新,然後向所有節點廣播模型最新的結果。還有一種是線上上進行預測的時候,有可能需要根據預測結果實時調整參數。比如某些算法當中,根據預測的結果動态的在延遲和精度之間選擇。
可見這幾種場景中都需要下遊節點把結果發送給上遊節點,通過這種方式實作資料循環處理。
圖3 疊代執行引擎場景
2. 疊代執行引擎:需求
基于前面提到的三個場景,對疊代的需求作個總結,如圖 4 所示。
首先,統一的疊代結構圖。這三個場景都需要在 Flink 的作業中引入統一的有環疊代結構支援資料的循環處理。
其次,統一的疊代終止判斷。這三種場景都需要在某種條件下能夠終止疊代,也就是疊代執行引擎要為使用者提供判斷疊代終止的邏輯。由于疊代就是在整個作業中引入了環,是以它的終止邏輯和普通的 DAG 執行邏輯是有差別的。
最後,提供整個資料集處理完一輪的通知。在疊代中需要為使用者提供每處理完一輪疊代,對算子進行通知的能力。比如之前提到的模型緩存的算子,例如在同步情況下,當從所有訓練節點收到模型更新資料之後,就需要進行一次模型的更新。這時候就需要有消息通知訓練節點所有模型的更新已經收齊。而訓練節點從模型緩存節點收到最新的模型之後,也需要通知緩存節點整個模型已經收到了,可以開始計算下一輪模型更新。是以就需要當處理完整個資料集之後通知的能力。
無論線上訓練還是離線訓練中,都需要對資料做 Mini-batch 切分。這裡有兩個選擇,一個是,在整個疊代層提供 Mini-batch 的語義。然後疊代算法可以直接看到一個 Mini-batch,同時也提供 Mini-batch 處理完成之後通知的能力。
另一種可能是将 Mini-batch 交給上層,最後我們選擇了在疊代層提供整個資料集處理完成之後進行通知的能力。而把 Mini-batch 處理就是交給了算法層進行處理。主要有三個原因,第一點,Mini-batch 可能有兩種處理邏輯:一是,串行順序處理 Mini-batch;另一種是,多個 Mini-batch 并行處理。如果要在疊代層同時支援這兩種語義的話,就必須在疊代層引入一套非常複雜的關于 Mini-batch 資料流的描述。這樣就需要提供一套完全獨立于 Flink DataStream API 的 API,如果引入 Mini-batch 就需要引入一套複雜的基于 Mini-batch 的 DataStream API,并且所有算子需要進行重寫來支援 Mini-batch 的語義。
第二點,對于圖 3 中的第三種情況,可能每處理完一條資料都需要向上遊節點發送一部分資料。這種情況下把它歸結為 Per-record 的 Mini-batch,否則就要提供一個無限大的 Mini-batch,或者每個 Record 切分一次 Mini-batch,這些操作都有較大的額外開銷。考慮到上述情況,在疊代層提供了整個資料集處理完成之後通知的能力。而劃分 Mini-batch 可以由實作算法的算子來 Mini-batch 處理。後續也會實作在疊代之上提供整個 Mini-batch 的處理的邏輯。
圖4 疊代執行引擎:需求
3. 疊代執行引擎:設計
下面詳細介紹一下疊代執行引擎的設計和實作。
疊代的設計如圖 5 所示,主要由四個部分組成。第一,指定一個有回邊的輸入,比如圖 5 中的初始輸入就是模型的初始參數,每一輪疊代都會對模型進行更新,是以模型的輸入就會有一個和它對應的回邊,回邊傳遞的是模型更新;第二是,沒有回邊的輸入,比如訓練資料集,這個資料集是隻讀的不需要更新;第三個是回邊;第四個是模型疊代終止之後的輸出。前面提到在疊代過程中需要具備進度追蹤能力,即每處理完一輪,都需要通知算子本輪疊代處理完了,此時算子可以做一些特定的操作,比如更新模型或者計算下一個模型更新。
對于疊代中無回邊的輸入,Flink ML 提供了兩種語義,一種是重放,即每一輪疊代都會從邊上讀取相同的輸入;另一種是不重放,即資料集隻傳輸一次。
對于算子的生命周期也提供了兩種,一種是每輪重建,即每輪建立新的算子執行個體來處理收到的資料;第二種是每輪不重建,即用一個算子執行個體處理每一輪疊代。無論是初始輸入還是回邊收上的輸入,模型的更新都交給一個算子執行個體處理。針對這兩種生命周期使用者有兩種使用方法:第一,如果不重建算子,隻建立一個模型緩存算子執行個體,既處理模型的初始輸入,也處理每一輪的模型更新。這樣可以在本地緩存最新的模型,避免在疊代中傳輸整個模型;第二,選擇每輪重建算子,并選擇重放算子使用的無回邊輸入。這樣在疊代中可以複用疊代外的算子。比如疊代的算子既有無回邊輸入,也包含有回邊輸入,每輪重建算子就可以友善地使用比如 Join 或 Reduce 算子等疊代外的實作。
最後,疊代提供了兩種終止邏輯。其一,當疊代的所有輸入都疊代處理完成,這種情況就認為疊代終止。在這種終止邏輯中,會發送一個特殊消息,當疊代一輪也沒發現有新的資料輸入,就會終止疊代;其二,對于有限資料流的疊代,允許使用者指定一個特定的節點,當該節點某一輪沒有輸出的時候,就認為疊代終止。比如運作最短路徑的圖算法,如果某個節點負責輸出下一輪需要的更新。當該節點不再有更新就認為疊代已經終止。
圖5 疊代執行引擎:設計
3. 疊代執行引擎:API
基于上述 API 實作一個疊代的例子,如圖 6 所示,擷取 initParameters 和 Dataset 兩個 DataStream 之後,調用 Iterations.iterate 方法。iterate 方法需要傳入四個參數:有回邊輸入、無回邊輸入、算子每輪是否重建、疊代體處理邏輯。疊代體處理邏輯基于有回邊和無回邊的輸入清單建構疊代體。疊代體将傳回兩個輸出清單,一個是與有回邊輸入資料一一對應的有回邊清單,并将執行 Union 操作後,再交給疊代體處理;第二個是疊代完成後的最終輸出。由于 Flink 需要預先建構 DAG 圖,是以要根據使用者指定的執行邏輯建構包含回邊的 DAG 圖,這一點會在後續做詳細的介紹。
圖6 疊代執行引擎:API
疊代内的算子可以實作 IterationListener 接口,如圖 7 所示,實作該接口後會得到兩個通知,一是每輪資料輸入完成的通知,另一個是疊代終止的通知。
圖7 疊代執行引擎:API
以模型緩存算子為例,圖 7 中左上角橙色的模型緩存算子,當收到模型更新後,就将更新加到目前的模型上。緩存算子收齊模型更新便将最新的模型發送給下遊的訓練節點。當疊代終止就得到最終的輸出,這裡使用了 Flink 的 Sideoutput 機制。
4. 疊代執行引擎:實作
接下來看一下疊代引擎的具體實作,如圖 8 所示。對于圖 8 左側的疊代體結構,其中有回邊和輸入邊是一一對應的。如圖 8 右側的結構,在圖中引入一些特殊算子,包括 Input、Output、Head、Tail 等算子。另外會對疊代體中的算子做 Wrap,它負責管理其中算子的生命周期,比如每輪是否重建。Head 和 Tail 算子通過 Colocation 機制被排程到同一個 TM 程序中。這樣可以實作基于記憶體的回邊消息隊列,由于 Tail 和 Head 節點在同一個程序中,當資料到達 Tail 節點後可以基于記憶體直接傳輸資料。
圖8疊代執行引擎:實作
疊代中當資料通過 Input 算子輸入疊代體時,會對每條記錄做 Wrap 操作并添加疊代頭,疊代頭上記錄了該記錄所屬的疊代的輪次。Wrap 後的資料輸入到 Wrapper 算子時,Wrapper 算子去掉疊代頭後,再交給使用者編寫的算子處理。當所有元素輸入完後,将插入一個 Barrier 節點标記本輪輸入結束。
此外當疊代執行完成,如果中間算子通過 Output 算子輸出,Output 算子将去掉疊代頭。在疊代中 Head 節點會判斷疊代是否終止以及每輪疊代是否結束。
當 Input 算子讀取完輸入資料,如果資料是有限的将在其後插入一個特殊的 Barrier,Barrier 将跟随資料進入循環。當 Head 節點讀取完每一輪輸入,将通過 Head Operator Coordinator 和所有的 Head Task 并發通信,Head Operator Coordinator 是 JM 中的全局元件。Head Task 會通知 Head Operator Coordinator 已經讀取完輸入,然後 Head Operator Coordinator 會通知所有 Head 節點本輪輸入結束。這種情況下,Head 節點首先拿到本輪輸入完成的通知,然後 Head 節點将資料和這個特殊的消息(圖 9 中藍色塊)廣播發送給下遊節點。然後下遊節點從所有的輸入讀到這個特殊的消息之後,就代表這一輪的輸入已完成,然後他所 Wrapper 的算子這一輪輸入完成。通過這種方式實作了進度追蹤。這個邏輯與 Flink 中 Watermark 管理邏輯是類似的。當資料經過 Tail 節點時,就會把頭的資料和特殊消息上所記錄的疊代輪次都做加 1 的操作。
圖9疊代執行引擎:實作
因為 Flink 預設的容錯機制不支援環,是以引入環後就必須對 Checkpoint 機制做擴充。擴充主要包括兩部分:一部分是在正常 Checkpoint 時,當 Head 節點收到Barrier之後,除了做 Snapshot 之外,Head 節點還會記錄回邊上收到的資料,并往下發送Barrier。當Barrier經過Tail節點通過環回到 Head 節點後,此時就記錄了 Checkpoint 當中回邊上的所有資料。這樣就實作了帶環的 Chandy-Lamport 算法,最後儲存的 Checkpoint 就是這些算子的 Snapshot 以及回邊的資料清單。
除此之外,Head 節點在每一輪 Checkpoint 時,Head 節點即要等待輸入邊上的Barrier 對齊,還要等待 OperatorCoordinator 的虛拟 Barrier。這個 Barrier 保證它與 OperatorCoordinator 本輪消息收集的通知不會交叉,保證在疊代中,所有 Task 都在同一輪。這樣在算子擴并發和縮并發時,可以很容易地合并同一個算子不同執行個體的狀态。
最後還有一個正在做的優化,在某一輪結束之後立即發送緩存的虛拟 Barrier 資料。這樣如果疊代中有些節點是通過緩存所有的資料,等到某一輪結束時發送,這種情況下保證它儲存的資料量最小。
圖10疊代執行引擎:實作
5. 疊代執行引擎:總結
以上是關于疊代執行引擎的介紹,下面對疊代執行引擎做一個總結,如圖 11 所示。首先疊代執行引擎實作了統一支援離線和線上算法訓練。其次提供了 Exactly-Once 容錯機制。當輸入是有限集合時,将進支援 Batch 執行模式。并且未來将提供統一的上層算法開發工具,如前面提到的 Mini-batch 工具或者模型緩存的統一實作。疊代執行引擎在 Flink ML 庫中實作了一些依賴疊代算法的基礎,比如邏輯回歸或 Kmeans。
下面是關于 Flink ML 中離線和線上算法的介紹。
圖11疊代執行引擎:總結
--
03
Flink ML 高效算法庫
1. Alink 簡介
下面介紹基于疊代的 Flink ML 高效的算法庫。基于 Flink 的 ML 算法庫 Alink 開始于 2017 年,到目前為止已經做了 5 年,内置了豐富的算子,如圖 12 所示,包括:
① 分類算法,包括邏輯回歸、Softmax、樹相關的 GBDT、随機森林相關的分類算法
② 回歸算法,包括線性回歸、GBDT 回歸等
③ 聚類算法
④ 算法評估
⑤ 關聯分析相關的FPGrowth 、PrefixSpan和ALS
⑥ 相似度計算以及模型調優的算子
⑦ 文本相關的算法
⑧ 特征工程
⑨ 統計分析算子
⑩ 資料處理算子
經過多年發展,Alink 内置了一套比較完備的算法庫,基于 Alink 可以很友善地搭建業務流程,解決實際業務中遇到的問題。
圖12高效的離線/線上學習算法庫
在做的 Flink ML 目的是将 Alink 算法回饋到 Flink 社群,如圖 13 所示,即通過流的方式在 Flink ML 的 1.15 版本中重新實作這些算法并做相關的優化。
圖13 高效的離線/線上學習算法庫
2. 算法性能
将 Flink ML 與 Spark ML 的部分算法做了一個對比,如圖 14 所示,其中包括Kmeans、String Indexer、MinMaxScaler 以及 OneHotEncoder 這幾個算法。通過圖 14 中的結果可知 Flink ML 的性能不弱于 Spark,個别算法明顯優于Spark 的性能。這也給我們基于 Flink 開發 ML 增加了很大的信心,基于Flink也可以做出高性能的 ML 算法庫。
圖14 Flink ML 與 Spark ML 部分算法性能對比
3. 疊代引擎在 Online LR 的應用
基于前一小節的疊代引擎實作 Online LR 算法的應用。前一小節是基于疊代引擎來講 Online LR。而這裡是從應用的角度,或者從線上邏輯回歸算法的角度來講疊代引擎在實際算法中的應用。
如圖 15 是 Online LR 算法的流程圖。其中左邊是實時資料,右邊是初始模型。實時資料切分成 Mini-batch 然後分布到各個 Work 上計算梯度。計算梯度的時候需要用到模型隊列,它包含兩部分:一部分是初始模型;另一部分是計算完之後的更新模型,更新模型會廣播分發到 Worker 上。各個 Worker 節點使用更新模型以及資料來計算梯度。最後再通過 Reduce 操作計算得到最終的梯度,并用這個梯度更新模型。輸出模型一方面輸出為預測使用;另一方面它會回報(Feedback)給模型隊列,作為下一步 Mini-batch 疊代提供 Base 模型。
圖15 疊代引擎在 Online LR 的應用
以上是 Online LR 的基本流程。下面來看基于疊代引擎如何實作 Online LR,如圖16 所示。
首先,圖 16 中 IterationBodyResult 是核心的疊代體,其中第一個參數DataStreamList 傳入 Model 隊列,第二個參數DataStreamList把實時資料傳進來。可以看到參數并不是單獨的 DataStream 而是一個 List,List 說明可以支援複雜的多組資料的模型。另外實時訓練資料也可以是數組,示例的 Online LR 算法中隻有一個訓練資料。實際在設計疊代引擎時,需要考慮更複雜的場景。
圖16疊代引擎在 Online LR 的應用
接下來看圖 17 中第一個綠色背景的代碼是計算本地梯度。當拿到 Mini-batch 後,将本地計算梯度。計算完梯度之後,圖 17 中第二個綠色背景的代碼執行行 Reduce 操作并得到整體的梯度值。之後,在圖 17 中的 Update Model 部分用該梯度對模型進行更新。
圖17疊代引擎在 Online LR 的應用
更新之後,在圖 18 中通過疊代架構的機制将模型資料 Feedback 再重新廣播做循環計算。在回流疊代的同時将模型輸出,為下一個流程做準備。以上就是 Online LR 在疊代引擎中實作的基本流程。在整個過程中,計算梯度以及更新模型都封裝成了獨立的函數。如果不是 Online LR 而是 Online SVM 隻要調整梯度計算以及模型更新就可以了。是以這套機制不光适用于 Online LR 隻要是基于梯度更新的模型都可以實作 Online 版本的,基于疊代更新實作一個線上算法。
圖18疊代引擎在 Online LR 的應用
4. Online LR 算法使用
如圖 19 中的示例代碼,第一步,算法需要一個初始的 LR 模型,示例中調用的是offlineTrainDenseTable 的邏輯回歸,訓練得到邏輯回歸的初始模型。第二部,基于初始模型構造 Online 邏輯回歸。從 Online 邏輯回歸的 Fit 得到實時的模型流,相當于 Fit 完之後,Online 邏輯回歸模型是一個不斷地有模型流出來的 DataStream。
圖19 Online LR 算法使用
第三步,是使用線上 LR 模型推理并寫出結果如圖 20 所示。最後,寫出線上 LR模型。以上是 Online LR 算法的使用。
圖20 Online LR 算法使用
5. KMeansDemo
下面再展示一個 KMeans Demo。這個 Demo 是對 KMeans 算法做的線上實時學習的例子。如圖 21 所示,首先是參數設定,Demo 設定了本地接收資料的端口為 9999,并行度是 1,輸入資料的次元是 2,聚類個數是 2,GlobalBatchSize 是 1 (GlobalBatchSize 設定為 1 主要是為了示範,實作來一條資料,就疊代更新一次模型),DecayFactor 參數值為 0.5。
圖21 KMeans Demo
如圖 22 所示,啟動 9999 端口并寫資料。如圖寫入(1,2)。這裡是初始模型,可以看到輸入資料是 1 和 2,這兩個資料更接近于圖中 Model version-0 中的 1-th center 這個點([0.509, 0.400], 1.0),是以模型會更新這個聚類點。從圖 22 中 Model version-1 可以看到 0-th center 這個點沒有變化,而下面的 1-th center 點根據輸入資料更新了一次。
圖22 KMeans Demo
如圖 23 所示,再輸入幾組資料再觀察一下模型的輸出,從圖中可以看到第一個聚類點做了兩次更新,可以看到每發送一條資料,就會實時的疊代更新模型。
圖23 KMeansDemo
接下來再輸入一組其他資料(-1,-2),這組資料離上一個聚類點(0-th Center)比較近。當輸入(-1,-2)時 0 号聚類就不斷疊代更新。而 1 号聚類點就不受影響。以上示例示範了實時線上學習的 Demo。
圖24 KMeans Demo
6. Flink ML 發展路線
最後介紹一下 Flink ML 未來的發展,如圖 25 所示。Flink ML 在 2022 年 1 月份的時候釋出了 Flink ML API,其中結合 Alink 的 API 設計,考慮到流處理算法的開發,對 API 做了重新的架構設計。在 2022 年 7 月份的時候釋出了一個版本,主要是開發了高效的 Flink ML 的基礎設施。未來,預計在 2022 年 11 月份的下一個釋出版本中,側重于特征工程算法,并考慮一些業務場景進行落地。最終希望将 Flink ML 做成我們傳統實時機器學習實施标準。
圖25 Flink ML 發展路線
7. 參考資料
大家如果對 Flink ML 感興趣,可以看一下項目的 GitHub 和文檔,連結位址如圖26 所示。
圖26 參考資料
--
04
問答環節
Q1:Flink ML 與 Alink 是什麼關系?
A1:Alink 是 Flink 生态之外的一塊項目,我們希望把 Alink 的所有能力都賦能給 Flink ML。
Q2:一般用什麼類型的存儲?
A2:對于是中繼資料或者模型這部分的存儲,是 Flink ML 内置的功能。如果是一些偏表格的資料,可以考慮用傳統的方式來統計。
Q3:是否支援模型評估?
A3:目前支援二分類評估。将來會逐漸完善評估子產品,比如回歸的評估。
Q4:Online 的業務場景
A4:現在主要是推薦和廣告業務中用得比較多,因為它們對實時性要求比較高,是以這些領域比較多一些。
Q5:線上更新怎麼保證模型不會學壞了?
A5:主要是因為學習率太大,或者資料裡邊有壞資料把整個模型拉壞了。要避免這種情況主要考慮使用 Rebase 的方式,隔段時間把模型往後拉一下。不要一直基于一個模型持續訓練很長時間。
Q6:是否支援低品質資料的過濾?
A6:這實際上就是資料過濾,對于資料品質比如填充或者是資料過濾,這個應該是在訓練之前就應該做的,資料清洗這塊不應該在訓練過程支援,這需要單獨的操作或元件來完成。
Q7:如果模型退化了,怎麼復原到指定的版本?
A7:這是一個模型管理的問題。Online 算法隻輸出模型,之後會有一個對模型進行評估的機制。這個機制需要在算法之外實作。
今天的分享就到這裡,謝謝大家。
|分享嘉賓|
高赟博士
阿裡巴巴 技術專家
阿裡巴巴技術專家,Apache Flink PMC。高赟博士畢業于中國科學院大學,加入阿裡巴巴實時計算團隊,主要從事 Flink Runtime / DataStream API / 疊代引擎方向的開發與改進。
趙偉波
阿裡巴巴 算法專家
2010畢業于北京大學數學系,2017年進入阿裡巴巴從事算法研發相關的工作。
|DataFun新媒體矩陣|
|關于DataFun|
專注于大資料、人工智能技術應用的分享與交流。發起于2017年,在北京、上海、深圳、杭州等城市舉辦超過100+線下和100+線上沙龍、論壇及峰會,已邀請超過2000位專家和學者參與分享。其公衆号 DataFunTalk 累計生産原創文章800+,百萬+閱讀,15萬+精準粉絲。