The Spark shuffle read call stack is as follows:
1. org.apache.spark.rdd.ShuffledRDD#compute()
2. org.apache.spark.shuffle.ShuffleManager#getReader()
3. org.apache.spark.shuffle.hash.HashShuffleReader#read()
4. org.apache.spark.storage.ShuffleBlockFetcherIterator#initialize()
5. org.apache.spark.storage.ShuffleBlockFetcherIterator#splitLocalRemoteBlocks()
   org.apache.spark.storage.ShuffleBlockFetcherIterator#sendRequest()
   org.apache.spark.storage.ShuffleBlockFetcherIterator#fetchLocalBlocks()
The classes and methods involved when fetchLocalBlocks() executes are as follows:
6. org.apache.spark.storage.BlockManager#getBlockData()
   org.apache.spark.shuffle.ShuffleManager#shuffleBlockResolver()
ShuffleManager has two main implementations. For hash shuffle, the call resolves to org.apache.spark.shuffle.hash.HashShuffleManager#shuffleBlockResolver(), which returns an org.apache.spark.shuffle.FileShuffleBlockResolver. FileShuffleBlockResolver#getBlockData() is then called to return the block data. For sort shuffle, the call resolves to org.apache.spark.shuffle.sort.SortShuffleManager#shuffleBlockResolver(), which returns an org.apache.spark.shuffle.IndexShuffleBlockResolver. IndexShuffleBlockResolver#getBlockData() is then called to return the block data.
The classes and methods involved when org.apache.spark.storage.ShuffleBlockFetcherIterator#sendRequest() executes are as follows:
7. org.apache.spark.network.shuffle.ShuffleClient#fetchBlocks()
org.apache.spark.network.shuffle.ShuffleClient has two subclasses: ExternalShuffleClient and org.apache.spark.network.BlockTransferService. BlockTransferService in turn has two subclasses, NettyBlockTransferService and NioBlockTransferService, which correspond to two different ways of fetching block data from remote machines. Spark 1.5.2 already marks NioBlockTransferService as deprecated, and it will be removed in a later release.
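The class hierarchy above can be modeled in a few lines of Python. This is a simplified sketch, not Spark code. The Python names mirror the Scala/Java classes. The listener callbacks are modeled on Spark's BlockFetchingListener (onBlockFetchSuccess / onBlockFetchFailure). The "transport" is faked by returning a descriptive string instead of real bytes.

```python
from abc import ABC, abstractmethod


class BlockFetchingListener(ABC):
    """Callback interface notified once per requested block."""

    @abstractmethod
    def on_block_fetch_success(self, block_id, data): ...

    @abstractmethod
    def on_block_fetch_failure(self, block_id, exc): ...


class ShuffleClient(ABC):
    """Models ShuffleClient: the single entry point used to fetch remote blocks."""

    @abstractmethod
    def fetch_blocks(self, host, port, exec_id, block_ids, listener): ...


class ExternalShuffleClient(ShuffleClient):
    # Talks to the external shuffle service on the remote node (transport faked here).
    def fetch_blocks(self, host, port, exec_id, block_ids, listener):
        for block_id in block_ids:
            listener.on_block_fetch_success(block_id, f"external-service@{host}:{port}/{block_id}")


class BlockTransferService(ShuffleClient):
    """Still abstract: concrete subclasses choose the network transport."""


class NettyBlockTransferService(BlockTransferService):
    def fetch_blocks(self, host, port, exec_id, block_ids, listener):
        for block_id in block_ids:
            listener.on_block_fetch_success(block_id, f"netty@{host}:{port}/{block_id}")


class NioBlockTransferService(BlockTransferService):
    # Deprecated as of Spark 1.5.2; same fake behavior in this model.
    def fetch_blocks(self, host, port, exec_id, block_ids, listener):
        for block_id in block_ids:
            listener.on_block_fetch_success(block_id, f"nio@{host}:{port}/{block_id}")
```

Because callers only depend on the ShuffleClient interface, the reader does not care which concrete client it was handed.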
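The following walks through each method along the call stack above. Only the overall flow is covered here; the details will be discussed later.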
When a task executes, it calls ShuffledRDD's compute method, whose code is as follows:
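As you can see, the core logic obtains a HashShuffleReader through ShuffleManager#getReader() and then calls HashShuffleReader#read(). That call reads the shuffle data produced by the ShuffleMapTasks of the previous stage. Note that HashShuffleReader is used for both hash shuffle and sort shuffle.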
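The compute → getReader → read flow can be sketched as follows. This is a hypothetical Python model, not Spark's source. A dict keyed by (shuffle_id, reduce_id) stands in for the map outputs written by the previous stage, and a plain shuffle ID stands in for the shuffle handle. It assumes, as the text states, that both managers hand back a HashShuffleReader. Each reduce task asks for the single partition range [index, index + 1).

```python
class HashShuffleReader:
    """Reads reduce partitions [start_partition, end_partition) of one shuffle."""

    def __init__(self, shuffle_id, start_partition, end_partition, map_outputs):
        self.shuffle_id = shuffle_id
        self.start_partition = start_partition
        self.end_partition = end_partition
        # Hypothetical stand-in for the shuffle files: {(shuffle_id, reduce_id): [records]}
        self.map_outputs = map_outputs

    def read(self):
        for reduce_id in range(self.start_partition, self.end_partition):
            yield from self.map_outputs.get((self.shuffle_id, reduce_id), [])


class ShuffleManager:
    def __init__(self, map_outputs):
        self.map_outputs = map_outputs

    def get_reader(self, shuffle_id, start_partition, end_partition):
        return HashShuffleReader(shuffle_id, start_partition, end_partition, self.map_outputs)


class HashShuffleManager(ShuffleManager):
    pass


class SortShuffleManager(ShuffleManager):
    pass  # per the text, sort shuffle also reads through HashShuffleReader


def shuffled_rdd_compute(shuffle_manager, shuffle_id, split_index):
    # One reduce task reads exactly its own partition: [split_index, split_index + 1)
    return shuffle_manager.get_reader(shuffle_id, split_index, split_index + 1).read()
```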
Moving on to HashShuffleReader#read(), its source code is as follows:
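The splitLocalRemoteBlocks() method determines the read strategy. The localBlocks variable records the BlockIds stored on the local machine, and the remoteBlocks variable records all BlockIds stored on remote machines. Remote blocks are grouped into FetchRequests of roughly maxBytesInFlight / 5 bytes each, where maxBytesInFlight comes from spark.reducer.maxSizeInFlight. Keeping each request smaller than maxBytesInFlight lets data be fetched from several nodes in parallel.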
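The grouping rule can be sketched in Python. This is a simplified model, not Spark's Scala source. Block addresses are reduced to executor ID strings, and `split_local_remote_blocks` and `FetchRequest` are illustrative stand-ins. It assumes the Spark 1.5 behavior of ShuffleBlockFetcherIterator:

- the target request size is `max(maxBytesInFlight / 5, 1)`;
- zero-size blocks are skipped;
- a request is closed as soon as its accumulated size reaches the target.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class FetchRequest:
    address: str  # executor ID of the remote BlockManager (simplified)
    blocks: List[Tuple[str, int]]  # (block_id, size_in_bytes)

    @property
    def size(self) -> int:
        return sum(size for _, size in self.blocks)


def split_local_remote_blocks(blocks_by_address, local_executor_id, max_bytes_in_flight):
    """Return (local_blocks, remote_blocks, remote_requests)."""
    # Smaller requests allow parallel fetches from several nodes.
    target_request_size = max(max_bytes_in_flight // 5, 1)
    local_blocks, remote_blocks, remote_requests = [], [], []
    for address, block_infos in blocks_by_address:
        if address == local_executor_id:
            # Local blocks: only remember their IDs, skipping empty ones.
            local_blocks.extend(block_id for block_id, size in block_infos if size != 0)
            continue
        cur_blocks, cur_size = [], 0
        for block_id, size in block_infos:
            if size < 0:
                raise ValueError(f"negative block size {size} for {block_id}")
            if size > 0:  # empty blocks are skipped
                cur_blocks.append((block_id, size))
                remote_blocks.append(block_id)
                cur_size += size
            if cur_size >= target_request_size:
                remote_requests.append(FetchRequest(address, cur_blocks))
                cur_blocks, cur_size = [], 0
        if cur_blocks:  # flush the final, possibly smaller, request
            remote_requests.append(FetchRequest(address, cur_blocks))
    return local_blocks, remote_blocks, remote_requests
```

The size check runs after a block is added, so a single request can exceed the target by up to one block.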
The source code of splitLocalRemoteBlocks() is as follows:
The fetchLocalBlocks() method reads local blocks by calling BlockManager's getBlockData method. Its source code is as follows:
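A simplified Python model of the local read loop, not the Spark source. `get_block_data` is a hypothetical callable standing in for BlockManager#getBlockData, and result tuples stand in for Spark's success/failure fetch results. The model reads each local block and pushes a result onto a shared queue. It stops at the first failure; this reflects the Spark 1.5 loop as I understand it.

```python
import queue


def fetch_local_blocks(local_blocks, get_block_data, results):
    """Read each local block and push a result tuple onto the `results` queue."""
    for block_id in local_blocks:
        try:
            buf = get_block_data(block_id)  # stands in for BlockManager#getBlockData
            results.put(("success", block_id, buf))
        except Exception as exc:
            # Record the failure and stop reading further local blocks.
            results.put(("failure", block_id, exc))
            return
```

Local and remote results land in the same queue, so the consumer does not need to know where a block came from.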
Jumping to BlockManager's getBlockData method, its source code is as follows:
The org.apache.spark.shuffle.ShuffleManager#shuffleBlockResolver() method obtains the matching ShuffleBlockResolver. For hash shuffle this is org.apache.spark.shuffle.FileShuffleBlockResolver; for sort shuffle it is org.apache.spark.shuffle.IndexShuffleBlockResolver. The getBlockData method of that resolver is then called and returns the corresponding file segment (wrapped in a ManagedBuffer).
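The two resolvers differ mainly in how they locate a reduce partition's bytes. The sketch below is a hypothetical Python model, not Spark's code.

- **Hash shuffle.** The model assumes file consolidation is disabled, so each (map task, reduce partition) pair has its own file, and the whole file is the block.
- **Sort shuffle.** The model assumes each map task writes one `.data` file plus an `.index` file of big-endian 8-byte offsets (numPartitions + 1 of them). Partition `r` is then the byte range `[offsets[r], offsets[r + 1])`.

File names follow Spark's `shuffle_<shuffleId>_<mapId>_<reduceId>` pattern. The sort-shuffle files use reduce ID 0 in their names, as Spark does. Real files live in hashed subdirectories managed by DiskBlockManager; this model uses one flat directory.

```python
import os
import struct
from dataclasses import dataclass


@dataclass(frozen=True)
class FileSegment:
    path: str
    offset: int
    length: int


def block_file_name(shuffle_id, map_id, reduce_id):
    # Naming pattern of ShuffleBlockId: shuffle_<shuffleId>_<mapId>_<reduceId>
    return f"shuffle_{shuffle_id}_{map_id}_{reduce_id}"


class FileShuffleBlockResolver:
    """Hash shuffle without consolidation: one file per (map task, reduce partition)."""

    def __init__(self, root):
        self.root = root

    def get_block_data(self, shuffle_id, map_id, reduce_id):
        path = os.path.join(self.root, block_file_name(shuffle_id, map_id, reduce_id))
        return FileSegment(path, 0, os.path.getsize(path))  # the whole file is the block


class IndexShuffleBlockResolver:
    """Sort shuffle: one .data file per map task plus an .index file of offsets."""

    def __init__(self, root):
        self.root = root

    def get_block_data(self, shuffle_id, map_id, reduce_id):
        base = os.path.join(self.root, block_file_name(shuffle_id, map_id, 0))
        with open(base + ".index", "rb") as index:
            index.seek(reduce_id * 8)  # each offset is an 8-byte big-endian long
            offset, next_offset = struct.unpack(">qq", index.read(16))
        return FileSegment(base + ".data", offset, next_offset - offset)


def shuffle_block_resolver(shuffle_type, root):
    # Plays the role of ShuffleManager#shuffleBlockResolver() for each shuffle type.
    resolvers = {"hash": FileShuffleBlockResolver, "sort": IndexShuffleBlockResolver}
    return resolvers[shuffle_type](root)
```

The index file is what lets sort shuffle keep one data file per map task: an empty partition simply has two equal consecutive offsets.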
The source code of FileShuffleBlockResolver#getBlockData is as follows:
The source code of IndexShuffleBlockResolver#getBlockData is as follows:
The sendRequest() method fetches data from remote machines.
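Remote requests are throttled before sendRequest() is called. This minimal Python sketch assumes the Spark 1.5 rule in ShuffleBlockFetcherIterator: queued requests are sent while nothing is in flight, or while the next request still fits within maxBytesInFlight. `Request` and `send_initial_requests` are hypothetical names, and `send_request` is any callable standing in for sendRequest().

```python
from collections import deque, namedtuple

# Hypothetical stand-in for ShuffleBlockFetcherIterator's FetchRequest.
Request = namedtuple("Request", ["address", "size"])


def send_initial_requests(fetch_requests, max_bytes_in_flight, send_request):
    """Send queued requests while the in-flight budget allows; return bytes in flight."""
    bytes_in_flight = 0
    while fetch_requests and (
        bytes_in_flight == 0  # always send at least one request, even an oversized one
        or bytes_in_flight + fetch_requests[0].size <= max_bytes_in_flight
    ):
        req = fetch_requests.popleft()
        bytes_in_flight += req.size
        send_request(req)  # in Spark, sendRequest() issues shuffleClient.fetchBlocks(...)
    return bytes_in_flight


pending = deque([Request("exec-2", 40), Request("exec-3", 40), Request("exec-4", 30)])
in_flight = send_initial_requests(pending, 100, print)
```

Requests left in the queue are not dropped. As fetched results are consumed and the in-flight byte count drops, the iterator can apply the same check again and send more.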
In sendRequest(), remote block data is fetched through shuffleClient.fetchBlocks. As described in step 7, org.apache.spark.network.shuffle.ShuffleClient has two subclasses, ExternalShuffleClient and org.apache.spark.network.BlockTransferService. BlockTransferService is implemented by NettyBlockTransferService and NioBlockTransferService. The shuffleClient object is defined in org.apache.spark.storage.BlockManager; its source code is as follows:
The blockTransferService used in this code is initialized in SparkEnv, as follows:
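The two selection steps can be sketched together in Python: SparkEnv picking the transfer service, and BlockManager picking the shuffle client. This is a simplified model that returns class names as strings, not Spark's code. It assumes the Spark 1.5 configuration keys `spark.shuffle.blockTransferService` (default `netty`) and `spark.shuffle.service.enabled` (default `false`). Raising ValueError on an unknown service name is this sketch's own choice.

```python
import warnings


def create_block_transfer_service(conf):
    """Pick the transfer service by name, in the spirit of SparkEnv (simplified)."""
    name = conf.get("spark.shuffle.blockTransferService", "netty").lower()
    if name == "netty":
        return "NettyBlockTransferService"
    if name == "nio":
        warnings.warn("NIO-based block transfer service is deprecated", DeprecationWarning)
        return "NioBlockTransferService"
    raise ValueError(f"unknown block transfer service: {name}")


def create_shuffle_client(conf, block_transfer_service):
    """BlockManager uses ExternalShuffleClient only when the external shuffle service is on."""
    if conf.get("spark.shuffle.service.enabled", "false").lower() == "true":
        return "ExternalShuffleClient"
    return block_transfer_service  # otherwise the transfer service doubles as the client
```

This ordering is why the same fetchBlocks call in sendRequest() can go through Netty, NIO, or the external shuffle service without the iterator changing.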