The Spark shuffle read call stack is as follows:
1. org.apache.spark.rdd.ShuffledRDD#compute()
2. org.apache.spark.shuffle.ShuffleManager#getReader()
3. org.apache.spark.shuffle.hash.HashShuffleReader#read()
4. org.apache.spark.storage.ShuffleBlockFetcherIterator#initialize()
5. org.apache.spark.storage.ShuffleBlockFetcherIterator#splitLocalRemoteBlocks()
org.apache.spark.storage.ShuffleBlockFetcherIterator#sendRequest()
org.apache.spark.storage.ShuffleBlockFetcherIterator#fetchLocalBlocks()
The classes and methods involved when fetchLocalBlocks() runs are:
6. org.apache.spark.storage.BlockManager#getBlockData()
org.apache.spark.shuffle.ShuffleManager#shuffleBlockResolver()
ShuffleManager has two implementations. For hash shuffle, the call resolves to org.apache.spark.shuffle.hash.HashShuffleManager#shuffleBlockResolver(), which returns an org.apache.spark.shuffle.FileShuffleBlockResolver; FileShuffleBlockResolver#getBlockData() is then called to return the block data.
For sort shuffle, the call resolves to org.apache.spark.shuffle.sort.SortShuffleManager#shuffleBlockResolver(), which returns an org.apache.spark.shuffle.IndexShuffleBlockResolver; IndexShuffleBlockResolver#getBlockData() is then called to return the block data.
The classes and methods involved when org.apache.spark.storage.ShuffleBlockFetcherIterator#sendRequest() runs are:
7. org.apache.spark.network.shuffle.ShuffleClient#fetchBlocks()
org.apache.spark.network.shuffle.ShuffleClient has two subclasses, ExternalShuffleClient and BlockTransferService. org.apache.spark.network.BlockTransferService in turn has two subclasses, NettyBlockTransferService and NioBlockTransferService, which correspond to two different ways of fetching block data remotely. Spark 1.5.2 already marks NioBlockTransferService as deprecated, and it will be removed in a later release.
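The class relationships above can be sketched in a few lines of Scala. This is a simplified model, not Spark's code: the method signature is reduced to strings, nothing touches the network, and `ShuffleClientChooser` is a hypothetical helper. It assumes that the BlockManager picks ExternalShuffleClient when the external shuffle service is enabled and otherwise reuses its BlockTransferService.

```scala
// Simplified stand-ins for the real classes; signatures are assumptions for illustration.
abstract class ShuffleClient {
  // Returns a description of the fetch instead of performing any network I/O.
  def fetchBlocks(host: String, port: Int, blockIds: Seq[String]): String
}

// Talks to an external shuffle service rather than to the executor itself.
class ExternalShuffleClient extends ShuffleClient {
  def fetchBlocks(host: String, port: Int, blockIds: Seq[String]): String =
    s"external-service $host:$port ${blockIds.mkString(",")}"
}

// Executor-to-executor transfer; concrete transports extend this class.
abstract class BlockTransferService extends ShuffleClient

class NettyBlockTransferService extends BlockTransferService {
  def fetchBlocks(host: String, port: Int, blockIds: Seq[String]): String =
    s"netty $host:$port ${blockIds.mkString(",")}"
}

// Deprecated in Spark 1.5.2 according to the text above.
class NioBlockTransferService extends BlockTransferService {
  def fetchBlocks(host: String, port: Int, blockIds: Seq[String]): String =
    s"nio $host:$port ${blockIds.mkString(",")}"
}

// Hypothetical helper modelling how a ShuffleClient could be selected.
object ShuffleClientChooser {
  def choose(externalServiceEnabled: Boolean, transfer: BlockTransferService): ShuffleClient =
    if (externalServiceEnabled) new ExternalShuffleClient else transfer
}
```

Because BlockTransferService is itself a ShuffleClient, the caller only depends on `fetchBlocks` and does not need to know which transport sits behind it.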
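The methods are described below in call-stack order. Only the overall flow is covered here; the details are discussed later.
When a task runs, it calls the compute method of ShuffledRDD, whose code is as follows:
As can be seen, the core logic is to call ShuffleManager#getReader() to obtain a HashShuffleReader, then call HashShuffleReader#read() to read the shuffle data produced by the ShuffleMapTasks of the previous stage. Note that HashShuffleReader is used for both hash shuffle and sort shuffle.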
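The "get a reader, then read" flow described above can be modelled with toy types. The names `ShuffleReader` and `ShuffleManager` mirror Spark's, but the signatures are simplified assumptions (a plain shuffle id instead of a handle, fixed `(String, Int)` records), and `InMemoryShuffleManager` and `ToyShuffledRDD` are hypothetical.

```scala
// Toy stand-ins for the reader-side API; the bodies are not Spark's.
trait ShuffleReader {
  def read(): Iterator[(String, Int)]
}

trait ShuffleManager {
  // Reader for reduce partitions in the range [startPartition, endPartition).
  def getReader(shuffleId: Int, startPartition: Int, endPartition: Int): ShuffleReader
}

// Hypothetical manager: mapOutputs(mapId)(reduceId) holds the records that
// map task `mapId` wrote for reduce partition `reduceId`.
class InMemoryShuffleManager(mapOutputs: Vector[Vector[Seq[(String, Int)]]])
    extends ShuffleManager {
  def getReader(shuffleId: Int, startPartition: Int, endPartition: Int): ShuffleReader =
    new ShuffleReader {
      def read(): Iterator[(String, Int)] =
        for {
          perMapTask <- mapOutputs.iterator
          reduceId <- (startPartition until endPartition).iterator
          record <- perMapTask(reduceId).iterator
        } yield record
    }
}

// Same shape as the compute step: ask the manager for a reader covering
// exactly one reduce partition, then read from it.
class ToyShuffledRDD(manager: ShuffleManager, shuffleId: Int) {
  def compute(partitionIndex: Int): Iterator[(String, Int)] =
    manager.getReader(shuffleId, partitionIndex, partitionIndex + 1).read()
}
```

A reduce task for partition 0 therefore sees the partition-0 output of every map task, concatenated in map-task order in this toy model.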
Jumping into HashShuffleReader#read(), its source is as follows:
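splitLocalRemoteBlocks() decides how the data will be read. The localBlocks variable records the BlockIds located on the local machine, and the remoteBlocks variable records all BlockIds located on remote machines. Remote blocks are packed into FetchRequests sized according to maxSizeInFlight.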
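A minimal sketch of this splitting strategy, under stated assumptions: `BlockInfo`, `FetchRequest` and `BlockSplitter` are hypothetical names, a block counts as local when its executor id equals the reader's, and the target request size is passed in directly rather than derived from maxSizeInFlight. A request is closed as soon as its accumulated size reaches the target.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified descriptions of a shuffle block and a fetch request.
final case class BlockInfo(blockId: String, executorId: String, size: Long)
final case class FetchRequest(executorId: String, blocks: Seq[BlockInfo]) {
  def size: Long = blocks.map(_.size).sum
}

object BlockSplitter {
  // Returns (ids of local blocks, fetch requests for remote blocks).
  def split(
      blocks: Seq[BlockInfo],
      localExecutorId: String,
      targetRequestSize: Long): (Seq[String], Seq[FetchRequest]) = {
    val (local, remote) = blocks.partition(_.executorId == localExecutorId)
    val requests = ArrayBuffer.empty[FetchRequest]
    // One or more requests per remote executor; sorted only to keep the output deterministic.
    for ((executorId, executorBlocks) <- remote.groupBy(_.executorId).toSeq.sortBy(_._1)) {
      var current = ArrayBuffer.empty[BlockInfo]
      var currentSize = 0L
      for (block <- executorBlocks) {
        current += block
        currentSize += block.size
        if (currentSize >= targetRequestSize) {
          // Target reached: close this request and start a new one.
          requests += FetchRequest(executorId, current.toList)
          current = ArrayBuffer.empty[BlockInfo]
          currentSize = 0L
        }
      }
      // Whatever is left over forms the final request for this executor.
      if (current.nonEmpty) requests += FetchRequest(executorId, current.toList)
    }
    (local.map(_.blockId), requests.toList)
  }
}
```

Splitting remote blocks into several smaller requests lets them be fetched from multiple executors in parallel while the total amount of data in flight stays limited.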
The source of splitLocalRemoteBlocks() is as follows:
fetchLocalBlocks() reads the local blocks by calling BlockManager's getBlockData method. Its source is as follows:
Jumping to BlockManager's getBlockData method, its source is as follows:
org.apache.spark.shuffle.ShuffleManager#shuffleBlockResolver() returns the matching ShuffleBlockResolver: org.apache.spark.shuffle.FileShuffleBlockResolver for hash shuffle, and org.apache.spark.shuffle.IndexShuffleBlockResolver for sort shuffle. The getBlockData method of that resolver is then called and returns the corresponding FileSegment.
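The dispatch between the two resolvers can be illustrated with a toy model. Everything here is an assumption made for illustration: the `FileSegment` fields, the file names, the `Toy*` classes and the `Resolvers` helper are hypothetical. The hash-style resolver assumes one file per (map, reduce) pair; the sort-style resolver assumes one data file per map task plus a list of offsets marking where each reduce partition starts.

```scala
// Hypothetical description of a byte range inside a file.
final case class FileSegment(file: String, offset: Long, length: Long)

trait ShuffleBlockResolver {
  def getBlockData(shuffleId: Int, mapId: Int, reduceId: Int): FileSegment
}

// Hash-style layout (assumed): the whole file is the block.
class ToyFileResolver(fileLengths: Map[(Int, Int), Long]) extends ShuffleBlockResolver {
  def getBlockData(shuffleId: Int, mapId: Int, reduceId: Int): FileSegment =
    FileSegment(s"shuffle_${shuffleId}_${mapId}_${reduceId}", 0L, fileLengths((mapId, reduceId)))
}

// Sort-style layout (assumed): offsets(mapId)(r) is the start of partition r,
// and offsets(mapId)(r + 1) is its end, inside a single data file per map task.
class ToyIndexResolver(offsets: Map[Int, Vector[Long]]) extends ShuffleBlockResolver {
  def getBlockData(shuffleId: Int, mapId: Int, reduceId: Int): FileSegment = {
    val index = offsets(mapId)
    val start = index(reduceId)
    val end = index(reduceId + 1)
    FileSegment(s"shuffle_${shuffleId}_${mapId}_0.data", start, end - start)
  }
}

// Hypothetical selector standing in for ShuffleManager#shuffleBlockResolver().
object Resolvers {
  def forManager(
      name: String,
      hash: ShuffleBlockResolver,
      sort: ShuffleBlockResolver): ShuffleBlockResolver = name match {
    case "hash" => hash
    case "sort" => sort
    case other => throw new IllegalArgumentException(s"unknown shuffle manager: $other")
  }
}
```

Either way the caller receives a FileSegment, so BlockManager#getBlockData does not need to know which shuffle implementation wrote the data.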
The source of FileShuffleBlockResolver#getBlockData is as follows:
The source of IndexShuffleBlockResolver#getBlockData is as follows:
sendRequest() fetches data from remote machines.
As the code above shows, remote block data is fetched with shuffleClient.fetchBlocks. org.apache.spark.network.shuffle.ShuffleClient has two subclasses, ExternalShuffleClient and BlockTransferService, and org.apache.spark.network.BlockTransferService in turn has two subclasses, NettyBlockTransferService and NioBlockTransferService. The shuffleClient object is defined in org.apache.spark.storage.BlockManager; its source is as follows:
The blockTransferService in that code is initialized in SparkEnv, as follows: