天天看點

美團圖靈機器學習平台性能起飛的秘密(一)

作者:美團技術團隊
美團圖靈機器學習平台在長期的優化實踐中,積累了一系列獨特的優化方法。本文主要介紹了圖靈機器學習平台在記憶體優化方面沉澱的優化技術,我們深入到源碼層面,介紹了Spark算子的原理并提供了最佳實踐。希望為讀者帶來一些思路上的啟發。

導語

圖靈平台是美團履約平台技術部2018年開始自研的算法平台,提供模型全生命周期的一站式服務,旨在幫助算法同學脫離繁瑣的工程化開發,把有限的精力聚焦于業務和算法的疊代優化中。

随着美團圖靈機器學習平台的發展,圖靈技術團隊在記憶體優化、計算優化、磁盤IO優化三個方面沉澱了一系列性能優化技術。我們将以連載的方式為大家揭秘這些技術。本文作為該系列的開篇之作,将重點為大家介紹記憶體優化。

1. 業務背景

圖靈平台主要包括機器學習平台、特征平台、圖靈線上服務(Online Serving)、AB實驗平台四大功能,具體可參考《一站式機器學習平台建設實踐》以及《算法平台線上服務體系的演進與實踐》這兩篇部落格。其中,圖靈機器學習平台的離線訓練引擎是基于Spark實作的。

随着圖靈的使用者增長,越來越多算法模型在圖靈平台上完成疊代,優化離線訓練引擎的性能和吞吐對于節約離線計算資源顯得愈發重要。經過半年持續的疊代,我們積累了一系列獨特的優化方法,使圖靈機器學習平台的離線資源消耗下降80%,生産任務平均耗時下降63%(如下圖所示),圖靈全平台的訓練任務在性能層面都得到了較為明顯的提升。

資源消耗下降:

美團圖靈機器學習平台性能起飛的秘密(一)

圖1 資源消耗

目前平台性能:

下圖是某位圖靈使用者的實驗。使用100萬資料訓練深度模型,總計約29億的資料調用深度模型,計算評估名額并儲存到Hive,整個實驗隻需要35分鐘。其中Spark開啟DynamicAllocation,maxExecutor=400 ,單個Executor為7Core16GB。

美團圖靈機器學習平台性能起飛的秘密(一)

圖2 實驗運作圖

2. 圖靈訓練引擎優化

那麼,圖靈訓練引擎的性能優化是如何做到的呢?我們的優化分為記憶體優化、計算優化、磁盤IO優化三個層面。

記憶體優化包括列裁切、自适應Cache、算子優化。我們借鑒Spark SQL原理設計了列裁切,可以自動剔除各元件中使用者實際沒有使用的字段,以降低記憶體占用。何時對Dataset Persist和Unpersist一直是Spark代碼中的取舍問題,針對使用者不熟悉Persist和Unpersist時機這個問題,我們将多年的開發經驗沉澱在圖靈中,結合列裁切技術實作自适應Cache。在計算優化方面,我們完成了圖優化、Spark源碼優化、XGB源碼優化。在磁盤IO優化方面,我們創新性的實作了自動化小檔案儲存優化,能夠使用一個Action實作多級分區表小檔案的合并儲存。

此外,我們實作的TFRecord表示優化技術,成功将Spark生成的TFRecord體積減少50%。因圖靈平台使用的優化技巧較多,我們将分成多篇文章為大家逐一介紹這些優化技術。

美團圖靈機器學習平台性能起飛的秘密(一)

圖3 圖靈訓練引擎優化

而在衆多優化中,收益最高、适用性最廣的技術的就是算子優化,這項技術極大提升了圖靈訓練引擎的吞吐量。本篇文章首先将為大家介紹記憶體優化中的算子優化技術。

3. Spark算子解讀

同樣的業務需求,不同的算子實作會有不一樣的特性。我們将多年的Spark開發技巧總結在下表中:

美團圖靈機器學習平台性能起飛的秘密(一)

