天天看點

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

先上圖

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

目錄

〇、Job送出流程

0.WordCount源碼:

1.waitForCompletion

2.submit

3.submitJobInternal

一、getSplits:輸入檔案分片

二、RecordReader:讀取檔案

三、Map

四、環形緩沖區:溢寫到磁盤

五、shuffle:分發

〇、Job送出流程

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

0.WordCount源碼:

public class WordCount {
	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}
	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length < 2) {
			System.err.println("Usage: wordcount <in> [<in>...] <out>");
			System.exit(2);
		}
		Job job = Job.getInstance(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		for (int i = 0; i < otherArgs.length - 1; ++i) {
			FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
		}
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
           

1.waitForCompletion

我們在自己寫的MR程式中通過org.apache.hadoop.mapreduce.Job來建立Job,配置好之後通過waitForCompletion方法來送出Job并列印MR執行過程的log。waitForCompletion源碼及注釋如下:

public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit(); //判斷狀态state為DEFINE狀态,則可以送出Job後,執行submit()方法。
    }
    if (verbose) { //verbose表示是否列印Job運作資訊
      monitorAndPrintJob(); //不斷的重新整理擷取job運作的進度資訊,并列印。
    } else {
      // 從配置裡取得輪訓的間隔時間,來分析目前job是否執行完畢
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
           

2.submit

其中調用的函數submit()源碼及注釋如下:

public void submit() 
        throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE); // 確定目前的Job的狀态是處于DEFINE,否則不能送出Job。
    setUseNewAPI(); // 啟用新的API,即org.apache.hadoop.mapreduce下的Mapper和Reducer
    connect(); // Connect方法會産生一個JobClient執行個體,用來和JobTracker通信。
    final JobSubmitter submitter = // 構造送出器
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster); // 送出
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
           

3.submitJobInternal

送出函數 submitJobInternal() 源碼太長,就不貼了,它主要幹了以下事情:

1.checkSpecs:檢查輸出目錄,如果已存在則報錯
2.getStagingDir:初始化Job執行過程中會用到的檔案的存放路徑
3.getHostAddress/Name:擷取和設定送出job機器的位址和主機名
4.getNewJobID:擷取JobID
5.從HDFS的NameNode擷取驗證用的Token,并将其放入緩存。攜帶這個Token就可以去NameNode查詢task運作情況
6. copyAndConfigureFiles:上傳指令中配置的檔案,比如我們打的WordCount.jar
7.writeSplits:對輸入檔案分片,将分片資訊寫入HDFS中
8.submitJob:正式送出Job到Yarn
           

至此,Job已經正式送出到Yarn去運作了。

參考部落格:mapreduce job送出流程源碼級分析: https://www.cnblogs.com/lxf20061900/p/3643581.html

一、getSplits:輸入檔案分片

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

假設我們有一個大小為200M的檔案,裡面每行是一個單詞。在上面的Job送出流程中,有一步就是對輸入檔案進行分片。

預設情況下我們調用的是TextInputFormat類來對檔案進行分片,分片函數getSplits繼承自它的抽象父類FileInputFormat,以下是FileInputFormat.getSplits()函數的流程圖。

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

(1)周遊輸入檔案

(2)如果檔案大小=0,則新增一個長度為0的split;否則到第3步

(3)判斷檔案是否可切分(預設傳回True),如果不可切分,則新增一個和檔案相同大小的split(即把這個檔案全部放到一個split中),否則到第4步

(4)計算一個切片的大小,是minSize,maxSize,blockSize這三個數中的不大不小的那個

(5)如果檔案剩餘帶下/splitSize>1.1,則切分出長度為splitSize的大小的一個切片;否則把剩下的全部都作為一個切片(這樣做是為了防止隻剩129M的時候被切成了128M+1M,防止了過多小檔案的産生)

(6)重複1-5直到檔案周遊結束,傳回所有切分資訊。

minSize 可通過 mapreduce.input.fileinputformat.split.minsize 來設定

maxSize 可通過mapreduce.input.fileinputformat.split.maxsize 來設定

blockSize在Hadoop2中預設128M,Hadoop1中預設64M

FileSplit主要屬性如下圖:

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

從上圖可以看出一個split隻是記錄了一個檔案的位置、開始、結束等資訊,隻是邏輯上的一個分片,并不是真正的切出來這樣的一個檔案放在磁盤上。

是以經過上面切分之後,我們得到了兩個split,第一個是0~128M,第二個是128~200M。

二、RecordReader:讀取檔案

InputFormat除了對輸入檔案進行切片,還有一個重要的作用就是讀取輸入檔案,轉化為key-value形式的資料傳遞給Map來處理。InputFormat.createRecordReader()會傳回一個RecordReader執行個體,然後調用RecordReader中的方法對檔案進行讀取。以LineRecordReader為例,主要變量和函數如下:

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

