天天看點

spark core源碼閱讀-Storage shuffle(八)一 map shuffle二 reduce shuffle參考

spark core源碼閱讀-Storage shuffle(八)

本節主要介紹

RDD.aggregateByKey

導緻的shuffle,分兩部分map shuffle,reduce shuffle

一 map shuffle

ShuffleMapTask

中Task如何處理

rdd.iterator

,shuffle中Map端如何根據根據分區把資料寫入檔案.

主要類簡述

  • ShuffleManager

    Driver/Executors建立SparkEnv時建立該類,不同的ShuffleManager對應不同ShuffleWriter,

    通過Driver通過

    spark.shuffle.manager

    指定使用政策

    三個重要方法:

    registerShuffle

    :driver端初始化

    ShuffleDependency

    生成

    getWriter

    :在map任務中生成shuffle後的檔案

    getReader

    :在reduce任務中擷取,以從mappers中讀取被組合的記錄

    分類:

    • (1) HashShuffleManager

      1.2.0版本shuffle方式,有一些缺點,主要是大量小檔案,每一個mapper task根據reducer數量建立分區檔案,假設有M個Mapper,

      R個Reducer,總共會建立M*R個小檔案,如果有46k個Mapper和46k個reducer就會在叢集産生2billion的檔案

      目前的版本已經做了優化,在mapper端優化,executor中task寫同樣檔案,不過在1.5版本删除了SPARK-9808,

      因為有更好的基于排序的shuffle改進

    優點:
    • 快,不需要排序,不需要維護hash表
    • 沒有額外的排序記憶體消耗
    • 沒有IO開銷,一次性寫硬碟,一次性讀硬碟
    缺點:
    • partitions數量多,會産生大量小檔案,影響性能
    • 寫入檔案系統的大量檔案導緻IO傾向于随機IO,這通常比順序IO慢100倍
    • (2) SortShuffleManager

    default,在基于排序的shuffle中,rdd iter records将根據其目标分區ID進行排序寫入單個map輸出檔案.

    reducer擷取此檔案的連續區域以便閱讀他們的map輸出部分。 在map輸出資料太大而不适合緩存的情況下

    排序的輸出子集可以被分散到磁盤上,并且這些磁盤上的檔案被合并生成最終的輸出檔案。

  • ShuffleHandle

    registerShuffle

    ,根據不同場景選擇不同

    ShuffleWriter

  • ShuffleWriter
    • HashShuffleManager=>HashShuffleWriter

      過程如圖:

      spark core源碼閱讀-Storage shuffle(八)一 map shuffle二 reduce shuffle參考
    • SortShuffleManager

      Manager按照以下順序,根據不同條件選擇

      ShuffleHandle

      :
    • UnsafeShuffleWriter(Tungsten中優化項)

      SPARK-7081

      使用特殊高效記憶體排序

      ShuffleExternalSorter

      , 它對壓縮記錄指針和分區ID數組進行排序,通過在排序陣列中每個記錄

      僅使用8個位元組的空間,CPU緩存可以更有效地工作

      觸發條件:

      • 序列化政策支援如

        UnsafeRowSerializer,KryoSerializer

        ,能直接在二進制上操作不需要反序列化資料
      • 不需要聚合
      • 分區數不能超過16 million
    • BypassMergeSortShuffleWriter

      HashShuffleWriter

      SPARK-9808

      放在該類,在

      writePartitionedFile

      方法中合并所有分區檔案,這樣每個map一個

      output檔案,

      HashShuffleManager

      2billion的shuffle檔案就減少到46k

      觸發條件:

      • spark.shuffle.manager=sort

      • “reducers”<“spark.shuffle.sort.bypassMergeThreshold” (預設200)
    • SortShuffleWriter

      過程如圖:

      spark core源碼閱讀-Storage shuffle(八)一 map shuffle二 reduce shuffle參考
  • ExternalSorter

    通過Partitioner把key分區,在每個分區内排序,為每個分區輸出單個分區檔案

  • IndexShuffleBlockResolver

    邏輯塊與實體檔案Mapping,生成資料檔案塊索引檔案

  • PartitionedAppendOnlyMap

    具有以下特征:

    • AppendOnlyMap 記憶體存儲,隻能添加
    • WritablePartitionedPairCollection
      • 鍵值對都有分區資訊
      • 支援高效的記憶體排序iter
      • 支援WritablePartitionedIterator直接以位元組形式寫入内容
  • Sorter

    TimSort:是結合了合并排序(merge sort)和插入排序(insertion sort)而得出的排序算法

spark core源碼閱讀-Storage shuffle(八)一 map shuffle二 reduce shuffle參考

主要方法

  • insertAll

    spark core源碼閱讀-Storage shuffle(八)一 map shuffle二 reduce shuffle參考

    疊代器循環處理資料

    記憶體不足溢出到磁盤

    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
    if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
    addElementsRead()
    kv = records.next()
    // map=>PartitionedAppendOnlyMap
    map.changeValue((getPartition(kv._1), kv._1), update)
    // 記憶體不足溢出到磁盤
    maybeSpillCollection(usingMap = true)
    }
               
  • writePartitionedFile

    将所有添加到ExternalSorter中的資料寫入磁盤存儲中的檔案
spark core源碼閱讀-Storage shuffle(八)一 map shuffle二 reduce shuffle參考

二 reduce shuffle

看一下

ShuffledRDD

中的

compute

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + , context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }
           
  • BlockStoreShuffleReader

    該方法中:

    • 通過

      ShuffleBlockFetcherIterator

      遠端擷取各個map端相應的分區資料
    • 需要map端合并,

      combineCombinersByKey

      ,使用

      ExternalAppendOnlyMap.insertAll

      資料結構緩存,記憶體不足溢出檔案
    • 需要排序,

      ExternalSorter.insertAll

      循環排序
    • sorter.iterator

      合并資料集(如果有多個溢出檔案,合并成一個)
    • 傳回

      CompletionIterator

參考

  • spark-architecture-shuffle
  • TungstenSecret

    TODO

  • TimSort JDK ComparableTimSort
  • RoaringBitmap