表1 Spark算子開發技巧總結

  1. 多行輸入多行輸出:多行資料一起進入記憶體處理。輸出多行資料。
  2. 多列輸出:特定場景下,我們希望輸出多個字段。 SQL場景下隻能輸出Struct,再從Struct中SELECT各字段。map/flatMap/mapPartitions可以輕松輸出任意個字段。
  3. 中間結果複用: SQL場景下:SQL場景下隻能先SELECT一次得到中間變量,再SELECT中間變量完成後續處理。map/flatMap/mapPartitions可将計算邏輯封裝在函數内。
  4. 重量級對象複用: Executor級别,例如可以通過廣播變量實作,或者通過靜态類成員變量的“懶漢”模式實作。Partition級别,mapPartitions時,先建立對象,後疊代資料,這個對象可在Partition内複用。

通過對比我們發現,mapPartitions是各類算子中最為靈活——可以靈活實作輸入M條輸出N條資料,可以輸出任意數量的字段,還可以實作重量級對象在Partition或Executor級别上的複用。mapPartitions因其強大的功能和靈活可定制性,在圖靈訓練引擎的開發中有着舉足輕重的地位(例如按Batch調用深度模型、上下采樣、Partition統計等元件,都是基于該算子實作)。但是mapPartitions也有一個不足之處。

4. mapPartitions之殇

相信大部分讀者都曾經寫過這樣的代碼,建立一個重量級對象在Partition内完成複用,而不是像map算子那樣每處理一行資料建立一個對象。

mapPartitions模闆,重量級對象複用

dataset.mapPartitions((MapPartitionsFunction<Row, Row>) iterator -> {
  HeavyObject obj = new HeavyObject();
  List<Row> list = new ArrayList<>();
  // 周遊處理資料
  while (iterator.hasNext()) {
    Row row = iterator.next();
    // 拼湊batch或逐條處理
    // ....
    obj.process(row)
    // batch add或逐條add
    list.add(...);
  }
  // 傳回list的疊代器
  return list.iterator();
}, RowEncoder.apply(schema));
           

熟悉mapPartitions的同學都知道,這段代碼完成了重量級對象的複用,相比map算子好像已經減少了大量GC,但這樣仍舊非常容易溢出。那麼:

  1. 為什麼mapPartitions算子容易溢出呢?
  2. 當多個mapPartitions算子串聯的時候又是如何GC的呢?

5. Spark Pipeline中的mapPartitions

在進行下一部分講解之前,我們先簡要介紹一下Spark的懶執行機制。Spark的算子分為Action和Transformation兩大類。RDD的依賴關系構成了資料處理的有向無環圖DAG。隻有當Action算子出現時,才會執行Action算子與前面一系列Transformation算子構成的DAG。Spark還會根據Shuffle将DAG劃分成多個Stage進行計算,Shuffle過程需要跨節點交換資料,會産生大量的磁盤IO和網絡IO。而每個Stage内的計算則構成了Pipeline,在記憶體中進行。

美團圖靈機器學習平台性能起飛的秘密(一)

圖4 多列詞典映射實驗圖

我們以上圖為例,該同學實驗中的多列詞典映射元件,對大量的特征做了詞典映射計算。多列詞典映射元件包含兩個部分,計算詞典和應用詞典。

計算詞典:通過去重和collect生成了各個特征的詞典,每個特征詞典的計算都伴随着1次Shuffle和1次Action。

應用詞典:将特征根據詞典映射成唯一ID,不存在Shuffle。

與Spark StringIndexer的Pipeline優化相似,當進行多個特征的詞典映射計算時,圖靈機器學習平台會将計算詞典的Action單獨執行,而多個應用詞典則一起執行。

詞典生成後,所有應用詞典的計算邏輯(mapPartitions Transformation)不存在Shuffle,是以被劃分到同一個Stage中,所有mapPartitions算子将串聯成一條非常長的Pipeline。最終由後面的Action算子觸發送出Job,執行該Pipeline。Stage的劃分可參考下圖:

美團圖靈機器學習平台性能起飛的秘密(一)

圖5 應用多個詞典Stage

應用詞典的實作中,每個mapPartitionsFunction中都建立了一個ArrayList充當Buffer來存儲計算後的資料,最終傳回ArrayList.iterator()。執行時,每次應用詞典都會将整個Partition的資料拉入ArrayList當中。上述詞典映射串聯構成Pipeline的時候,記憶體中會有多少資料呢?

帶着這個疑問,讓我們走進Spark的源代碼,看看mapPartitionsFunction是如何構成Spark Pipeline的。

Spark的一個Stage中會劃分為多個Task,除了union和coalesce的場景,1個Partition對應1個Task。Task的執行通過抽象方法runTask()完成,以實作類ResultTask為例,最後runTask()方法調用了rdd.iterator()。

ResultTask.scala

override def runTask(context: TaskContext): U = {
  ...... // 源碼縮略不進行展示:初始化一些需要的對象
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  // 這裡的func()調用了rdd.iterator()
  func(context, rdd.iterator(partition, context))
}
           

而RDD的iterator方法的源碼如下,其調用邏輯最終都會進入computeOrReadCheckpoint方法,若沒有CheckPoint則進入compute方法執行計算。以MapPartitionsRDD類為例,擷取父RDD的Iterator并傳入自己的計算邏輯函數f中。

RDD.scala

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context) // 内部依然調用下面的computeOrReadCheckpoint(partition, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
// StorageLevel不為NONE時調用的方法
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  ...... // 初始化相關變量
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    readCachedBlock = false
    // 内部依然調用iterator()中的computeOrReadCheckpoint方法
    computeOrReadCheckpoint(partition, context)
  }) match {
    ...... // 源碼縮略不進行展示:按case包裝為對應iterator傳回
  }
}
// 預設調用該方法
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    // 有checkpoint或materialized則傳回依賴關系中第一個父RDD的iterator
    firstParent[T].iterator(split, context)
  } else {
    // 調用目前RDD的compute方法計算,内部的計算邏輯包含了使用者編寫的代碼
    compute(split, context)
  }
}
           

MapPartitionsRDD.scala

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  // 使用者編寫的代碼邏輯被封裝為函數‘f’,在此接受參數後執行
  f(context, split.index, firstParent[T].iterator(split, context))
           

為了更清晰的解釋這個問題,以下述代碼為例。

Example

val rddA = initRDD(); // 擷取一個RDD
//funcA、funcB、funcC均為使用者的代碼邏輯
val rddB = rddA.mapPartitions(funcA)
val rddC = rddB.mapPartitions(funcB)
val rddD = rddC.mapPartitions(funcC)
rddD.count()
           

在遇到count算子時會進行RDD回溯,最終的形成計算鍊路為fCount(funcC(funcB(funcA(rddA.iterator=>iterator)))),由此構成了Pipeline,以多個mapPartitions + ArrayList.iterator()串聯的代碼展開則如下所示:

Example

iteratorA => // iteratorA:初始RDD對應Partition的輸出疊代器
  var list = List[Row]()
  while (iteratorA.hasNext) {
    list = process(iteratorA.next()) +: list // funcA:每條拉至記憶體處理後加入resultList
  }
  val iteratorB = list.iterator
iteratorB => // iteratorB:rddA對應Partition的輸出疊代器
  var list = List[Row]()
  while (iteratorB.hasNext) {
    list = process(iteratorB.next()) +: list // funcB:每條資料拉至記憶體處理後加入resultList
  }
  val iteratorC = list.iterator
iteratorC =>  // iteratorC:rddB對應Partition的輸出疊代器
  var list = List[Row]()
  while (iteratorC.hasNext) {
    list = process(iteratorC.next()) +: list // funcC:每條資料拉至記憶體處理後加入resultList
  }
  val iteratorD = list.iterator
iteratorD => count()
           

回看mapPartitions模闆,作為Buffer的ArrayList是每個mapPartitionsFunction的局部變量,ArrayList.iterator()引用了這個Buffer,結合上面的源碼我們知道,子RDD會引用父RDD的Iterator。結合該同學的實驗分析,每個RDD中的計算都形成了一個Array Buffer,在RDD的function調用鍊路中Array Buffer2依賴Array Buffer1.iterator(),Array Buffer3依賴Array Buffer2.iterator()。

以此類推,在計算RDD-3時,RDD-1的func1已經出棧,且RDD-3不依賴Array Buffer1.iterator(),是以局部變量Array Buffer1可以被GC。由此可見在Stage-應用多個詞典的計算過程中,記憶體占用的峰值達到了兩個Array Buffer,也就是兩倍partitionSize。

美團圖靈機器學習平台性能起飛的秘密(一)

圖6 應用多個詞典記憶體占用

