大資料學習路線之mapreduce概述,mapreduce:分布式并行離線計算架構,是一個分布式運算程式的程式設計架構,是使用者開發“基于hadoop的資料分析應用”的核心架構;Mapreduce核心功能是将使用者編寫的業務邏輯代碼和自帶預設元件整合成一個完整的分布式運算程式,并發運作在一個hadoop叢集上;
與HDFS解決問題的原理類似,HDFS是将大的檔案切分成若幹小檔案,然後将它們分别存儲到叢集中各個主機中。
同樣原理,mapreduce是将一個複雜的運算切分成若個子運算,然後将它們分别交給叢集中各個主機,由各個主機并行運算。
1.1 mapreduce産生的背景
海量資料在單機上處理因為硬體資源限制,無法勝任。
而一旦将單機版程式擴充到叢集來分布式運作,将極大增加程式的複雜度和開發難度。
引入mapreduce架構後,開發人員可以将絕大部分工作集中在業務邏輯的開發上,而将分布式計算中的複雜性交由架構來處理。
1.2 mapreduce程式設計模型
一種分布式計算模型。
MapReduce将這個并行計算過程抽象到兩個函數。
Map(映射):對一些獨立元素組成的清單的每一個元素進行指定的操作,可以高度并行。
Reduce(化簡 歸約):對一個清單的元素進行合并。
一個簡單的MapReduce程式隻需要指定map()、reduce()、input和output,剩下的事由架構完成。
Mapreduce的幾個關鍵名詞
Job :使用者的每一個計算請求稱為一個作業。
Task:每一個作業,都需要拆分開了,交由多個主機來完成,拆分出來的執行機關就是任務。
Task又分為如下三種類型的任務:
Map:負責map階段的整個資料處理流程
Reduce:負責reduce階段的整個資料處理流程
MRAppMaster:負責整個程式的過程排程及狀态協調
1.4 mapreduce程式運作流程
具體流程說明:
一個mr程式啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動後根據本次job的描述資訊,計算出需要的maptask執行個體數量,然後向叢集申請機器啟動相應數量的maptask程序
maptask程序啟動之後,根據給定的資料切片範圍進行資料處理,主體流程為:
– 利用客戶指定的inputformat來擷取RecordReader讀取資料,形成輸入KV對。
– 将輸入KV(k是檔案的行号,v是檔案一行的資料)對傳遞給客戶定義的map()方法,做邏輯運算,并将map()方法輸出的KV對收集到緩存。
– 将緩存中的KV對按照K分區排序後不斷溢寫到磁盤檔案
MRAppMaster監控到所有maptask程序任務完成之後,會根據客戶指定的參數啟動相應數量的reducetask程序,并告知reducetask程序要處理的資料範圍(資料分區)
Reducetask程序啟動之後,根據MRAppMaster告知的待處理資料所在位置,從若幹台maptask運作所在機器上擷取到若幹個maptask輸出結果檔案,并在本地進行重新歸并排序,然後按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算,并收集運算輸出的結果KV,然後調用客戶指定的outputformat将結果資料輸出到外部存儲
1.5 編寫MapReduce程式
- 基于MapReduce 計算模型編寫分布式并行程式非常簡單,程式員的主要編碼工作就是實作Map 和Reduce函數。
- 其它的并行程式設計中的種種複雜問題,如分布式存儲,工作排程,負載平衡,容錯處理,網絡通信等,均由YARN架構負責處理。
- MapReduce中,map和reduce函數遵循如下正常格式:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
- Mapper的接口:
protected void map(KEY key, VALUE value, Context context)
throws IOException, InterruptedException {
}
- Reduce的接口:
protected void reduce(KEY key, Iterable<VALUE> values,
Context context) throws IOException, InterruptedException {
}
- Mapreduce程式代碼基本結構
maprecue執行個體開發
2.1 程式設計步驟
使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(送出運作mr程式的用戶端)
Mapper的輸入資料是KV對的形式(KV的類型可自定義)
Mapper的輸出資料是KV對的形式(KV的類型可自定義)
Mapper中的業務邏輯寫在map()方法中
map()方法(maptask程序)對每一個<K,V>調用一次
Reducer的輸入資料類型對應Mapper的輸出資料類型,也是KV
Reducer的業務邏輯寫在reduce()方法中
Reducetask程序對每一組相同k的<k,v>組調用一次reduce()方法
使用者自定義的Mapper和Reducer都要繼承各自的父類
整個程式需要一個Drvier來進行送出,送出的是一個描述了各種必要資訊的job對象
2.2 經典的wordcount程式編寫
需求:有一批檔案(規模為TB級或者PB級),如何統計這些檔案中所有單詞出現次數
如有三個檔案,檔案名是qfcourse.txt、qfstu.txt 和 qf_teacher
qf_course.txt内容:
php java linux
bigdata VR
C C++ java web
linux shell
qf_stu.txt内容:
tom jim lucy
lily sally
andy
tom jim sally
qf_teacher内容:
jerry Lucy tom
jim
方案
– 分别統計每個檔案中單詞出現次數 - map()
– 累加不同檔案中同一個單詞出現次數 - reduce()
實作代碼
– 建立一個簡單的maven項目
– 添加hadoop client依賴的jar,pom.xml主要内容如下:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
– 編寫代碼
– 自定義一個mapper類
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将maptask傳給我們的一行的文本内容先轉換成String
String line = value.toString();
//根據空格将這一行切分成單詞
String[] words = line.split(" ");
for(String word:words){
//将單詞作為key,将次數1作為value,以便于後續的資料分發,可以根據單詞分發,以便于相同單詞會到相同的reduce task
context.write(new Text(word), new IntWritable(1));
}
}
}
– 自定義一個reduce類
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count=0; //累加單詞的出現的次數
for(IntWritable value:values){
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
– 編寫一個Driver類
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordcountDriver {
public static void main(String[] args) throws Exception {
//此代碼需要兩個輸入參數 第一個參數支援要處理的源檔案;第二個參數是處理結果的輸出路徑
if (args == null || args.length == 0) {
args = new String[2];
//路徑都是 hdfs系統的檔案路徑
args[0] = "hdfs://192.168.18.64:9000/wordcount/input/";
args[1] = "hdfs://192.168.18.64:9000/wordcount/output";
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//指定本程式的jar包所在的本地路徑
job.setJarByClass(WordcountDriver.class);
//指定本業務job要使用的mapper業務類
job.setMapperClass(WordcountMapper.class);
//指定mapper輸出資料的kv類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定本業務job要使用的Reducer業務類
job.setReducerClass(WordcountReducer.class);
//指定最終輸出的資料的kv類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定job的輸入原始檔案所在目錄
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定job的輸出結果所在目錄
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//将job中配置的相關參數,以及job所用的java類所在的jar包,送出給yarn去運作
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
wordcount處理過程
将檔案拆分成splits,由于測試用的檔案較小,是以每個檔案為一個split,并将檔案按行分割形成<key,value>對,下圖所示。這一步由MapReduce架構自動完成,其中偏移量(即key值)包括了回車所占的字元數(Windows/Linux環境不同)。
将分割好的<key,value>對交給使用者定義的map方法進行處理,生成新的<key,value>對,下圖所示。
得到map方法輸出的<key,value>對後,Mapper會将它們按照key值進行排序,并執行Combine過程,将key至相同value值累加,得到Mapper的最終輸出結果。下圖所示。
Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的<key,value>對,并作為WordCount的輸出結果,下圖所示。