mapTask并行度的決定機制
一個job的map階段并行度由用戶端在送出job時決定,而用戶端對map階段并行度的規劃的基本邏輯為:将待處理資料執行邏輯切片(即按照一個特定切片大小,将待處理資料劃分成邏輯上的多個split),然後每一個split配置設定一個mapTask并行執行個體處理。
FileInputFormat切片機制
原文和作者一起讨論:
http://www.cnblogs.com/intsmaze/p/6733968.html
1、預設切片定義在InputFormat類中的getSplit()方法
2、FileInputFormat中預設的切片機制:
a) 簡單地按照檔案的内容長度進行切片
b) 切片大小,預設等于hdfs的block大小
c) 切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片
比如待處理資料有兩個檔案:
file1.txt 260M
file2.txt 10M
經過FileInputFormat的切片機制運算後,形成的切片資訊如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~260 //如果剩餘的檔案長度/切片長度<=1.1則會将剩餘檔案的長度并未一個切片
file2.txt.split1-- 0~10M
3、FileInputFormat中切片的大小的參數配置
通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個值來運算決定。
minsize:預設值:1
配置參數: mapreduce.input.fileinputformat.split.minsize
maxsize:預設值:Long.MAXValue
配置參數:mapreduce.input.fileinputformat.split.maxsize
blocksize:值為hdfs的對應檔案的blocksize
配置讀取目錄下檔案數量的線程數:public static final String LIST_STATUS_NUM_THREADS =
"mapreduce.input.fileinputformat.list-status.num-threads";
是以,預設情況下,Math.max(minSize, Math.min(maxSize, blockSize));切片大小=blocksize
maxsize(切片最大值):參數如果調得比blocksize小,則會讓切片變小。
minsize(切片最小值):參數調的比blockSize大,則可以讓切片變得比blocksize還大。
選擇并發數的影響因素:
1、運算節點的硬體配置
2、運算任務的類型:CPU密集型還是IO密集型
3、運算任務的資料量
3、hadoop2.6.4源碼解析
org.apache.hadoop.mapreduce.JobSubmitter類
//得到job的map任務的并行數量
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
切片計算邏輯,關注紅色字型代碼即可。
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
//周遊檔案,對每一個檔案進行如下處理:獲得檔案的blocksize,擷取檔案的長度,得到切片資訊(spilt 檔案路徑,切片編号,偏移量範圍)
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
public static final String SPLIT_MINSIZE =
"mapreduce.input.fileinputformat.split.minsize";
public static final String SPLIT_MAXSIZE =
"mapreduce.input.fileinputformat.split.maxsize";
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//保證切分的檔案長度最小不得小于1位元組
protected long getFormatMinSplitSize() {
return 1;
}
//如果沒有在conf中設定SPLIT_MINSIZE參數,則取預設值1位元組。
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
//得到切片檔案的最大長度
long maxSize = getMaxSplitSize(job);
//如果沒有在conf中設定SPLIT_MAXSIZE參數,則去預設值Long.MAX_VALUE位元組。
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE);
}
//讀取指定目錄下的所有檔案的資訊
List<FileStatus> files = listStatus(job);
//如果沒有指定開啟幾個線程讀取,則預設一個線程去讀檔案資訊,因為存在目錄下有上億個檔案的情況,是以有需要開啟多個線程加快讀取。
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
public static final String LIST_STATUS_NUM_THREADS =
"mapreduce.input.fileinputformat.list-status.num-threads";
public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
//計算切片檔案的邏輯大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
private static final double SPLIT_SLOP = 1.1; // 10% slop
//判斷剩餘檔案與切片大小的比是否為1.1.
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
map并行度
如果job的每個map或者reduce的task的運作時間都隻有30-40秒鐘(最好每個map的執行時間最少不低于一分鐘),那麼就減少該job的map或者reduce數。每一個task的啟動和加入到排程器中進行排程,這個中間的過程可能都要花費幾秒鐘,是以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間。
配置task的JVM重用可以改善該問題:
(mapred.job.reuse.jvm.num.tasks,預設是1,表示一個JVM上最多可以順序執行的task數目(屬于同一個Job)是1。也就是說一個task啟一個JVM)。
小檔案的場景下,預設的切片機制會造成大量的maptask處理很少量的資料,效率低下:
解決方案:
推薦:把小檔案存入hdfs之前進行預處理,先合并為大檔案後再上傳。
折中:寫程式對hdfs上小檔案進行合并再跑job處理。
補救措施:如果大量的小檔案已經存在hdfs上了,使用combineInputFormate元件,它可以将衆多的小檔案從邏輯上規劃到一個切片中,這樣多個小檔案就可以交給一個maptask操作了。
作者:
intsmaze(劉洋)出處:
http://www.cnblogs.com/intsmaze/老鐵,你的--->推薦,--->關注,--->評論--->是我繼續寫作的動力。
微信公衆号号:Apache技術研究院
由于部落客能力有限,文中可能存在描述不正确,歡迎指正、補充!
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。