天天看點

網易二面:MapReduce的運作完整流程(内含源碼分析)

MapReduce是面向大資料并行處理的計算模型、架構和平台,提供為資料劃分和計算任務排程的功能,系統自動将一個作業(Job)待處理的大資料劃分為很多個資料塊,每個資料塊對應于一個計算任務(Task),并自動 排程計算節點來處理相應的資料塊。作業和任務排程功能主要負責配置設定和排程計算節點(Map節點或Reduce節點),同時負責監控這些節點的執行狀态,并負責Map節點執行的同步控制。

一個完整的MapReduce程式在分布式運作時有兩步程序:

1)MapTask:負責map階段整個資料處理流程。

2)ReduceTask:負責reduce階段整個資料處理流程。

接下來我們重點講解一下二者的具體流程。

一、MR流程

1. MapTask

網易二面:MapReduce的運作完整流程(内含源碼分析)

(1)Read階段:MapTask通過InputFormat獲得的RecordReader,從輸入InputSplit中解析出一個個Key-Value。

(2)Map階段:該節點主要是将解析出的Key-Value交給使用者編寫map()函數處理,并産生一系列新的Key-Value。

(3)Collect收集階段:在使用者編寫map()函數中,當資料處理完成後,一般會調用OutputCollector.collect()輸出結果。在該函數内部,它會将生成的Key-Value分區(調用Partitioner),并寫入一個環形記憶體緩沖區中。

(4)Spill階段:即“溢寫”,當環形緩沖區滿後,MapReduce會将資料寫到本地磁盤上,生成一個臨時檔案。需要注意的是,将資料寫入本地磁盤之前,先要對資料進行一次本地排序,并在必要時對資料進行合并、壓縮等操作。

(5)Merge階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合并,以確定最終隻會生成一個資料檔案。

2. ReduceTask

網易二面:MapReduce的運作完整流程(内含源碼分析)

(1)Copy階段:ReduceTask從各個MapTask上遠端拷貝一片資料,并針對某一片資料,如果其大小超過一定門檻值,則寫到磁盤上,否則直接放到記憶體中。

(2)Sort階段:在遠端拷貝資料的同時,ReduceTask啟動了兩個背景線程對記憶體和磁盤上的檔案進行合并,以防止記憶體使用過多或磁盤上檔案過多。按照MapReduce語義,使用者編寫reduce()函數輸入資料是按Key進行聚集的一組資料。為了将Key相同的資料聚在一起,Hadoop采用了基于排序的政策。由于各個MapTask已經實作對自己的處理結果進行了局部排序,是以,ReduceTask隻需對所有資料進行一次歸并排序即可。

(3)Reduce階段:reduce()函數将計算結果寫到HDFS上。

二、源碼解析

1. Job送出流程

public class WordCOuntDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

//擷取任務

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

//設定jar路徑

job.setJarByClass(WordCOuntDriver.class);

//關聯mapper和reducer

job.setMapperClass(WordCountMappper.class);

job.setReducerClass(WordCountReduce.class);

//設定map輸出

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//設定最終輸出

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//設定輸入輸出

FileInputFormat.setInputPaths(job, new Path("D:\\input"));

FileOutputFormat.setOutputPath(job, new Path("D:\\output\\o1"));

//送出任務

boolean bool = job.waitForCompletion(true);

System.exit(bool ? 0 : 1);

}

}

if (state == JobState.DEFINE) {

submit();

}

connect(); //建立連接配接

private synchronized void connect()

throws IOException, InterruptedException, ClassNotFoundException {

if (cluster == null) {

cluster =

ugi.doAs(new PrivilegedExceptionAction<Cluster>() {

public Cluster run()

throws IOException, InterruptedException,

ClassNotFoundException {

return new Cluster(getConfiguration()); //建立送出Job的代理

}

});

}

}

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)

throws IOException {

this.conf = conf;

this.ugi = UserGroupInformation.getCurrentUser();

initialize(jobTrackAddr, conf); //判斷是本地運作環境還是yarn叢集運作環境

}

