天天看點

Mapreduce原理及應用

Mapreduce原理

MapReduce(以下簡稱MR)是一種程式設計模型,用于大規模資料集(大于1TB)的并行運算。概念"Map(映射)“和"Reduce(歸約)”,是它們的主要思想,都是從函數式程式設計語言裡借來的,還有從矢量程式設計語言裡借來的特性。它極大地友善了程式設計人員在不會分布式并行程式設計的情況下,将自己的程式運作在分布式系統上。 目前的軟體實作是指定一個Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定并發的Reduce(歸約)函數,用來保證所有映射的鍵值對中的每一個共享相同的鍵組。

MR主要思想:分久必合

MR是由兩個階段組成

Map端

Reduce端

MR核心思想:“相同”的key為一組,調用一次reduce方法,方法内疊代這一組資料進行計算

MapReduce分布式計算原理

MR在計算之前,會将HDFS上的檔案劃分切片

預設大小 block = split切片 = map task 注:split會比block大幾kb或小幾kb,因為block是嚴格按照位元組切分,防止資料亂碼,會将block下一塊的第一行資料也添加進去;

Shuffle write階段

Map task将處理後的每一條記錄打上标簽,打标簽的目的就是為讓這一條知道将來被哪一個redcuce task處理,然後進入buffer後,每一條記錄是由三部分組成:1、分區号 2、key 3、value,Map task往buffer中寫入過程中,一旦寫入到80M,此時會将這80M的記憶體封鎖,封鎖後,會對記憶體中的資料進行combiner(小聚合),然後進行排序,将相同分區的資料放到一起,并且分區的資料是有序的,以上的combiner以及排序完成後,就開始溢寫資料到磁盤上,此時的磁盤檔案就是一個根據分區号,分好區的,并且内部有序的檔案combiner、sort、spill每進行一次溢寫,就會産生一個磁盤小檔案

Map task計算完畢後,會将磁盤上的小檔案合并成一個大檔案,在合并的時候會使用歸并排序的算法,将各個小檔案合并成一個有序的大檔案

Shuffle read階段

從map端讀取相應的分區資料,将分區資料寫入到記憶體中,記憶體滿了就會溢寫,溢寫之前會排序,當把所有的資料取過來之後,會将溢寫産生的磁盤小檔案合并 排序成有序的大檔案,然後 每一個大檔案啟動一個Reduce task進行計算,然後将結果輸出到結果檔案中,最後傳回給用戶端。

Mapreduce原理及應用

MapReduce的核心程式運作機制

(1) 一個 MR 程式啟動的時候,最先啟動的是 MRAppMaster, MRAppMaster 啟動後根據本次 job 的描述資訊,計算出需要的 maptask 執行個體數量,然後向叢集申請機器啟動相應數量的 maptask 程序

(2) maptask 程序啟動之後,根據給定的資料切片(哪個檔案的哪個偏移量範圍)範圍進行數 據處理,主體流程為:

A、 利用客戶指定的 inputformat 來擷取 RecordReader 讀取資料,形成輸入 KV 對

B、 将輸入 KV 對傳遞給客戶定義的 map()方法,做邏輯運算,并将 map()方法輸出的 KV 對收 集到緩存

C、 将緩存中的 KV 對按照 K 分區排序後不斷溢寫到磁盤檔案 (超過緩存記憶體寫到磁盤臨時檔案,最後都寫到該檔案,ruduce 擷取該檔案後,删除 )

(3) MRAppMaster 監控到所有 maptask 程序任務完成之後(真實情況是,某些 maptask 程序處理完成後,就會開始啟動 reducetask 去已完成的 maptask 處資料),會根據客戶指 定的參數啟動相應數量的 reducetask 程序,并告知 reducetask 程序要處理的資料範圍(資料分區)

(4) Reducetask 程序啟動之後,根據 MRAppMaster 告知的待處理資料所在位置,從若幹台 maptask 運作所在機器上擷取到若幹個 maptask 輸出結果檔案,并在本地進行重新歸并排序, 然後按照相同 key 的 KV 為一個組,調用客戶定義的 reduce()方法進行邏輯運算,并收集運

算輸出的結果 KV,然後調用客戶指定的 outputformat 将結果資料輸出到外部存儲

Mapreduce應用

MapReduce-WordCount示例編寫及編碼規範

(1) 程式有一個 main 方法,來啟動任務的運作,其中 job 對象就存儲了該程式運作的必要 資訊,比如指定 Mapper 類和 Reducer 類

public class WC {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration(true);


		Job job = Job.getInstance(conf);
		//設定目前main函數所在類
		job.setJarByClass(WC.class);
		job.setJar("d:/HDFS/wc.jar");

		//設定輸入路徑
		FileInputFormat.setInputPaths(job, "/input/wc.txt");
		//設定輸出路徑
		Path outputPath = new Path("/output");
		FileSystem fs = outputPath.getFileSystem(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath,true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);

		//設定Map class
		job.setMapperClass(WCMapper.class);
		//設定map輸出key、value的類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		//設定reduce class
		job.setReducerClass(WCReduce.class);

		job.setNumReduceTasks(2);

		job.waitForCompletion(true);
	}
}
           

job.setMapperClass(TokenizerMapper.class);

job.setReducerClass(IntSumReducer.class);

(2) 該程式中的 TokenizerMapper 類繼承了 Mapper 類

(3) 該程式中的 IntSumReducer 類繼承了 Reducer 類

MapReduce 程式的業務編碼分為兩個大部分,一部配置設定置程式的運作資訊,一部分 編寫該 MapReduce 程式的業務邏輯,并且業務邏輯的 map 階段和 reduce 階段的代碼分别繼 承 Mapper 類和 Reducer 類

(1) 使用者編寫的程式分成三個部分: Mapper, Reducer, Driver(送出運作 MR 程式的用戶端)

(2) Mapper 的輸入資料是 KV 對的形式( KV 的類型可自定義)

(3) Mapper 的輸出資料是 KV 對的形式( KV 的類型可自定義)

(4) Mapper 中的業務邏輯寫在 map()方法中

(5) map()方法( maptask 程序)對每一個<K,V>調用一次

package com.hpe.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	Text myKey = new Text();
	IntWritable myValue = new IntWritable(1);
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		System.out.println(key+"==========");
//		value.toString().split(" ")
		String[] words = StringUtils.split(value.toString(), ' ');
		for (String word : words) {
			myKey.set(word);
			context.write(myKey,myValue);
		}
	}
}

           

(6) Reducer 的輸入資料類型對應 Mapper 的輸出資料類型,也是 KV

(7) Reducer 的業務邏輯寫在 reduce()方法中

(8) Reducetask 程序對每一組相同 k 的<k,v>組調用一次 reduce()方法

package com.hpe.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
		 int sum = 0;
		 for (IntWritable value : values) {
			sum += value.get();
		}
		 context.write(key, new IntWritable(sum));
	}

}
           

(9) 使用者自定義的 Mapper 和 Reducer 都要繼承各自的父類

(10) 整個程式需要一個 Drvier 來進行送出,送出的是一個描述了各種必要資訊的 job 對象

WordCount 的業務邏輯:

1、 maptask 階段處理每個資料分塊的單詞統計分析,思路是每遇到一個單詞則把其轉換成 一個 key-value 對,比如單詞 hello,就轉換成<’hello’,1>發送給 reducetask 去彙總

2、 reducetask 階段将接受 maptask 的結果, 來做彙總計數