為了完全證明這個想法,又進行了實際的測試驗證:初始化1個單Partition的RDD,并且該Partition的資料量為300萬,占用記憶體大約為180M。接着将這些資料利用多個mapPartitions + ArrayList.iterator()串聯,每輸入1個對象,生成1個新對象放入Buffer中,最後用rdd.count()觸發Action,整個執行流程中隻包含一個Stage。運作的JVM堆記憶體設定為512M,以此來觀察堆記憶體中的執行個體對象及其GC活動是否符合隻有兩個Buffer的預期。

觀察結果如下,每一行資料以一個GenericRowWithSchema執行個體存在并加入ArrayList中,其計算過程中最大的峰值正好為600萬即兩倍的分區資料量。GC以周期性的活動去銷毀上上個mapPartitions中的無用Buffer,并且堆記憶體保持在了最大約兩倍的資料占用量(約360M),是以驗證了推斷。以下是測試中的GenericRowWithSchema對象執行個體計數圖、記憶體實時占用以及GC活動統計圖。

美團圖靈機器學習平台性能起飛的秘密(一)

圖7 對象統計

美團圖靈機器學習平台性能起飛的秘密(一)

圖8 記憶體統計

經過測試驗證,mapPartitions + ArrayList.iterator()導緻了兩倍partitionSize的記憶體占用。

使用mapPartitions + ArrayList.iterator()僅僅隻是造成OOM或GC壓力大嗎?偏偏不巧,在Spark的記憶體管理中另有一番天地,會牽扯到更多的性能問題。

Spark記憶體管理機制

Spark從2.0開始使用的是統一記憶體管理機制,主要分為四大區域,System Reserved、User Memory、Storage Memory和Execution Memory。System Reserved是為系統預留使用的記憶體,User Memory是使用者定義的資料結構和Spark的中繼資料。存儲記憶體Storage Memory與執行記憶體Execution Memory在運作期間會共享一塊記憶體區域,預設有由spark.storage.storageFraction參數控制。Spark使用動态占用機制來管理這兩塊記憶體。

美團圖靈機器學習平台性能起飛的秘密(一)

圖9 Spark記憶體邏輯模型

Storage和Execution的動态占用機制

  1. 當Storage或Execution的記憶體不足、而對方的記憶體空餘時,可以占用對方的記憶體空間。
  2. Storage占用Execution時,如果Execution需要更多記憶體,則會将Storage占用的記憶體淘汰(根據RDD的StorageLevel決定是溢寫到磁盤還是直接删除),歸還借用的記憶體空間。
  3. Execution占用Storage時,如果Storage需要更多記憶體,則直接發生淘汰(Execution的邏輯複雜,歸還記憶體的難度非常高)。
  4. 從Storage中淘汰掉的RDD Cache會在RDD重新使用時再次Cache。

在涉及到mapPartitions + ArrayList.iterator()的執行過程中,由于大量的記憶體占用,導緻Execution Memory不足,借用Storage Memory,并且借用後仍存在記憶體不足情況時,Storage Memory中的已緩存的Block會進行淘汰機制,根據其存儲級别進行落盤或直接删除,這會導緻緩存資料多次的IO操作與重複計算,極大的降低了資料處理的效率。

美團圖靈機器學習平台性能起飛的秘密(一)

圖10 淘汰機制

讓我們小結一下mapPartitions + ArrayList.iterator()的實作方式:

  1. Spark通過mapPartitionsFunction嵌套實作Pipeline,例如fCount(funcC(funcB(funcA))),func中的Buffer是方法中的局部變量。
  2. 在mapPartitionsFunction中使用不限制長度的Buffer,會導緻partitionSize兩倍的資料拉入記憶體。
  3. 可能觸發Spark記憶體管理的淘汰機制,導緻緩存資料多次的IO操作與重複計算。

6. 最佳實踐

以多輸入多輸出為例,假設我們需要處理一批單個分區資料量達到千萬級别的資料集,以單個分區中每5行資料為一批次,每批次随機輸出2行資料,那麼在mapPartitions基礎上,可以這樣寫:

BatchIteratorDemo:mapPartitions處理多輸入->多輸出——以單分區每5行資料為一批次,每批次随機輸出2行資料的Demo

