MapReduce切片機制
為什麼需要切片
MapReduce是一個分布式計算架構,處理的是海量資料的計算。那麼并行運算必不可免,但是到底并行多少個Map任務來計算呢?每個Map任務計算哪些資料呢?這些我們資料我們不能夠憑空估計,隻能根據實際資料的存儲情況來動态配置設定,而我們要介紹的切片就是要解決這個問題,
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iM0E2MwUzMygDN0ITOwMDO4YmN0MDMxczNjF2Y4MjYw8CX5d2bs92Yl1iclB3bsVmdlR2LcNWaw9CXt92Yu4GZjlGbh5yYjV3Lc9CX6MHc0RHaiojIsJye.png)
切片機制原理
切片的規則我們需要通過閱讀源代碼來了解。首先我們來看下hadoop中預設的兩個參數配置
1.預設參數
官網位址:
http://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xmlmapreduce.job.split.metainfo.maxsize 10000000
mapreduce.input.fileinputformat.split.minsize 0
2. 源碼檢視
注意:SPLIT_SLOP = 1.1,即當劃分後剩餘檔案大小除splitSize大于1.1時,循環繼續,小于1.1時退出循環,将剩下的檔案大小歸到一個切片上去。
// 128MB
long blockSize = file.getBlockSize();
// 128MB
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// 檔案的大小 260MB
long bytesRemaining = length;
// 第一次 260/128=2.x > 1.1
// 第二次 132/128=1.03 <1.1 不執行循環
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// 擷取塊的索引
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
// 将塊的資訊儲存到splits集合中
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
// 260-128=132MB
bytesRemaining -= splitSize;
}
// 将剩餘的132MB添加到splits集合中
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
3.切片總結
FileInputFormat中預設的切片機制
簡單地按照檔案的内容長度進行切片
切片大小,預設等于block大小,可以通過調整參數修改,注意1.1的問題
切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片
一個切片(split)對應一個MapTask事例
一個job的map階段并行度由用戶端在送出job時決定
比如待處理資料有兩個檔案:
file1.txt 260M
file2.txt 10M
經過FileInputFormat的切片機制運算後,形成的切片資訊如下
file1.txt.split1-- 0~128
file1.txt.split2-- 128~260
file2.txt.split1-- 0~10M。