submitter.submitJobInternal(Job.this, cluster); //送出job

JobStatus submitJobInternal(Job job, Cluster cluster)

throws ClassNotFoundException, InterruptedException, IOException {

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //建立給叢集送出資料的Stag路徑

JobID jobId = submitClient.getNewJobID(); //擷取JobID,并建立Job路徑

job.setJobID(jobId);

Path submitJobDir = new Path(jobStagingArea, jobId.toString());

JobStatus status = null;

try {

copyAndConfigureFiles(job, submitJobDir); //拷貝jar包到叢集

int maps = writeSplits(job, submitJobDir); //計算切片,生成切片規劃檔案

writeConf(conf, submitJobFile); //向Stag路徑寫xml配置檔案

status = submitClient.submitJob(

jobId, submitJobDir.toString(), job.getCredentials()); //送出job,傳回送出狀态

if (status != null) {

return status;

} else {

throw new IOException("Could not launch job");

}

} finally {

if (status == null) {

LOG.info("Cleaning up the staging area " + submitJobDir);

if (jtFs != null && submitJobDir != null)

jtFs.delete(submitJobDir, true);

}

}

}

2. MapTask源碼

public class WordCountMappper extends Mapper<LongWritable, Text, Text, IntWritable> {

Text k = new Text();

IntWritable v = new IntWritable(1);

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

String[] words = line.split(" ");

for (String word : words) {

k.set(word);

context.write(k, v); //自定義Map方法的寫出

}

}

}

public void write(K key, V value) throws IOException, InterruptedException {

collector.collect(key, value,

partitioner.getPartition(key, value, partitions)); //收集方法,執行兩次,以及預設分區器HashPartitioner

}

public synchronized void collect(K key, V value, final int partition

) throws IOException {

} //Map端所有的kv全部寫出後會走下面的close方法

public void close(TaskAttemptContext context

) throws IOException,InterruptedException {

try {

collector.flush(); //溢出刷寫方法

} catch (ClassNotFoundException cnf) {

throw new IOException("can't find class ", cnf);

}

collector.close();

}

public void flush() throws IOException, ClassNotFoundException,

InterruptedException {

spillLock.lock();

sortAndSpill(); //溢寫排序

mergeParts(); //合并檔案

}

private void sortAndSpill() throws IOException, ClassNotFoundException,

InterruptedException {

sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢寫排序方法

}

3. ReduceTask源碼

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)

throws IOException, InterruptedException, ClassNotFoundException {

if (isMapOrReduce()) {

copyPhase = getProgress().addPhase("copy");

sortPhase = getProgress().addPhase("sort");

reducePhase = getProgress().addPhase("reduce");

} //判斷任務

initialize(job, getJobID(), reporter, useNewApi); //任務初始化

shuffleConsumerPlugin.init(shuffleContext); //shuffle初始化

rIter = shuffleConsumerPlugin.run();

sortPhase.complete(); //排序完成,即将進入reduce階段,自定義的reduce會執行多次

}

totalMaps = job.getNumMapTasks(); //擷取MapTask的個數

merger = createMergeManager(context); //合并方法

this.inMemoryMerger = createInMemoryMerger(); //記憶體合并

this.inMemoryMerger.start(); //磁盤合并

public RawKeyValueIterator run() throws IOException, InterruptedException {

eventFetcher.start(); //開始抓取資料

eventFetcher.shutDown(); //抓取結束

copyPhase.complete(); //copy階段完成

taskStatus.setPhase(TaskStatus.Phase.SORT); //開始排序階段

return kvIter;

}

protected void cleanup(Context context

) throws IOException, InterruptedException {

} //reduce完成之前,會最後調用一次Reducer裡的cleanup方法

三、總結

本片文章講述了MapReduce的詳細流程和源碼執行過程,有幾處重點如下:

  • 任務送出過程中的資料切片,決定MapTask任務的個數
  • 溢寫過程的排序與合并
  • Reduce的歸并排序

對于這些内容,我們需要重點關注,在面試中遇到這些問題我們就可以對答如流了。

繼續閱讀