Dataset<Row> dataset = initDataset();// 初始化資料集
// mapPartitions中調用BatchIterator完成計算邏輯
Dataset<Row> result = dataset.mapPartitions((MapPartitionsFunction<Row, Row>) inputIterator -> new Iterator<Row>() {
  // 一批處理的資料行數
  private static final int INPUT_BATCH_PROCESS_SIZE = 5;
  // 目前批次處理的資料集
  private final List<Row> batchRows = new ArrayList<>(INPUT_BATCH_PROCESS_SIZE);
  // 目前批次輸出iterator
  private Iterator<Row> batchResult = Collections.emptyIterator();

  @Override
  public boolean hasNext() {
    // 本輪結果已全部消費,進入下一批次batch
    if (!batchResult.hasNext()) {
      batchRows.clear();
      int count = 0;
      // 按一個 batch 5條資料加入集合
      while (count++ < INPUT_BATCH_PROCESS_SIZE && inputIterator.hasNext()) {
        batchRows.add(inputIterator.next());
      }
      // 上遊資料全部消費
      if (batchRows.size() == 0) {
        return false;
      }
      // 随機擷取2條資料
      batchResult = processBatch(batchRows);// 随機抽取2條資料建立新對象傳回
    }
    return true;
  }

  @Override
    public Row next() {
      return batchResult.next();// 消費目前批次的結果
    }
}, RowEncoder.apply(dataset.schema()));
           

當該方式應用到fCount(funcC(funcB(funcA(rddA.iterator=>iterator))))構成的Pipeline時,以多個mapPartitions + ArrayList.iterator()串聯的代碼展開則如下所示:

Example

iteratorA => iteratorB =  // iteratorA:初始RDD對應Partition的輸出疊代器
  new Iterator[Row] {
    override def hasNext: Boolean = {
      processBatch(iteratorA) // 隻處理一個batch的資料
    }
    override def next(): Row = nextInBatch() // 擷取目前batch的下個輸出
  }
iteratorB => iteratorC =  // iteratorB:rddA對應Partition的結果疊代器
  new Iterator[Row] {
    override def hasNext: Boolean = {
      processBatch(iteratorB) // 隻處理一個batch的資料
    }
    override def next(): Row = nextInBatch() // 擷取目前batch的下個輸出
  }
iteratorC => iteratorD =  // iteratorC:rddB對應Partition的結果疊代器
  new Iterator[Row] {
    override def hasNext: Boolean = {
      processBatch(iteratorC) // 隻處理一個batch的資料
    }
    override def next(): Row = nextInBatch() // 擷取目前batch的下個輸出
  }
iteratorD => count()
           

我們可以看到,多輸入多輸出Demo以inputBatch=5、outputBatch=2作為消費機關,記憶體占用隻有Batch=7(inputBatch + outputBatch),每次處理完一個批次,直到目前批次産生的2條資料全部被下一個RDD Iterator消費完之後,才會繼續嘗試從上一個RDD Iterator讀取下一個批次進入記憶體計算,不需要為了傳回分區Iterator而直接消費整個分區資料。将随機抽取資料的邏輯串聯處理,其Stage将如下圖所示,每個Buffer僅為一個Batch,記憶體消耗幾乎可以忽略不計。

美團圖靈機器學習平台性能起飛的秘密(一)

圖11 Demo Stage

最終的資料處理效果對比如下圖:

美團圖靈機器學習平台性能起飛的秘密(一)

圖12 資料處理效果對比

7. 總結

本文作為《圖靈機器學習平台性能起飛的秘密》系列的第一篇,主要講述了記憶體優化中的算子優化技巧,深入分析了mapPartitions算子的原理,并提供了mapPartitions算子的最佳實踐。圖靈機器學習平台基于此方案進一步開發了BufferIterator架構,能夠靈活應對輸入M條資料輸出N條資料的場景,極大提升了圖靈的吞吐量。後續我們将繼續為大家介紹更多的優化技巧,敬請期待。

8. 作者簡介

  • 琦帆、立煌、兆軍等,均來自美團到家事業群/履約平台技術部。

| 本文系美團技術團隊出品,著作權歸屬美團。歡迎出于分享和交流等非商業目的轉載或使用本文内容,敬請注明“内容轉載自美團技術團隊”。本文未經許可,不得進行商業性轉載或者使用。任何商用行為,請發送郵件至[email protected]申請授權。

繼續閱讀