MapReduce是面向大資料并行處理的計算模型、架構和平台,提供為資料劃分和計算任務排程的功能,系統自動将一個作業(Job)待處理的大資料劃分為很多個資料塊,每個資料塊對應于一個計算任務(Task),并自動 排程計算節點來處理相應的資料塊。作業和任務排程功能主要負責配置設定和排程計算節點(Map節點或Reduce節點),同時負責監控這些節點的執行狀态,并負責Map節點執行的同步控制。
一個完整的MapReduce程式在分布式運作時有兩步程序:
1)MapTask:負責map階段整個資料處理流程。
2)ReduceTask:負責reduce階段整個資料處理流程。
接下來我們重點講解一下二者的具體流程。
一、MR流程
1. MapTask
(1)Read階段:MapTask通過InputFormat獲得的RecordReader,從輸入InputSplit中解析出一個個Key-Value。
(2)Map階段:該節點主要是将解析出的Key-Value交給使用者編寫map()函數處理,并産生一系列新的Key-Value。
(3)Collect收集階段:在使用者編寫map()函數中,當資料處理完成後,一般會調用OutputCollector.collect()輸出結果。在該函數内部,它會将生成的Key-Value分區(調用Partitioner),并寫入一個環形記憶體緩沖區中。
(4)Spill階段:即“溢寫”,當環形緩沖區滿後,MapReduce會将資料寫到本地磁盤上,生成一個臨時檔案。需要注意的是,将資料寫入本地磁盤之前,先要對資料進行一次本地排序,并在必要時對資料進行合并、壓縮等操作。
(5)Merge階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合并,以確定最終隻會生成一個資料檔案。
2. ReduceTask
(1)Copy階段:ReduceTask從各個MapTask上遠端拷貝一片資料,并針對某一片資料,如果其大小超過一定門檻值,則寫到磁盤上,否則直接放到記憶體中。
(2)Sort階段:在遠端拷貝資料的同時,ReduceTask啟動了兩個背景線程對記憶體和磁盤上的檔案進行合并,以防止記憶體使用過多或磁盤上檔案過多。按照MapReduce語義,使用者編寫reduce()函數輸入資料是按Key進行聚集的一組資料。為了将Key相同的資料聚在一起,Hadoop采用了基于排序的政策。由于各個MapTask已經實作對自己的處理結果進行了局部排序,是以,ReduceTask隻需對所有資料進行一次歸并排序即可。
(3)Reduce階段:reduce()函數将計算結果寫到HDFS上。
二、源碼解析
1. Job送出流程
public class WordCOuntDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//擷取任務
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//設定jar路徑
job.setJarByClass(WordCOuntDriver.class);
//關聯mapper和reducer
job.setMapperClass(WordCountMappper.class);
job.setReducerClass(WordCountReduce.class);
//設定map輸出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設定最終輸出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設定輸入輸出
FileInputFormat.setInputPaths(job, new Path("D:\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\output\\o1"));
//送出任務
boolean bool = job.waitForCompletion(true);
System.exit(bool ? 0 : 1);
}
}
if (state == JobState.DEFINE) {
submit();
}
connect(); //建立連接配接
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
return new Cluster(getConfiguration()); //建立送出Job的代理
}
});
}
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr, conf); //判斷是本地運作環境還是yarn叢集運作環境
}
submitter.submitJobInternal(Job.this, cluster); //送出job
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //建立給叢集送出資料的Stag路徑
JobID jobId = submitClient.getNewJobID(); //擷取JobID,并建立Job路徑
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
copyAndConfigureFiles(job, submitJobDir); //拷貝jar包到叢集
int maps = writeSplits(job, submitJobDir); //計算切片,生成切片規劃檔案
writeConf(conf, submitJobFile); //向Stag路徑寫xml配置檔案
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials()); //送出job,傳回送出狀态
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
2. MapTask源碼
public class WordCountMappper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
k.set(word);
context.write(k, v); //自定義Map方法的寫出
}
}
}
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions)); //收集方法,執行兩次,以及預設分區器HashPartitioner
}
public synchronized void collect(K key, V value, final int partition
) throws IOException {
} //Map端所有的kv全部寫出後會走下面的close方法
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush(); //溢出刷寫方法
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}
public void flush() throws IOException, ClassNotFoundException,
InterruptedException {
spillLock.lock();
sortAndSpill(); //溢寫排序
mergeParts(); //合并檔案
}
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢寫排序方法
}
3. ReduceTask源碼
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
} //判斷任務
initialize(job, getJobID(), reporter, useNewApi); //任務初始化
shuffleConsumerPlugin.init(shuffleContext); //shuffle初始化
rIter = shuffleConsumerPlugin.run();
sortPhase.complete(); //排序完成,即将進入reduce階段,自定義的reduce會執行多次
}
totalMaps = job.getNumMapTasks(); //擷取MapTask的個數
merger = createMergeManager(context); //合并方法
this.inMemoryMerger = createInMemoryMerger(); //記憶體合并
this.inMemoryMerger.start(); //磁盤合并
public RawKeyValueIterator run() throws IOException, InterruptedException {
eventFetcher.start(); //開始抓取資料
eventFetcher.shutDown(); //抓取結束
copyPhase.complete(); //copy階段完成
taskStatus.setPhase(TaskStatus.Phase.SORT); //開始排序階段
return kvIter;
}
protected void cleanup(Context context
) throws IOException, InterruptedException {
} //reduce完成之前,會最後調用一次Reducer裡的cleanup方法
三、總結
本片文章講述了MapReduce的詳細流程和源碼執行過程,有幾處重點如下:
- 任務送出過程中的資料切片,決定MapTask任務的個數
- 溢寫過程的排序與合并
- Reduce的歸并排序
對于這些内容,我們需要重點關注,在面試中遇到這些問題我們就可以對答如流了。