
Spark Shuffle Module: Analysis of the Shuffle Read Process

The call stack of a Spark shuffle read is as follows:

1. org.apache.spark.rdd.ShuffledRDD#compute()

2. org.apache.spark.shuffle.ShuffleManager#getReader()

3. org.apache.spark.shuffle.hash.HashShuffleReader#read()

4. org.apache.spark.storage.ShuffleBlockFetcherIterator#initialize()

5. org.apache.spark.storage.ShuffleBlockFetcherIterator#splitLocalRemoteBlocks()

org.apache.spark.storage.ShuffleBlockFetcherIterator#sendRequest()

org.apache.spark.storage.ShuffleBlockFetcherIterator#fetchLocalBlocks()

The following classes and methods are involved when fetchLocalBlocks() runs:

6. org.apache.spark.storage.BlockManager#getBlockData()

org.apache.spark.shuffle.ShuffleManager#shuffleBlockResolver()

ShuffleManager has two implementations. With hash shuffle, the call resolves to org.apache.spark.shuffle.hash.HashShuffleManager#shuffleBlockResolver(), which returns an org.apache.spark.shuffle.FileShuffleBlockResolver; FileShuffleBlockResolver#getBlockData() is then called to return the block data. With sort shuffle, the call resolves to org.apache.spark.shuffle.sort.SortShuffleManager#shuffleBlockResolver(), which returns an org.apache.spark.shuffle.IndexShuffleBlockResolver; IndexShuffleBlockResolver#getBlockData() is then called to return the block data.

The following classes and methods are involved when org.apache.spark.storage.ShuffleBlockFetcherIterator#sendRequest() runs:

7. org.apache.spark.network.shuffle.ShuffleClient#fetchBlocks

org.apache.spark.network.shuffle.ShuffleClient has two subclasses, ExternalShuffleClient and BlockTransferService. org.apache.spark.network.BlockTransferService in turn has two subclasses, NettyBlockTransferService and NioBlockTransferService, which correspond to two different ways of fetching block data remotely. As of Spark 1.5.2, NioBlockTransferService is marked deprecated and will be removed in a later release.

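The sections below walk through each method in the call stack above. Only the overall flow is covered here; details are left for later discussion.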

When a task runs, it calls the compute method of ShuffledRDD. The core logic of that method is to obtain a HashShuffleReader through ShuffleManager#getReader() and then call HashShuffleReader#read() to read the shuffle data produced by the ShuffleMapTasks of the previous stage. Note that both hash shuffle and sort shuffle use HashShuffleReader.
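To make the compute, getReader, read chain concrete, here is a minimal toy model in plain Scala. It is not Spark's code: `Reader`, `Manager`, and `InMemoryManager` are hypothetical stand-ins that only mimic the shape of the calls, and the record type is fixed to `(String, Int)` for brevity. The one detail taken from Spark is that compute asks for the partition range `[index, index + 1)`.

```scala
// Toy model of the shuffle read path; all names here are illustrative, not Spark's API.
object ShuffleReadSketch {
  // Stand-in for a shuffle reader: yields the records of the requested partitions.
  trait Reader {
    def read(): Iterator[(String, Int)]
  }

  // Stand-in for a shuffle manager: hands out a reader for a partition range.
  trait Manager {
    def getReader(shuffleId: Int, startPartition: Int, endPartition: Int): Reader
  }

  // Hypothetical in-memory manager: shuffle output is keyed by (shuffleId, reduceId).
  final class InMemoryManager(outputs: Map[(Int, Int), Seq[(String, Int)]]) extends Manager {
    def getReader(shuffleId: Int, startPartition: Int, endPartition: Int): Reader =
      new Reader {
        def read(): Iterator[(String, Int)] =
          (startPartition until endPartition).iterator
            .flatMap(p => outputs.getOrElse((shuffleId, p), Seq.empty[(String, Int)]))
      }
  }

  // Mirrors the shape of compute: read exactly one reduce partition, [index, index + 1).
  def compute(manager: Manager, shuffleId: Int, index: Int): Iterator[(String, Int)] =
    manager.getReader(shuffleId, index, index + 1).read()
}
```

The point of the indirection is that the RDD never needs to know which shuffle implementation wrote the data; it only asks the manager for a reader.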

Next, step into HashShuffleReader#read(), which, per the call stack above, leads into ShuffleBlockFetcherIterator#initialize().

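splitLocalRemoteBlocks() determines the read strategy. The localBlocks variable records the BlockIds stored on the local machine, and the remoteBlocks variable records all BlockIds stored on remote machines. Remote blocks are grouped into FetchRequests whose sizes are bounded in terms of maxBytesInFlight (set through spark.reducer.maxSizeInFlight); each request targets roughly one fifth of that limit, so that several requests can be in flight in parallel.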
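The splitting logic can be sketched roughly as follows. This is a simplified model rather than the Spark source: `Block`, `FetchRequest`, and `split` are hypothetical names, hosts are plain strings, and the one-fifth target per request is stated here as an assumption about how the in-flight budget is divided.

```scala
// Simplified model of splitting blocks into local ones and batched remote requests.
object SplitSketch {
  // Hypothetical stand-ins for a block id with its size and for a fetch request.
  final case class Block(id: String, size: Long)
  final case class FetchRequest(host: String, blocks: Vector[Block]) {
    def size: Long = blocks.map(_.size).sum
  }

  /** Returns (local blocks, remote fetch requests). */
  def split(
      localHost: String,
      blocksByHost: Seq[(String, Seq[Block])],
      maxBytesInFlight: Long): (Vector[Block], Vector[FetchRequest]) = {
    // Assumption: aim for about a fifth of the in-flight budget per request.
    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    var local = Vector.empty[Block]
    var remote = Vector.empty[FetchRequest]
    for ((host, blocks) <- blocksByHost) {
      val nonEmpty = blocks.filter(_.size > 0) // empty blocks need no fetch
      if (host == localHost) {
        local ++= nonEmpty
      } else {
        var current = Vector.empty[Block]
        var currentSize = 0L
        for (b <- nonEmpty) {
          current :+= b
          currentSize += b.size
          if (currentSize >= targetRequestSize) { // close the request once the target is reached
            remote :+= FetchRequest(host, current)
            current = Vector.empty[Block]
            currentSize = 0L
          }
        }
        if (current.nonEmpty) remote :+= FetchRequest(host, current) // leftover blocks
      }
    }
    (local, remote)
  }
}
```

Because a request is closed only after the block that crosses the target has been added, a single request can exceed the target by up to one block; the target is a batching hint, not a hard cap.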

fetchLocalBlocks() reads the local blocks; for each one it calls BlockManager's getBlockData method.

BlockManager#getBlockData, in turn, delegates shuffle blocks to a ShuffleBlockResolver.

org.apache.spark.shuffle.ShuffleManager#shuffleBlockResolver() returns the matching ShuffleBlockResolver: org.apache.spark.shuffle.FileShuffleBlockResolver for hash shuffle, and org.apache.spark.shuffle.IndexShuffleBlockResolver for sort shuffle. The resolver's getBlockData method is then called, which returns the corresponding file segment.
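The dispatch described here is ordinary polymorphism, which a small sketch may make clearer. Everything below is hypothetical scaffolding (`BlockResolver`, `HashManager`, `SortManager`, and the string results); it only illustrates that the caller asks the shuffle manager for its resolver and never branches on the shuffle type itself.

```scala
// Illustrative dispatch: each manager exposes its own resolver; the caller stays generic.
object ResolverSketch {
  trait BlockResolver {
    def getBlockData(blockName: String): String
  }

  // Stand-in for the resolver used with hash shuffle.
  object FileResolver extends BlockResolver {
    def getBlockData(blockName: String): String = s"file-resolver segment for $blockName"
  }

  // Stand-in for the resolver used with sort shuffle.
  object IndexResolver extends BlockResolver {
    def getBlockData(blockName: String): String = s"index-resolver segment for $blockName"
  }

  sealed trait Manager {
    def shuffleBlockResolver: BlockResolver
  }
  case object HashManager extends Manager {
    def shuffleBlockResolver: BlockResolver = FileResolver
  }
  case object SortManager extends Manager {
    def shuffleBlockResolver: BlockResolver = IndexResolver
  }

  // Stand-in for the block manager's lookup of a shuffle block.
  def getBlockData(manager: Manager, blockName: String): String =
    manager.shuffleBlockResolver.getBlockData(blockName)
}
```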

FileShuffleBlockResolver#getBlockData is the implementation used for hash shuffle.

IndexShuffleBlockResolver#getBlockData is the implementation used for sort shuffle.
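For the sort shuffle case, the idea behind an index-based lookup can be shown with a small self-contained sketch. It assumes the index stores cumulative byte offsets, one more entry than there are reduce partitions, so that partition i occupies the range from entry i to entry i + 1 in the data file. `buildIndex`, `locate`, and `Segment` are hypothetical names, and the index is held in memory here instead of being read from a file.

```scala
// Sketch of locating one reduce partition inside a single map-output data file.
object IndexLookupSketch {
  // Byte range of one partition within the data file.
  final case class Segment(offset: Long, length: Long)

  // Cumulative offsets for the given partition lengths: [0, l0, l0 + l1, ...].
  def buildIndex(partitionLengths: Seq[Long]): Vector[Long] =
    partitionLengths.scanLeft(0L)(_ + _).toVector

  // Partition reduceId spans [index(reduceId), index(reduceId + 1)).
  def locate(index: Vector[Long], reduceId: Int): Segment = {
    require(reduceId >= 0 && reduceId + 1 < index.length, s"no partition $reduceId")
    val start = index(reduceId)
    val end = index(reduceId + 1)
    Segment(start, end - start)
  }
}
```

Storing offsets instead of lengths means a lookup needs only two adjacent entries, and an empty partition simply shows up as a zero-length segment.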

sendRequest() fetches data from remote machines.

It uses shuffleClient.fetchBlocks to fetch remote block data. As noted earlier, org.apache.spark.network.shuffle.ShuffleClient has two subclasses, ExternalShuffleClient and BlockTransferService, and org.apache.spark.network.BlockTransferService in turn has two subclasses, NettyBlockTransferService and NioBlockTransferService. The shuffleClient object is defined in org.apache.spark.storage.BlockManager.

The blockTransferService used there is initialized in SparkEnv.
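How the concrete client ends up being chosen can be modeled as two configuration-driven decisions. The sketch below is an approximation under stated assumptions, not the Spark source: the case objects are hypothetical stand-ins for the real classes, the configuration is a plain `Map`, and the two keys (`spark.shuffle.service.enabled` and `spark.shuffle.blockTransferService`) and the `netty` default are given as I recall them for the 1.5 line and should be checked against the configuration documentation.

```scala
// Model of choosing the shuffle client from configuration; types are stand-ins.
object ClientSelectionSketch {
  sealed trait Client
  case object ExternalClient extends Client    // stands in for ExternalShuffleClient
  sealed trait TransferService extends Client  // stands in for BlockTransferService
  case object NettyService extends TransferService
  case object NioService extends TransferService

  // Environment-side choice of the transfer service (assumed key and default).
  def transferService(conf: Map[String, String]): TransferService =
    conf.getOrElse("spark.shuffle.blockTransferService", "netty").toLowerCase match {
      case "netty" => NettyService
      case "nio"   => NioService // deprecated path
      case other   => throw new IllegalArgumentException(s"unknown transfer service: $other")
    }

  // Block-manager-side choice: external shuffle service if enabled, else the transfer service.
  def shuffleClient(conf: Map[String, String]): Client =
    if (conf.getOrElse("spark.shuffle.service.enabled", "false").toBoolean) ExternalClient
    else transferService(conf)
}
```

Rejecting an unknown value with `IllegalArgumentException` is a choice made for this sketch only.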
