天天看點

MapReduce切片機制

MapReduce切片機制

為什麼需要切片

 MapReduce是一個分布式計算架構,處理的是海量資料的計算。那麼并行運算必不可免,但是到底并行多少個Map任務來計算呢?每個Map任務計算哪些資料呢?這些我們資料我們不能夠憑空估計,隻能根據實際資料的存儲情況來動态配置設定,而我們要介紹的切片就是要解決這個問題,

MapReduce切片機制

切片機制原理

 切片的規則我們需要通過閱讀源代碼來了解。首先我們來看下hadoop中預設的兩個參數配置

1.預設參數

官網位址:

http://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml

mapreduce.job.split.metainfo.maxsize    10000000
mapreduce.input.fileinputformat.split.minsize   0      
MapReduce切片機制

2. 源碼檢視

MapReduce切片機制
MapReduce切片機制
MapReduce切片機制
MapReduce切片機制
MapReduce切片機制

注意:SPLIT_SLOP = 1.1,即當劃分後剩餘檔案大小除splitSize大于1.1時,循環繼續,小于1.1時退出循環,将剩下的檔案大小歸到一個切片上去。

// 128MB
 long blockSize = file.getBlockSize();
 // 128MB
 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
 // 檔案的大小 260MB
 long bytesRemaining = length;
 // 第一次 260/128=2.x > 1.1
 // 第二次 132/128=1.03 <1.1 不執行循環
 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
   // 擷取塊的索引
   int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
   // 将塊的資訊儲存到splits集合中
   splits.add(makeSplit(path, length-bytesRemaining, splitSize,
               blkLocations[blkIndex].getHosts(),
               blkLocations[blkIndex].getCachedHosts()));
   // 260-128=132MB
   bytesRemaining -= splitSize;
 }
 // 将剩餘的132MB添加到splits集合中
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
           blkLocations[blkIndex].getHosts(),
           blkLocations[blkIndex].getCachedHosts()));
}      

3.切片總結

FileInputFormat中預設的切片機制

   簡單地按照檔案的内容長度進行切片

   切片大小,預設等于block大小,可以通過調整參數修改,注意1.1的問題

   切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片

   一個切片(split)對應一個MapTask事例

   一個job的map階段并行度由用戶端在送出job時決定

比如待處理資料有兩個檔案:
    file1.txt    260M
    file2.txt    10M
經過FileInputFormat的切片機制運算後,形成的切片資訊如下
    file1.txt.split1--  0~128
    file1.txt.split2--  128~260
    file2.txt.split1--  0~10M。      

繼續閱讀