天天看點

大資料-hadoop-mapreduce總結

首先mapreduce的核心思想是分而治之:

                                即将大的任務拆分成若幹個小的任務(map階段) ,之後再将小的任務的結果合并(reduce階段)

大資料-hadoop-mapreduce總結

Mapreduce程式設計指導思想:(8個步驟記下來)

mapReduce程式設計模型的總結:

MapReduce的開發一共有八個步驟其中map階段分為2個步驟,shuffle階段4個步驟,reduce階段分為2個步驟

Map階段2個步驟

第一步:設定inputFormat類,将我們的資料切分成key,value對,輸入到第二步

第二步:自定義map邏輯,處理我們第一步的輸入資料,然後轉換成新的key,value對進行輸出

shuffle階段4個步驟

第三步:對輸出的key,value對進行分區(分區個數與reduce task個數相等)。相同key的資料發送到同一個reduce裡面去,相同key合并,value形成一個集合

第四步:對不同分區的資料按照相同的key進行排序

第五步:對分組後的資料進行規約(combine操作),降低資料的網絡拷貝(可選步驟)

第六步:對排序後的資料進行分組,分組的過程中,将相同key的value放到一個集合當中

reduce階段2個步驟

第七步:對多個map的任務進行合并,排序,寫reduce函數自己的邏輯,對輸入的key,value對進行處理,轉換成新的key,value對進行輸出

第八步:設定outputformat将輸出的key,value對資料進行儲存到檔案中

mapReduce程式設計模型的總結:

MapReduce的開發一共有八個步驟其中map階段分為2個步驟,shuffle階段4個步驟,reduce階段分為2個步驟

具體例子-詞頻統計:

建立maven項目 導入需要的依賴  此處就省略了

定義自己的mapper類  

import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import java.io.IOException;
 
 /**
  * 自定義mapper類需要繼承Mapper,有四個泛型,
  * keyin: k1   行偏移量 Long
  * valuein: v1   一行文本内容   String
  * keyout: k2   每一個單詞   String
  * valueout : v2   1         int
  * 在hadoop當中沒有沿用Java的一些基本類型,使用自己封裝了一套基本類型
  * long ==>LongWritable
  * String ==> Text
  * int ==> IntWritable
  *
  */
 public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    /**
      * 繼承mapper之後,覆寫map方法,每次讀取一行資料,都會來調用一下map方法
      * @param key:對應k1
      * @param value:對應v1
      * @param context 上下文對象。承上啟下,承接上面步驟發過來的資料,通過context将資料發送到下面的步驟裡面去
      * @throws IOException
      * @throws InterruptedException
      * k1   v1
      * 0;hello,world
      *
      * k2 v2
      * hello 1
      * world   1
      */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //擷取我們的一行資料
        String line = value.toString();
        String[] split = line.split(",");
        Text text = new Text();
        IntWritable intWritable = new IntWritable(1);
        for (String word : split) {
            //将每個單詞出現都記做1次
            //key2 Text類型
            //v2 IntWritable類型
            text.set(word);
            //将我們的key2 v2寫出去到下遊
            context.write(text,intWritable);
        }
    }
 }
           

定義自己的reduce類

import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 
 import java.io.IOException;
 
 public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    //第三步:分區   相同key的資料發送到同一個reduce裡面去,相同key合并,value形成一個集合
    /**
      * 繼承Reducer類之後,覆寫reduce方法
      * @param key
      * @param values
      * @param context
      * @throws IOException
      * @throws InterruptedException
      */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int result = 0;
        for (IntWritable value : values) {
            //将我們的結果進行累加
            result += value.get();
        }
        //繼續輸出我們的資料
        IntWritable intWritable = new IntWritable(result);
        //将我們的資料輸出
        context.write(key,intWritable);
    }
 }
           

主函數測試

import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 /*
 這個類作為mr程式的入口類,這裡面寫main方法
  */
 public class WordCount extends Configured implements Tool{
    /**
      * 實作Tool接口之後,需要實作一個run方法,
      * 這個run方法用于組裝我們的程式的邏輯,其實就是組裝八個步驟
      * @param args
      * @return
      * @throws Exception
      */
    @Override
    public int run(String[] args) throws Exception {
        //擷取Job對象,組裝我們的八個步驟,每一個步驟都是一個class類
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "mrdemo1");
 
        //實際工作當中,程式運作完成之後一般都是打包到叢集上面去運作,打成一個jar包
        //如果要打包到叢集上面去運作,必須添加以下設定
        job.setJarByClass(WordCount.class);
 
        //第一步:讀取檔案,解析成key,value對,k1:行偏移量 v1:一行文本内容
        job.setInputFormatClass(TextInputFormat.class);
        //指定我們去哪一個路徑讀取檔案
        TextInputFormat.addInputPath(job,new Path("file:///D:\\開課吧課程資料\\Hadoop&ZooKeeper課件\\最新版本課件\\hadoop與zookeeper課件資料\\3、第三天\\input"));
        //第二步:自定義map邏輯,接受k1   v1 轉換成為新的k2   v2輸出
        job.setMapperClass(MyMapper.class);
        //設定map階段輸出的key,value的類型,其實就是k2 v2的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //第三步到六步:分區,排序,規約,分組都省略
        //第七步:自定義reduce邏輯
        job.setReducerClass(MyReducer.class);
        //設定key3 value3的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //第八步:輸出k3 v3 進行儲存
        job.setOutputFormatClass(TextOutputFormat.class);
        //一定要注意,輸出路徑是需要不存在的,如果存在就報錯
        TextOutputFormat.setOutputPath(job,new Path("file:///D:\\開課吧課程資料\\Hadoop&ZooKeeper課件\\最新版本課件\\hadoop與zookeeper課件資料\\3、第三天\\input\\out_result"));
        //送出job任務
        boolean b = job.waitForCompletion(true);
        return b?0:1;
        /***
          * 第一步:讀取檔案,解析成key,value對,k1   v1
          * 第二步:自定義map邏輯,接受k1   v1 轉換成為新的k2   v2輸出
          * 第三步:分區。相同key的資料發送到同一個reduce裡面去,key合并,value形成一個集合
          * 第四步:排序   對key2進行排序。字典順序排序
          * 第五步:規約 combiner過程 調優步驟 可選
          * 第六步:分組
          * 第七步:自定義reduce邏輯接受k2   v2 轉換成為新的k3   v3輸出
          * 第八步:輸出k3 v3 進行儲存
          *
          *
          */
    }
    /*
    作為程式的入口類
      */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hello","world");
        //送出run方法之後,得到一個程式的退出狀态碼
        int run = ToolRunner.run(configuration, new WordCount(), args);
        //根據我們 程式的退出狀态碼,退出整個程序
        System.exit(run);
    }
           

繼續閱讀