天天看点

Spark Shuffle模块——Suffle Read过程分析

spark shuffle read调用栈如下:

1. org.apache.spark.rdd.shuffledrdd#compute()

2. org.apache.spark.shuffle.shufflemanager#getreader()

3. org.apache.spark.shuffle.hash.hashshufflereader#read()

4. org.apache.spark.storage.shuffleblockfetcheriterator#initialize()

5. org.apache.spark.storage.shuffleblockfetcheriterator#splitlocalremoteblocks()

org.apache.spark.storage.shuffleblockfetcheriterator#sendrequest()

org.apache.spark.storage.shuffleblockfetcheriterator#fetchlocalblocks()

下面是fetchlocalblocks()方法执行时涉及到的类和对应方法:

6. org.apache.spark.storage.blockmanager#getblockdata()

org.apache.spark.shuffle.hash.shufflemanager#shuffleblockresolver()

shufflemanager有两个子类,如果是hashshuffle 则对应的是org.apache.spark.shuffle.hash.hashshufflemanager#shuffleblockresolver()方法,该方法返回的是org.apache.spark.shuffle.fileshuffleblockresolver,再调用fileshuffleblockresolver#getblockdata()方法返回block数据

;如果是sort shuffle,则对应的是

org.apache.spark.shuffle.hash.sortshufflemanager#shuffleblockresolver(),该方法返回的是org.apache.spark.shuffle.indexshuffleblockresolver,然后再调用indexshuffleblockresolver#getblockdata()返回block数据。

下面是org.apache.spark.storage.shuffleblockfetcheriterator#sendrequest()方法执行时涉及到的类和对应方法

7.

org.apache.spark.network.shuffle.shuffleclient#fetchblocks

org.apache.spark.network.shuffle.shuffleclient有两个子类,分别是externalshuffleclient及blocktransferservice

,其中org.apache.spark.network.shuffle.blocktransferservice又有两个子类,分别是nettyblocktransferservice和nioblocktransferservice,对应两种不同远程获取block数据方式,spark 1.5.2中已经将nioblocktransferservice方式设置为deprecated,在后续版本中将被移除

下面按上述调用栈对各方法进行说明,这里只讲脉络,细节后面再讨论

task执行时,调用shuffledrdd的compute方法,其代码如下:

可以看到,其核心逻辑是通过调用shufflemanager#getreader()方法得到hashshufflereader对象,然后调用hashshufflereader#read()方法完成前一stage中shufflemaptask生成的shuffle 数据的读取。需要说明的是,无论是hash shuffle还是sort shuffle,使用的都是hashshufflereader。

跳到hashshufflereader#read()方法当中,其源码如下:

splitlocalremoteblocks()方法确定数据的读取策略,localblocks变量记录在本地机器的blockid,remoteblocks变量则用于记录所有在远程机器上的blockid。远程数据块被分割成最大为maxsizeinflight大小的fetchrequests

splitlocalremoteblocks()方法具有源码如下:

fetchlocalblocks()方法进行本地block的读取,调用的是blockmanager的getblockdata方法,其源代码如下:

跳转到blockmanager的getblockdata方法,可以看到其源代码如下:

org.apache.spark.shuffle.hash.shufflemanager#shuffleblockresolver()方法获取相应的shuffleblockresolver,如果是hash shuffle,则

是org.apache.spark.shuffle.fileshuffleblockresolver,如果是sort shuffle则org.apache.spark.shuffle.indexshuffleblockresolver。然后调用对应shuffleblockresolver的getblockdata方法,返回对应的filesegment。

fileshuffleblockresolver#getblockdata方法源码如下:

indexshuffleblockresolver#getblockdata方法源码如下:

sendrequest()方法用于从远程机器上获取数据

通过上面的代码可以看到,代码使用的是shuffleclient.fetchblocks进行远程block数据的获取,org.apache.spark.network.shuffle.shuffleclient有两个子类,分别是externalshuffleclient和blocktransferservice,而org.apache.spark.network.shuffle.blocktransferservice又有两个子类,分别是nettyblocktransferservice和nioblocktransferservice,shuffleclient 对象在 org.apache.spark.storage.blockmanager定义,其源码如下:

代码中的blocktransferservice在sparkenv中被初始化,具体如下:

继续阅读