spark core源碼閱讀-Storage shuffle(八)
本節主要介紹
RDD.aggregateByKey
導緻的shuffle,分兩部分map shuffle,reduce shuffle
一 map shuffle
ShuffleMapTask
中Task如何處理
rdd.iterator
,shuffle中Map端如何根據根據分區把資料寫入檔案.
主要類簡述
-
ShuffleManager
Driver/Executors建立SparkEnv時建立該類,不同的ShuffleManager對應不同ShuffleWriter,
通過Driver通過
spark.shuffle.manager
指定使用政策
三個重要方法:
:driver端初始化registerShuffle
生成ShuffleDependency
:在map任務中生成shuffle後的檔案getWriter
getReader
:在reduce任務中擷取,以從mappers中讀取被組合的記錄
分類:
-
(1) HashShuffleManager
1.2.0版本shuffle方式,有一些缺點,主要是大量小檔案,每一個mapper task根據reducer數量建立分區檔案,假設有M個Mapper,
R個Reducer,總共會建立M*R個小檔案,如果有46k個Mapper和46k個reducer就會在叢集産生2billion的檔案
目前的版本已經做了優化,在mapper端優化,executor中task寫同樣檔案,不過在1.5版本删除了SPARK-9808,
因為有更好的基于排序的shuffle改進
- 快,不需要排序,不需要維護hash表
- 沒有額外的排序記憶體消耗
- 沒有IO開銷,一次性寫硬碟,一次性讀硬碟
- partitions數量多,會産生大量小檔案,影響性能
- 寫入檔案系統的大量檔案導緻IO傾向于随機IO,這通常比順序IO慢100倍
- (2) SortShuffleManager
default,在基于排序的shuffle中,rdd iter records将根據其目标分區ID進行排序寫入單個map輸出檔案.
reducer擷取此檔案的連續區域以便閱讀他們的map輸出部分。 在map輸出資料太大而不适合緩存的情況下
排序的輸出子集可以被分散到磁盤上,并且這些磁盤上的檔案被合并生成最終的輸出檔案。
-
- ShuffleHandle
,根據不同場景選擇不同registerShuffle
ShuffleWriter
- ShuffleWriter
-
HashShuffleManager=>HashShuffleWriter
過程如圖:
-
SortShuffleManager
Manager按照以下順序,根據不同條件選擇
:ShuffleHandle
-
UnsafeShuffleWriter(Tungsten中優化項)
SPARK-7081
使用特殊高效記憶體排序
ShuffleExternalSorter
, 它對壓縮記錄指針和分區ID數組進行排序,通過在排序陣列中每個記錄
僅使用8個位元組的空間,CPU緩存可以更有效地工作
觸發條件:
- 序列化政策支援如
,能直接在二進制上操作不需要反序列化資料UnsafeRowSerializer,KryoSerializer
- 不需要聚合
- 分區數不能超過16 million
- 序列化政策支援如
-
BypassMergeSortShuffleWriter
HashShuffleWriter
放在該類,在SPARK-9808
writePartitionedFile
方法中合并所有分區檔案,這樣每個map一個
output檔案,
HashShuffleManager
2billion的shuffle檔案就減少到46k
觸發條件:
-
spark.shuffle.manager=sort
- “reducers”<“spark.shuffle.sort.bypassMergeThreshold” (預設200)
-
-
SortShuffleWriter
過程如圖:
-
-
ExternalSorter
通過Partitioner把key分區,在每個分區内排序,為每個分區輸出單個分區檔案
-
IndexShuffleBlockResolver
邏輯塊與實體檔案Mapping,生成資料檔案塊索引檔案
-
PartitionedAppendOnlyMap
具有以下特征:
- AppendOnlyMap 記憶體存儲,隻能添加
- WritablePartitionedPairCollection
- 鍵值對都有分區資訊
- 支援高效的記憶體排序iter
- 支援WritablePartitionedIterator直接以位元組形式寫入内容
-
Sorter
TimSort:是結合了合并排序(merge sort)和插入排序(insertion sort)而得出的排序算法
主要方法
-
insertAll
疊代器循環處理資料
記憶體不足溢出到磁盤
// Combine values in-memory first using our AppendOnlyMap val mergeValue = aggregator.get.mergeValue val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { addElementsRead() kv = records.next() // map=>PartitionedAppendOnlyMap map.changeValue((getPartition(kv._1), kv._1), update) // 記憶體不足溢出到磁盤 maybeSpillCollection(usingMap = true) }
-
将所有添加到ExternalSorter中的資料寫入磁盤存儲中的檔案writePartitionedFile
二 reduce shuffle
看一下
ShuffledRDD
中的
compute
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + , context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
-
BlockStoreShuffleReader
該方法中:
- 通過
遠端擷取各個map端相應的分區資料ShuffleBlockFetcherIterator
- 需要map端合并,
,使用combineCombinersByKey
資料結構緩存,記憶體不足溢出檔案ExternalAppendOnlyMap.insertAll
- 需要排序,
循環排序ExternalSorter.insertAll
-
合并資料集(如果有多個溢出檔案,合并成一個)sorter.iterator
- 傳回
CompletionIterator
- 通過
參考
- spark-architecture-shuffle
-
TungstenSecret
TODO
- TimSort JDK ComparableTimSort
- RoaringBitmap