天天看點

MAPREDUCE實踐篇(1)

(1)使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(送出運作mr程式的用戶端)

(2)Mapper的輸入資料是KV對的形式(KV的類型可自定義)

(3)Mapper的輸出資料是KV對的形式(KV的類型可自定義)

(4)Mapper中的業務邏輯寫在map()方法中

(5)map()方法(maptask程序)對每一個<K,V>調用一次

(6)Reducer的輸入資料類型對應Mapper的輸出資料類型,也是KV

(7)Reducer的業務邏輯寫在reduce()方法中

(8)Reducetask程序對每一組相同k的<k,v>組調用一次reduce()方法

(9)使用者自定義的Mapper和Reducer都要繼承各自的父類

(10)整個程式需要一個Drvier來進行送出,送出的是一個描述了各種必要資訊的job對象

需求:在一堆給定的文本檔案中統計輸出每一個單詞出現的總次數

(1)定義一個mapper類

//首先要定義四個泛型的類型

//keyin:  LongWritable    valuein: Text

//keyout: Text            valueout:IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

//map方法的生命周期:  架構每傳一行資料就被調用一次

//key :  這一行的起始點在檔案中的偏移量

//value: 這一行的内容

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//拿到一行資料轉換為string

String line = value.toString();

//将這一行切分出各個單詞

String[] words = line.split(" ");

//周遊數組,輸出<單詞,1>

for(String word:words){

context.write(new Text(word), new IntWritable(1));

}

(2)定義一個reducer類

//生命周期:架構每傳遞進來一個kv 組,reduce方法被調用一次

protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

//定義一個計數器

int count = 0;

//周遊這一組kv的所有v,累加到count中

for(IntWritable value:values){

count += value.get();

context.write(key, new IntWritable(count));

(3)定義一個主類,用來描述job并送出job

public class WordCountRunner {

//把業務邏輯相關的資訊(哪個是mapper,哪個是reducer,要處理的資料在哪裡,輸出的結果放哪裡……)描述成一個job對象

//把這個描述好的job送出給叢集去運作

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job wcjob = Job.getInstance(conf);

//指定我這個job所在的jar包

//wcjob.setJar("/home/hadoop/wordcount.jar");

wcjob.setJarByClass(WordCountRunner.class);

wcjob.setMapperClass(WordCountMapper.class);

wcjob.setReducerClass(WordCountReducer.class);

//設定我們的業務邏輯Mapper類的輸出key和value的資料類型

wcjob.setMapOutputKeyClass(Text.class);

wcjob.setMapOutputValueClass(IntWritable.class);

//設定我們的業務邏輯Reducer類的輸出key和value的資料類型

wcjob.setOutputKeyClass(Text.class);

wcjob.setOutputValueClass(IntWritable.class);

//指定要處理的資料所在的位置

FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");

//指定處理完成之後的結果所儲存的位置

FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));

//向yarn叢集送出這個job

boolean res = wcjob.waitForCompletion(true);

System.exit(res?0:1);

(1)mapreduce程式是被送出給LocalJobRunner在本地以單程序的形式運作

(2)而處理的資料及輸出結果可以在本地檔案系統,也可以在hdfs上

(3)怎樣實作本地運作?寫一個程式,不要帶叢集的配置檔案(本質是你的mr程式的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname參數)

(4)本地模式非常便于進行業務邏輯的debug,隻要在eclipse中打斷點即可

如果在windows下想運作本地模式來測試程式邏輯,需要在windows中配置環境變量:

%HADOOP_HOME%  =  d:/hadoop-2.6.1

%PATH% =  %HADOOP_HOME%\bin

并且要将d:/hadoop-2.6.1的lib和bin目錄替換成windows平台編譯的版本

(1)将mapreduce程式送出給yarn叢集resourcemanager,分發到很多的節點上并發執行

(2)處理的資料和輸出結果應該位于hdfs檔案系統

(3)送出叢集的實作步驟:

A、将程式打成JAR包,然後在叢集的任意一個節點上用hadoop指令啟動

     $ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver inputpath outputpath

B、直接在linux的eclipse中運作main方法

(項目中要帶參數:mapreduce.framework.name=yarn以及yarn的兩個基本配置)

C、如果要在windows的eclipse中送出job給叢集,則要修改YarnRunner類

mapreduce程式在叢集中運作時的大體流程:

附:在windows平台上通路hadoop時改變自身身份辨別的方法之二:

(1)combiner是MR程式中Mapper和Reducer之外的一種元件

(2)combiner元件的父類就是Reducer

(3)combiner和reducer的差別在于運作的位置:

Combiner是在每一個maptask所在的節點運作

Reducer是接收全局所有Mapper的輸出結果;

(4) combiner的意義就是對每一個maptask的輸出進行局部彙總,以減小網絡傳輸量

具體實作步驟:

1、 自定義一個combiner繼承Reducer,重寫reduce方法

2、 在job中設定:  job.setCombinerClass(CustomCombiner.class)

(5) combiner能夠應用的前提是不能影響最終的業務邏輯

而且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來

      本文轉自yushiwh 51CTO部落格,原文連結:http://blog.51cto.com/yushiwh/1913043,如需轉載請自行聯系原作者