文章目錄
- 一、 Job送出的流程
- 二、 MapTask的工作機制
- 三、 Shuffle流程(溢寫,歸并)
- 四、 ReduceTask工作機制
注意:
- 通過WordCount程式為例進行調試
- 是在本地模式進行的,是以N個MapTask 和 N個 ReduceTask沒有并行的效果。
- 如果在叢集上,N個 MapTask 和 N 個ReduceTask 是并行運作.
一、 Job送出的流程
方法層級:1 > 1) > (1) > <1> > ① > [1] > {1}
1. job.waitForCompletion(true); //在Driver中送出job
1)sumbit() //送出
(1)connect():
<1>return new Cluster(getConfiguration());
/*
通過YarnClientProtocolProvider或LocalClientProtocolProvider
根據配置檔案的參數資訊擷取目前job需要執行到本地還是Yarn
最終:LocalClientProtocolProvider ==> LocalJobRunner
*/
① initialize(jobTrackAddr, conf);
//送出job
(2) return submitter.submitJobInternal(Job.this, cluster);
// 檢查job的輸出路徑。
<1> .checkSpecs(job);
<2> . Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//生成Job送出的臨時目錄:
//D:\tmp\hadoop\mapred\staging\Administrator1777320722\.staging
<3> . JobID jobId = submitClient.getNewJobID(); //為目前Job生成Id
<4> . Path submitJobDir = new Path(jobStagingArea, jobId.toString());
//Job的送出路徑d:/tmp/hadoop/mapred/staging/Administrator1777320722/.staging/job_local1777320722_0001
<5> . copyAndConfigureFiles(job, submitJobDir);
① rUploader.uploadResources(job, jobSubmitDir);
[1] uploadResourcesInternal(job, submitJobDir);
{1}.submitJobDir = jtFs.makeQualified(submitJobDir);
//建立Job的送出路徑
mkdirs(jtFs, submitJobDir, mapredSysPerms);
//生成切片資訊 ,并傳回切片的個數
<6> . int maps = writeSplits(job, submitJobDir);
//通過切片的個數設定MapTask的個數
<7> . conf.setInt(MRJobConfig.NUM_MAPS, maps);
//将目前Job相關的配置資訊寫到job送出路徑下
<8> . writeConf(conf, submitJobFile);
//此時通過檢視發現路徑下有: job.split job.splitmetainfo job.xml xxx.jar
//此時才真正送出Job
<9> .status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
//等job執行完成後,删除Job的臨時工作目錄的内容
<10> . jtFs.delete(submitJobDir, true);
二、 MapTask的工作機制
1. 從Job送出流程代碼解析裡面的的(2)--><9> 進去
//構造真正執行的Job , LocalJobRunnber$Job
Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
2. LocalJobRunnber$Job 的run()方法
1)TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);// 讀取job.splitmetainfo
2)int numReduceTasks = job.getNumReduceTasks(); // 擷取ReduceTask個數
3) // 根據切片的個數, 建立執行MapTask的 MapTaskRunnable
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles);
4)ExecutorService mapService = createMapExecutor(); // 建立線程池
5) runTasks(mapRunnables, mapService, "map"); //執行 MapTaskRunnable
6) 因為Runnable送出給線程池執行,接下來會執行MapTaskRunnable的run方法。
7) 執行 LocalJobRunner$Job$MapTaskRunnable 的run()方法.
(1) MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,info.getSplitIndex(), 1); //建立MapTask對象
(2) map.run(localConf, Job.this); //執行MapTask中的run方法
<1> .runNewMapper(job, splitMetaInfo, umbilical, reporter);
① org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = JobContextImpl
② org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = WordConutMapper
③ org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = TextInputFormat
④ split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset();
//重構切片對象
//切片對象的資訊 : file:/D:/input/inputWord/JaneEyre.txt:0+36306679
⑤org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = MapTask$NetTrackingRecordReader
⑥output = new NewOutputCollector(taskContext, job, umbilical, reporter); //構造緩沖區對象
[1] collector = createSortingCollector(job, reporter); //擷取緩沖區對象
MapTask$MapOutputBuffer
{1} . collector.init(context); //初始化緩沖區對象
1>>.final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);// 預設溢寫百分比 0.8
2>>.final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,MRJobConfig.DEFAULT_IO_SORT_MB); // 預設緩沖區大小 100M
3>>.sorter = ReflectionUtils.newInstance(job.getClass(MRJobConfig.MAP_SORT_CLASS, QuickSort.class,IndexedSorter.class), job);
// 排序對象。排序使用的是快排,并且隻是基于索引排序。
4>> . // k/v serialization // kv序列化
5>> . // output counters // 計數器
6>> . // compression // 壓縮
7>> . // combiner // combiner
⑦ mapper.run(mapperContext);// 執行WordCountMapper中的run方法。 實際執行的是WordCountMapper繼承的Mapper中的run方法。
[1] . 在Mapper中的run方法中 : map(context.getCurrentKey(), context.getCurrentValue(), context);//執行到WordCountMapper中的map方法。
[2] . 在WordCountMapper中的map方法中将kv寫出 : context.write(outK,outV);
三、 Shuffle流程(溢寫,歸并)
1. map中的kv持續往 緩沖區寫, 會達到溢寫條件,發生溢寫,最後發生歸并。
2. map中的 context.write(k,v)
1) . mapContext.write(key, value);
(1). output.write(key, value);
<1> collector.collect(key, value,partitioner.getPartition(key, value, partitions));
// 将map寫出的kv 計算好分區後,收集到緩沖區中。
<2> . 當滿足溢寫條件後 ,開始發生溢寫
startSpill();
① spillReady.signal(); //線程間通信,通知溢寫線程開始溢寫
② 溢寫線程調用 sortAndSpill() 方法發生溢寫操作
③ final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename)
//根據分區的個數,建立溢寫檔案:
/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local277309786_0001/attempt_local277309786_0001_m_000000_0/output/spill0.out
④ sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); // 溢寫前先排序
⑤ writer.close(); 通過writer進行溢寫,溢寫完成後,關閉流,可以檢視磁盤中的溢寫檔案
⑥ if (totalIndexCacheMemory >= indexCacheMemoryLimit)
// create spill index file
Path indexFilename =mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
// 判斷索引使用的記憶體空間是否超過限制的大小,如果超過也需要溢寫到磁盤
⑦ map持續往緩沖區寫,達到溢寫條件,就繼續溢寫 ........ 可能整個過程中發生N次溢寫。
⑧ MapTask中的runNewMapper 中 output.close(mapperContext);
假如上一次溢寫完後,剩餘進入的到緩沖區的資料沒有達到溢寫條件,那麼當map中的所有的資料
都已經處理完後,在關閉output時,會把緩沖區中的資料刷到磁盤中(其實就是沒有達到溢寫條件的資料也要寫到磁盤)
[1] collector.flush(); //刷寫
{1} . sortAndSpill(); 通過溢寫的方法進行剩餘資料的刷寫
{2} . 最後一次刷寫完後,磁盤中會有N個溢寫檔案
spill0.out spill1.out .... spillN.out
{3} . 歸并 mergeParts();
>>1. for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
//根據溢寫的次數,得到要歸并多少個溢寫檔案
>>2. Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
/tmp/hadoopAdministrator/mapred/local/localRunner/Administrator/jobcache/job_local1987086776_0001/attempt_local1987086776_0001_m_000000_0/output/file.out
Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1987086776_0001/attempt_local1987086776_0001_m_000000_0/output/file.out.index
//生成最終存儲資料的兩個檔案
>>3. for (int parts = 0; parts < partitions; parts++) {
// 按照分區的, 進行歸并。
>>4. awKeyValueIterator kvIter = Merger.merge(job, rfs,
keyClass, valClass, codec,
segmentList, mergeFactor,
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter, sortSegments,
null, spilledRecordsCounter, sortPhase.phase(),
TaskType.MAP);
//歸并操作
>>5 Writer<K, V> writer = new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,spilledRecordsCounter);
//通過writer寫歸并後的資料到磁盤
>>6 .
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(kvIter, combineCollector);
}
在歸并時,如果有combine,且溢寫的次數大于等于minSpillsForCombine的值3才會使用Combine
>>7.
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
歸并完後,将溢寫的檔案删除
>> 8. 最後在磁盤中存儲map處理完後的資料,等待reduce的拷貝。
file.out file.out.index
四、 ReduceTask工作機制
1. 在LocalJobRunner$Job中的run()方法中
if (numReduceTasks > 0) {
//根據reduceTask的個數,建立對應個數的LocalJobRunner$Job$ReduceTaskRunnable
List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
jobId, mapOutputFiles);
//線程池
ExecutorService reduceService = createReduceExecutor();
//将 ReduceTaskRunnable送出給線程池執行
runTasks(reduceRunnables, reduceService, "reduce");
}
1) . 執行LocalJobRunner$Job$ReduceTaskRunnable 中的run方法
(1) . ReduceTask reduce = new ReduceTask(systemJobFile.toString(),reduceId, taskId, mapIds.size(), 1);
//建立ReduceTask對象
(2) . reduce.run(localConf, Job.this); // 執行ReduceTask的run方法
<1> . runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
[1] . org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = TaskAttemptContextImpl
[2] . org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = WordCountReducer
[3] . org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = ReduceTask$NewTrackingRecordWriter
[4] . reducer.run(reducerContext);
//執行WordCountReducer的run方法 ,實際執行的是WordCountReducer繼承的Reducer類中的run方法.
{1} .reduce(context.getCurrentKey(), context.getValues(), context);
//執行到WordCountReducer中的 reduce方法.
{2} . context.write(k,v) 将處理完的kv寫出.
>>1 . reduceContext.write(key, value);
>>2 . output.write(key, value);
>>3 . real.write(key,value); // 通過RecordWriter将kv寫出
>>4 . out.write(NEWLINE); //通過輸出流将資料寫到結果檔案中