(1)使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(送出運作mr程式的用戶端)
(2)Mapper的輸入資料是KV對的形式(KV的類型可自定義)
(3)Mapper的輸出資料是KV對的形式(KV的類型可自定義)
(4)Mapper中的業務邏輯寫在map()方法中
(5)map()方法(maptask程序)對每一個<K,V>調用一次
(6)Reducer的輸入資料類型對應Mapper的輸出資料類型,也是KV
(7)Reducer的業務邏輯寫在reduce()方法中
(8)Reducetask程序對每一組相同k的<k,v>組調用一次reduce()方法
(9)使用者自定義的Mapper和Reducer都要繼承各自的父類
(10)整個程式需要一個Drvier來進行送出,送出的是一個描述了各種必要資訊的job對象
需求:在一堆給定的文本檔案中統計輸出每一個單詞出現的總次數
(1)定義一個mapper類
//首先要定義四個泛型的類型
//keyin: LongWritable valuein: Text
//keyout: Text valueout:IntWritable
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//map方法的生命周期: 架構每傳一行資料就被調用一次
//key : 這一行的起始點在檔案中的偏移量
//value: 這一行的内容
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到一行資料轉換為string
String line = value.toString();
//将這一行切分出各個單詞
String[] words = line.split(" ");
//周遊數組,輸出<單詞,1>
for(String word:words){
context.write(new Text(word), new IntWritable(1));
}
(2)定義一個reducer類
//生命周期:架構每傳遞進來一個kv 組,reduce方法被調用一次
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//定義一個計數器
int count = 0;
//周遊這一組kv的所有v,累加到count中
for(IntWritable value:values){
count += value.get();
context.write(key, new IntWritable(count));
(3)定義一個主類,用來描述job并送出job
public class WordCountRunner {
//把業務邏輯相關的資訊(哪個是mapper,哪個是reducer,要處理的資料在哪裡,輸出的結果放哪裡……)描述成一個job對象
//把這個描述好的job送出給叢集去運作
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job wcjob = Job.getInstance(conf);
//指定我這個job所在的jar包
//wcjob.setJar("/home/hadoop/wordcount.jar");
wcjob.setJarByClass(WordCountRunner.class);
wcjob.setMapperClass(WordCountMapper.class);
wcjob.setReducerClass(WordCountReducer.class);
//設定我們的業務邏輯Mapper類的輸出key和value的資料類型
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(IntWritable.class);
//設定我們的業務邏輯Reducer類的輸出key和value的資料類型
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(IntWritable.class);
//指定要處理的資料所在的位置
FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");
//指定處理完成之後的結果所儲存的位置
FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));
//向yarn叢集送出這個job
boolean res = wcjob.waitForCompletion(true);
System.exit(res?0:1);
(1)mapreduce程式是被送出給LocalJobRunner在本地以單程序的形式運作
(2)而處理的資料及輸出結果可以在本地檔案系統,也可以在hdfs上
(3)怎樣實作本地運作?寫一個程式,不要帶叢集的配置檔案(本質是你的mr程式的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname參數)
(4)本地模式非常便于進行業務邏輯的debug,隻要在eclipse中打斷點即可
如果在windows下想運作本地模式來測試程式邏輯,需要在windows中配置環境變量:
%HADOOP_HOME% = d:/hadoop-2.6.1
%PATH% = %HADOOP_HOME%\bin
并且要将d:/hadoop-2.6.1的lib和bin目錄替換成windows平台編譯的版本
(1)将mapreduce程式送出給yarn叢集resourcemanager,分發到很多的節點上并發執行
(2)處理的資料和輸出結果應該位于hdfs檔案系統
(3)送出叢集的實作步驟:
A、将程式打成JAR包,然後在叢集的任意一個節點上用hadoop指令啟動
$ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver inputpath outputpath
B、直接在linux的eclipse中運作main方法
(項目中要帶參數:mapreduce.framework.name=yarn以及yarn的兩個基本配置)
C、如果要在windows的eclipse中送出job給叢集,則要修改YarnRunner類
mapreduce程式在叢集中運作時的大體流程:
附:在windows平台上通路hadoop時改變自身身份辨別的方法之二:
(1)combiner是MR程式中Mapper和Reducer之外的一種元件
(2)combiner元件的父類就是Reducer
(3)combiner和reducer的差別在于運作的位置:
Combiner是在每一個maptask所在的節點運作
Reducer是接收全局所有Mapper的輸出結果;
(4) combiner的意義就是對每一個maptask的輸出進行局部彙總,以減小網絡傳輸量
具體實作步驟:
1、 自定義一個combiner繼承Reducer,重寫reduce方法
2、 在job中設定: job.setCombinerClass(CustomCombiner.class)
(5) combiner能夠應用的前提是不能影響最終的業務邏輯
而且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來
本文轉自yushiwh 51CTO部落格,原文連結:http://blog.51cto.com/yushiwh/1913043,如需轉載請自行聯系原作者