這一篇我們來分析Spark2.1的Shuffle流程。
其實ShuffleDependency從SparkContext初始化就已經被DAGScheduler劃分好了,本文主要探讨在Task運作過程中的ShufleWrite和ShuffleRead。
要從Task運作開始說起,就要知道Task在哪裡運作的。我們普遍認為Executor是負責執行Task的,但是我們發現Executor其實就是一個類
private[spark] class Executor(){}
而在一個Application送出後,用JPS指令檢視,會發現有AplicationMaster程序和CoarseGrainedExecutorBackend程序。沒錯,後者就是真實管理運作Task的程序。
CoarseGrainedExecutorBackend
做為一個程序,CoarseGrainedExecutorBackend負責與ApplicationMaster進行RPC通信,其中有一個receive方法重寫于RpcEndpoint,負責接收消息,當收到啟動task的指令時,就會調用Executor的launchTask方法,此方法負責啟動task。
Executor類中還有一個TaskRunner内部類,是一個實作了Runnable接口的線程。啟動task也就是将序列化task的描述資訊封裝成一個TaskRunner,将其放線上程池中運作。
//Executor類
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
// Runnable 接口的對象.
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
// 線上程池中執行 task
threadPool.execute(tr)
}
由此我們看出,其實負責執行task的還是Executor,而與ApplicationMaster通信的卻是CoarseGrainedExecutorBackend,也就是CoarseGrainedExecutorBackend是一個中間代理商。
TaskRunner的run方法是task的具體實作邏輯。
@volatile var task: Task[Any] = _
override def run(): Unit = {
//...
// 1.更新 task 的狀态
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
// 2.把任務相關的資料反序列化出來
val (taskFiles, taskJars, taskProps, taskBytes) = Task.deserializeWithDependencies(serializedTask)
//3.反序列化确定task屬于哪個類型
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
// 4.開始運作 task
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
上述代碼第3步,确定task是哪個類型,Task是個抽象類,有以下實作類:

主要實作為:ShuffleMapTask和ResultTask。
Spark中的Stage分為兩種,ShuffleMapStage和ResultStage,ResultStage就是finallStage,也就是action算子所在的Stage,前面的所有Stage都是ShuffleMapStage。ShuffleMapStage對應的Task就是ShuffleMapTask,而ResultStage對應的Task就是ResultTask。
ShufleWriter
也就是說,首先運作的task是ShuffleMapTask。那麼上面第4步的task.run會調用ShuffleMapTask的runtask方法。
//ShuffleMapTask類
override def runTask(context: TaskContext): MapStatus = {
//...
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
//1.擷取ShuffleWriter,傳入一個shuffleHandle
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
//2.寫資料
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
//...
}
如上代碼中所知,ShuffleWriter為ShuffleManager.getWriter所得。
getWriter
ShuffleManager為一個trait,其實作類隻有1個,為SortShuffleManager。
并且傳入了一個shuffleHandle,點進去發現在shuffleDependency方法下被初始化:
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
點開此SortShuffleManager的registerShuffle方法:
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
此方法用于注冊shuffle,也就是選擇使用何種shuffle。
先看第一種:BypassMergeSortShuffleHandle
private[spark] object SortShuffleWriter {
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
//如果需要在map端聚合,如reduceByKey算子,則不能使用。
if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
false
} else {//擷取"spark.shuffle.sort.bypassMergeThreshold"參數的值,預設為200
val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
//分區數 <= 200使用
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
}
使用此Shuffle方式有兩個條件:
- 在map端不能有聚合;
- 分區數<=200;
第二種:canUseSerializedShuffle
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
//需要支援序列化重定向
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
s"${dependency.serializer.getClass.getName}, does not support object relocation")
false
}
//不能在map端聚合
else if (dependency.aggregator.isDefined) {
log.debug(
s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
false
}
//分區數大于16777216(2^24)
else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
} else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
}
使用此Shuffle方式有三個條件:
- 支援序列化重定向,Serializer可以對已經序列化的對象進行排序,這種排序起到的效果和先對資料排序再序列化一緻。支援relocation的Serializer是KryoSerializer和SparkSQL的custom serializers。
- 不能在map端進行聚合。
- 分區數不能大于最大分區數2^24個。
如果不符合以上兩種,則選擇BaseShuffleHandle
接下來回過頭來看getWriter方法:
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
// 根據不同的 Handle, 建立不同的 ShuffleWriter
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[[email protected], [email protected]] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[[email protected], [email protected]] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[[email protected], [email protected], _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
據上所知,Handle對應的ShuffleWrite為:
- SerializedShuffleHandle -> UnsafeShuffleWriter
- BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter
- BaseShuffleHandle -> SortShuffleWriter
Write
BypassMergeSortShuffleWriter
此ShuffleWriter缺點比較大,僅供不需要聚合排序,分區數小于等于200時使用,其内部原理簡單,但是需要注意其中一行代碼:
public void write(Iterator<Product2<K, V>> records) throws IOException {
//...
partitionWriters = new DiskBlockObjectWriter[numPartitions];
//...
}
DiskBlockObjectWriter為緩沖塊block,此方式為每個partition都開辟了一塊32K大小的block緩沖區,當partition數量為10000個時,單一個MapTask就需要約320M記憶體,記憶體消耗過大,這也就是分區數小于200個使用的原因。
最後會将資料封裝成一個MapStatus發送給Driver。
SortShuffleWriter
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 一:
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
// 将 Map 任務的輸出記錄插入到緩存中
sorter.insertAll(records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
// 資料 shuffle 資料檔案
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try { // 将 map 端緩存的資料寫入到磁盤中, 并生成 Block 檔案對應的索引檔案.
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// 記錄各個分區資料的長度
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
// 生成 Block 檔案對應的索引檔案. 此索引檔案用于記錄各個分區在 Block檔案中的偏移量, 以便于
// Reduce 任務拉取時使用
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
- 根據不同的場景選取不同的方式。如果map端開啟了聚合,則
new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
傳入了一個聚合器和一個排序器。而如果沒有聚合,則
new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
是以隻有在map端需要聚合的算子才會排序,像sortByKey這種算子map不會排序,而是放在reduce端排序。
- 接下來一步 sorter.insertAll(records)
records就是write方法的形參,也就是我們傳過來的資料。那這個insertAll是幹嘛的?
insertAll方法是我們第一步new出來的ExternalSorter類中的方法,
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
//循環放資料
while (records.hasNext) {
//記錄自上次spill以來記憶體中的資料條數
addElementsRead()
//資料
kv = records.next()
//PartitionedAppendOnlyMap中update資料
map.changeValue((getPartition(kv._1), kv._1), update)
//檢查資料是否存放的下,如果存放不下則會擴充記憶體或者spill
maybeSpillCollection(usingMap = true)
}
} else {
// Stick values into our buffer
//PartitionedPairBuffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}
}
為了提高聚合和排序的性能,Spark為ShuffleWriter的聚合排序過程設計了兩種資料結構:
- PartitionedAppendOnlyMap:如果設定了Aggregator,我們可以将對象放入AppendOnlyMap中以将它們組合在一起
- PartitionedPairBuffer:如果沒有設定Aggregator,則放入buffer中。
下面我們先來看看這兩個資料結構
PartitionedAppendOnlyMap:是AppendOnlyMap的間接子類,在ShuffleWriter端使用,其内部如下:
操作資料僅有一個insert方法。調用的是其父類AppendOnlyMap的update方法。
AppendOnlyMap
此類是一個類似于HashMap的資料結構,使用Hash表進行尋址,但不同于HashMap的是,AppendOnlyMap隻有update操作(暫且認為添加也算update),因為Shuffle的資料不需要删除,隻需要聚合,是以這樣設計一個輕量級的HashMap合情合理。另外一點不同的是,AppendOnlyMap解決Hash碰撞的方法為具有2的幂的哈希表大小的開放位址法的二次探測法,而HashMap為連結清單法。另外需要注意,AppendOnlyMap最多可存放375809638(0.7 * 2 ^ 29)個元素,0.7為加載因子,也就是當使用率為70%時擴容為原來的2倍。
至于怎麼實作添加操作,如果讀者感興趣可以搜此類的update方法詳細了解。
PartitionedPairBuffer:本質上類似于一個Array數組,僅供排序使用,可按partitionID或partitionID+Key排序。最多支援1073741823 (2 ^ 30 - 1)個元素。
回過頭來看上面的insertAll方法,如果有Aggregator,則使用PartitionedAppendOnlyMap,否則使用PartitionedPairBuffer。在最後,都有一個maybeSpillCollection操作,此方法是判斷每條資料插入記憶體緩沖區之後,是否需要spill檔案。
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
//估計目前集合大小
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {//判斷是否可以spill
//如果spill了,就新開辟一塊新記憶體
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
//記錄最大使用記憶體量
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
首先估計了目前緩沖區(集合)的大小,這一步是比較困難的。因為雖然我們知道AppendOnlyMap中持有的資料的長度和大小,但是數組裡面存放的是Key和Value的引用,并不是實際對象的大小,而且Value會不斷更新,實際大小不斷變化。是以想要準确的擷取到其大小相當困難,如果簡單的每次插入一條資料都掃描數組中的record并對其大小相加,那麼時間複雜度太高會極大影響效率。此estimateSize方法是Spark設計的一個增量式的高效估算算法,在每個record插入或更新時根據曆史統計值和目前變化量直接估算目前AppendOnlyMap的大小,算法時間複雜度為O(1),開銷很小。在record插入和聚合過程中會定期對目前AppendOnlyMap中的record進行抽樣,然後精确計算這些record的總大小,總個數,更新個數及平均值等,并作為曆史統計值。進行抽樣是因為record個數過多,難以對每個record進行精确計算。之後,每當有record插入或更新時,會根據曆史統計值和曆史平均的變化值,估算AppendOnlyMap的總大小。
拿到估算大小之後,就會嘗試spill,maybeSpill方法如下:
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
//elementsRead:自上次spill後,讀取的元素個數。myMemoryThreshold:預設5M
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
//申請記憶體,擴容2倍。
//擴容2倍需要開辟的新記憶體。比如已占用6M,則需要新申請 12-5 = 7M
val amountToRequest = 2 * currentMemory - myMemoryThreshold
//實際申請到的新記憶體
val granted = acquireMemory(amountToRequest)
//現在實際擁有的記憶體。在記憶體足夠用也就是可以全部申請來的時候,myMemoryThreshold=2*currentMemory
myMemoryThreshold += granted
//如果擁有的記憶體小于已使用的記憶體,則會spill
shouldSpill = currentMemory >= myMemoryThreshold
}
//如果資料量過大,已經超出了long的最大值,則仍然會spill
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
if (shouldSpill) {// 執行spill
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}
elementsRead為緩沖區元素個數,如果是32的倍數,并且使用記憶體已經大于預設的spill門檻值(預設5M),則擴容兩倍。如果擴容之後的容量還不足以放下目前使用容量,則溢出。舉個例子:假若目前使用了8M記憶體,也就是currentMemory=8M,而myMemoryThreshold=5M,是以會擴容到16M,需要新申請16-5=11M,但是我隻申請到了1M,那麼此時myMemoryThreshold=6M 小于 currentMemory,就需要溢出。另外如果元素個數超過了Long類型的最大值,則也會spill。
在insertAll方法之後,會将資料封裝成一個MapStatus發送給Driver。
UnsafeShuffleWriter
此方式是Spark2.0為提高效率新加入的方式,核心思想是在堆外記憶體中操作序列化的record對象(二進制資料),降低記憶體消耗和GC開銷。正好彌補了分區數超過200時BypassMergeSortShuffleWriter的不足。
Stop
至此stop,Shuffle的Wirte階段結束。