其中start、end記錄了目前split的開始和結束,pos記錄了目前讀取的位置。nextKeyValue會判斷是否還有下一個k-v,如果有将會擷取下一個k-v,然後調用getCurrentKey()擷取目前key,getCurrentValue擷取目前value。LineRecordReader輸出的key是偏移量,value是每一行的内容。是以輸入檔案的轉化過程如下:

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

三、Map

Mapper的驅動函數如下:

run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
           

可以看出,context是RecordReader的一層封裝,調用的都是RecordReader中的函數。擷取到k-v之後就傳遞給我們自己寫的map函數,輸出新的k-v。這一步的轉化過程如下:

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

四、環形緩沖區:溢寫到磁盤

Mapper的輸出去向如何呢?我們在map()中通過Context.write(k,v)來輸出計算好的k-v,通過outputCollector收集之後寫到寫到環形緩沖區中。

環形内緩沖區就是記憶體中一塊連續的位址,我們從它的一端寫資料,也就是Mapper輸出的k-v,另一端寫這些資料的索引,包括第index個k-v、屬于第partition個分區、key的起始位置keystart、value的起始位置valuestart。資料和索引是根據equator區分的,這個equator在發生溢寫之後是可以變化的。

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

這個環形緩沖區的預設大小是100M,當這個環形數組的資料存儲量達到80%的時候就開始執行splill溢寫操作,它會鎖定這80%的記憶體資料,并把這些資料給它寫到本地磁盤上。在溢寫的時候,剩下的20%依然可以接受來自Mapper的資料。

上面所說的partition表示這個k-v屬于第幾個分區,是通過Partitioner.getPartition()确定的,預設是按照key的哈希值&int最大值,然後對reduce個數取模。當達到80%開始溢寫的時候,有幾個分區就會産生幾個分區檔案。在寫入檔案之前,還會對同一個分區中的資料按照key進行快速排序,然後把排好序的檔案寫到檔案中。

假如我們有兩個reduce,第一個80%會産生兩個分區檔案,分區1和分區2。當第二次達到80%的時候會再産生兩個分區檔案,分區3和分區4。如果此時map執行結束了,同一個map産生的這四個分區檔案還會按照相同的partition進行合并,合并的時候會進行歸并排序以保證合并後的檔案也是有序的。上面這些操作圖示如下:

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

這是在我們沒有配置Combiner的情況下的執行方式。如果我們配置了Combiner,則會在spill到磁盤的時候對相同key的資料進行合并,四個分區檔案進行合并的時候對相同的key也會執行合并操作。是以有combiner的時候執行過程如下:

一張圖了解MapReduce全流程〇、Job送出流程一、getSplits:輸入檔案分片二、RecordReader:讀取檔案三、Map四、環形緩沖區:溢寫到磁盤五、shuffle:分發

接下來詳細看一下Combiner。Combiner繼承于Reducer,是對同一key的value清單進行處理。Combiner是對同一key的部分value進行操作,而Reducer是對同一key的所有value進行操作。是以我們隻能在部分操作不影響總體操作的時候才能使用Combiner,比如最大值、最小值等。不能用的情況有平均值、中位數等。

Combiner的好處有:它會先對對map輸出的結果進行一次合并,減少了map和reduce節點中的資料傳輸量。同時map階段已經對部分資料進行了合并,減少了reduce要處理的資料量。

五、shuffle:分發

Shuffle過程,也稱Copy階段。reduce task從各個map task上遠端拷貝一片資料,并針對某一片資料,如果其大小超過一定的閥值,則寫到磁盤上,否則直接放到記憶體中。

map輸出資料完成之後,通過運作一個HTTP Server暴露出來,供reduce端擷取。同時也會通知Application Master,以便Reducer能夠及時來拉取資料。

Reducer需要從各map任務中提取自己的那一部分資料(對應的partition)。每個map任務的完成時間可能是不一樣的,reduce任務在map任務結束之後會盡快取走輸出結果,這個階段叫copy。map任務完成之後,就會通過正常心跳通知應用程式的Application Master。reduce的一個線程會周期性地向master詢問,直到提取完所有資料。

資料被reduce提走之後,map機器不會立刻删除資料,這是為了預防reduce任務失敗需要重做。是以map輸出資料是在整個作業完成之後才被删除掉的。

如果map輸出的資料足夠小,則會被拷貝到reduce任務的JVM記憶體中。如果資料太大容不下,則被拷貝到reduce的機器磁盤上。當緩沖中資料達到配置的門檻值時,這些資料在記憶體中被合并、寫入機器磁盤。在合并的過程中,會對被合并的檔案做全局的排序。如果作業配置了Combiner,則會運作combine函數,減少寫入磁盤的資料量。

在copy過來的資料不斷寫入磁盤的過程中,一個背景線程會把這些檔案合并為更大的、有序的檔案。

未完待續。。。