先上圖
目錄
〇、Job送出流程
0.WordCount源碼:
1.waitForCompletion
2.submit
3.submitJobInternal
一、getSplits:輸入檔案分片
二、RecordReader:讀取檔案
三、Map
四、環形緩沖區:溢寫到磁盤
五、shuffle:分發
〇、Job送出流程
0.WordCount源碼:
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1.waitForCompletion
我們在自己寫的MR程式中通過org.apache.hadoop.mapreduce.Job來建立Job,配置好之後通過waitForCompletion方法來送出Job并列印MR執行過程的log。waitForCompletion源碼及注釋如下:
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {
if (state == JobState.DEFINE) {
submit(); //判斷狀态state為DEFINE狀态,則可以送出Job後,執行submit()方法。
}
if (verbose) { //verbose表示是否列印Job運作資訊
monitorAndPrintJob(); //不斷的重新整理擷取job運作的進度資訊,并列印。
} else {
// 從配置裡取得輪訓的間隔時間,來分析目前job是否執行完畢
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
2.submit
其中調用的函數submit()源碼及注釋如下:
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE); // 確定目前的Job的狀态是處于DEFINE,否則不能送出Job。
setUseNewAPI(); // 啟用新的API,即org.apache.hadoop.mapreduce下的Mapper和Reducer
connect(); // Connect方法會産生一個JobClient執行個體,用來和JobTracker通信。
final JobSubmitter submitter = // 構造送出器
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster); // 送出
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
3.submitJobInternal
送出函數 submitJobInternal() 源碼太長,就不貼了,它主要幹了以下事情:
1.checkSpecs:檢查輸出目錄,如果已存在則報錯
2.getStagingDir:初始化Job執行過程中會用到的檔案的存放路徑
3.getHostAddress/Name:擷取和設定送出job機器的位址和主機名
4.getNewJobID:擷取JobID
5.從HDFS的NameNode擷取驗證用的Token,并将其放入緩存。攜帶這個Token就可以去NameNode查詢task運作情況
6. copyAndConfigureFiles:上傳指令中配置的檔案,比如我們打的WordCount.jar
7.writeSplits:對輸入檔案分片,将分片資訊寫入HDFS中
8.submitJob:正式送出Job到Yarn
至此,Job已經正式送出到Yarn去運作了。
參考部落格:mapreduce job送出流程源碼級分析: https://www.cnblogs.com/lxf20061900/p/3643581.html
一、getSplits:輸入檔案分片
假設我們有一個大小為200M的檔案,裡面每行是一個單詞。在上面的Job送出流程中,有一步就是對輸入檔案進行分片。
預設情況下我們調用的是TextInputFormat類來對檔案進行分片,分片函數getSplits繼承自它的抽象父類FileInputFormat,以下是FileInputFormat.getSplits()函數的流程圖。
(1)周遊輸入檔案
(2)如果檔案大小=0,則新增一個長度為0的split;否則到第3步
(3)判斷檔案是否可切分(預設傳回True),如果不可切分,則新增一個和檔案相同大小的split(即把這個檔案全部放到一個split中),否則到第4步
(4)計算一個切片的大小,是minSize,maxSize,blockSize這三個數中的不大不小的那個
(5)如果檔案剩餘帶下/splitSize>1.1,則切分出長度為splitSize的大小的一個切片;否則把剩下的全部都作為一個切片(這樣做是為了防止隻剩129M的時候被切成了128M+1M,防止了過多小檔案的産生)
(6)重複1-5直到檔案周遊結束,傳回所有切分資訊。
minSize 可通過 mapreduce.input.fileinputformat.split.minsize 來設定
maxSize 可通過mapreduce.input.fileinputformat.split.maxsize 來設定
blockSize在Hadoop2中預設128M,Hadoop1中預設64M
FileSplit主要屬性如下圖:
從上圖可以看出一個split隻是記錄了一個檔案的位置、開始、結束等資訊,隻是邏輯上的一個分片,并不是真正的切出來這樣的一個檔案放在磁盤上。
是以經過上面切分之後,我們得到了兩個split,第一個是0~128M,第二個是128~200M。
二、RecordReader:讀取檔案
InputFormat除了對輸入檔案進行切片,還有一個重要的作用就是讀取輸入檔案,轉化為key-value形式的資料傳遞給Map來處理。InputFormat.createRecordReader()會傳回一個RecordReader執行個體,然後調用RecordReader中的方法對檔案進行讀取。以LineRecordReader為例,主要變量和函數如下:
其中start、end記錄了目前split的開始和結束,pos記錄了目前讀取的位置。nextKeyValue會判斷是否還有下一個k-v,如果有将會擷取下一個k-v,然後調用getCurrentKey()擷取目前key,getCurrentValue擷取目前value。LineRecordReader輸出的key是偏移量,value是每一行的内容。是以輸入檔案的轉化過程如下:
三、Map
Mapper的驅動函數如下:
run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
可以看出,context是RecordReader的一層封裝,調用的都是RecordReader中的函數。擷取到k-v之後就傳遞給我們自己寫的map函數,輸出新的k-v。這一步的轉化過程如下:
四、環形緩沖區:溢寫到磁盤
Mapper的輸出去向如何呢?我們在map()中通過Context.write(k,v)來輸出計算好的k-v,通過outputCollector收集之後寫到寫到環形緩沖區中。
環形内緩沖區就是記憶體中一塊連續的位址,我們從它的一端寫資料,也就是Mapper輸出的k-v,另一端寫這些資料的索引,包括第index個k-v、屬于第partition個分區、key的起始位置keystart、value的起始位置valuestart。資料和索引是根據equator區分的,這個equator在發生溢寫之後是可以變化的。
這個環形緩沖區的預設大小是100M,當這個環形數組的資料存儲量達到80%的時候就開始執行splill溢寫操作,它會鎖定這80%的記憶體資料,并把這些資料給它寫到本地磁盤上。在溢寫的時候,剩下的20%依然可以接受來自Mapper的資料。
上面所說的partition表示這個k-v屬于第幾個分區,是通過Partitioner.getPartition()确定的,預設是按照key的哈希值&int最大值,然後對reduce個數取模。當達到80%開始溢寫的時候,有幾個分區就會産生幾個分區檔案。在寫入檔案之前,還會對同一個分區中的資料按照key進行快速排序,然後把排好序的檔案寫到檔案中。
假如我們有兩個reduce,第一個80%會産生兩個分區檔案,分區1和分區2。當第二次達到80%的時候會再産生兩個分區檔案,分區3和分區4。如果此時map執行結束了,同一個map産生的這四個分區檔案還會按照相同的partition進行合并,合并的時候會進行歸并排序以保證合并後的檔案也是有序的。上面這些操作圖示如下:
這是在我們沒有配置Combiner的情況下的執行方式。如果我們配置了Combiner,則會在spill到磁盤的時候對相同key的資料進行合并,四個分區檔案進行合并的時候對相同的key也會執行合并操作。是以有combiner的時候執行過程如下:
接下來詳細看一下Combiner。Combiner繼承于Reducer,是對同一key的value清單進行處理。Combiner是對同一key的部分value進行操作,而Reducer是對同一key的所有value進行操作。是以我們隻能在部分操作不影響總體操作的時候才能使用Combiner,比如最大值、最小值等。不能用的情況有平均值、中位數等。
Combiner的好處有:它會先對對map輸出的結果進行一次合并,減少了map和reduce節點中的資料傳輸量。同時map階段已經對部分資料進行了合并,減少了reduce要處理的資料量。
五、shuffle:分發
Shuffle過程,也稱Copy階段。reduce task從各個map task上遠端拷貝一片資料,并針對某一片資料,如果其大小超過一定的閥值,則寫到磁盤上,否則直接放到記憶體中。
map輸出資料完成之後,通過運作一個HTTP Server暴露出來,供reduce端擷取。同時也會通知Application Master,以便Reducer能夠及時來拉取資料。
Reducer需要從各map任務中提取自己的那一部分資料(對應的partition)。每個map任務的完成時間可能是不一樣的,reduce任務在map任務結束之後會盡快取走輸出結果,這個階段叫copy。map任務完成之後,就會通過正常心跳通知應用程式的Application Master。reduce的一個線程會周期性地向master詢問,直到提取完所有資料。
資料被reduce提走之後,map機器不會立刻删除資料,這是為了預防reduce任務失敗需要重做。是以map輸出資料是在整個作業完成之後才被删除掉的。
如果map輸出的資料足夠小,則會被拷貝到reduce任務的JVM記憶體中。如果資料太大容不下,則被拷貝到reduce的機器磁盤上。當緩沖中資料達到配置的門檻值時,這些資料在記憶體中被合并、寫入機器磁盤。在合并的過程中,會對被合并的檔案做全局的排序。如果作業配置了Combiner,則會運作combine函數,減少寫入磁盤的資料量。
在copy過來的資料不斷寫入磁盤的過程中,一個背景線程會把這些檔案合并為更大的、有序的檔案。
未完待續。。。