天天看點

MapReduce的幾個企業級經典面試案例

MapReduce的幾個企業級經典面試案例

一、官方統計案例:

  • 要求:統計一下單詞出現的次數
  1. 測試資料:
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
               
  2. 編寫代碼:
    • mapper類
      /**
       * Mapper for the word-count job: emits {@code (word, 1)} for every
       * whitespace-separated token of an input line.
       *
       * @author 17616
       */
      public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
          /** Count emitted with every word; reused, as in the Hadoop tutorial's WordCount. */
          private static final LongWritable ONE = new LongWritable(1);

          /** Output key, reused across calls to avoid one allocation per token. */
          private final Text word = new Text();

          /**
           * Splits one input line into words and writes each word with a count of 1.
           *
           * @param key     position of the line in the input (unused)
           * @param value   the text of the line
           * @param context sink for the {@code (word, 1)} pairs
           * @throws IOException          if writing to the context fails
           * @throws InterruptedException if the task is interrupted while writing
           */
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Split on runs of whitespace so repeated spaces, tabs or a trailing '\r'
              // do not end up inside (or as) a word.
              for (String token : value.toString().split("\\s+")) {
                  // A blank line or leading whitespace still yields one empty token;
                  // it must not be counted as a word.
                  if (token.isEmpty()) {
                      continue;
                  }
                  word.set(token);
                  context.write(word, ONE);
              }
          }
      }
                 
    • reducer類
      /**
       * Reducer for the word-count job: adds up every count received for a word
       * and emits {@code (word, total)}.
       *
       * @author 17616
       */
      public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
          @Override
          public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
              // Running total of the occurrences reported by the map phase for this word.
              long total = 0L;
              for (LongWritable occurrences : values) {
                  total = total + occurrences.get();
              }

              // Key is the word itself, value is its overall number of occurrences.
              LongWritable result = new LongWritable(total);
              context.write(key, result);
          }
      }
                 
    • Driver類
      /**
       * Driver for the official word-count example: configures the job and submits it.
       *
       * @author 17616
       */
      public class WordCountDriver {
          /**
           * Configures the word-count job, runs it and blocks until it finishes.
           * The JVM exits with status 0 when the job succeeds and 1 when it fails,
           * so shell scripts and schedulers can detect a failed run.
           *
           * @param args command-line arguments (unused; the HDFS paths are hard-coded)
           * @throws IOException            if the job cannot be created or submitted
           * @throws ClassNotFoundException if a job class cannot be loaded
           * @throws InterruptedException   if waiting for the job is interrupted
           */
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              // Start from the default configuration.
              Configuration conf = new Configuration();

              // Job object representing this MapReduce run.
              Job job = Job.getInstance(conf);
              // Entry class, used to locate the jar that is shipped with the job.
              job.setJarByClass(WordCountDriver.class);

              // Mapper and Reducer implementations.
              job.setMapperClass(WordCountMapper.class);
              job.setReducerClass(WordCountReducer.class);

              // Types produced by the Mapper.
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(LongWritable.class);

              // Types produced by the Reducer.
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(LongWritable.class);

              // Input directory to analyse and output directory for the result (HDFS).
              FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/mapreduce"));
              FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/mapreduce"));

              // Propagate the job outcome as the process exit status.
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
                 
  3. 運作結果:
    zhangqin	20
    zhangrui	20
    zhangyong	20
               

二、計算平均值:

  • 要求:計算一下資料的平均值
    1. 測試資料:
      tom 69
      tom 84
      tom 68
      jary 89
      jary 90
      jary 81
      jary 35
      alex 23
      alex 100
      alex 230
                 
    2. 編寫代碼:
      • mapper類
        /**
         * Mapper for the average job: turns a {@code "name score"} line into a
         * {@code (name, score)} pair.
         *
         * @Author zhangyong
         * @Date 2020/4/3 23:43
         * @Version 1.0
         */
        public class AverageMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // A line holds two space-separated fields: the name and the score.
                String[] fields = value.toString().split(" ");

                Text student = new Text(fields[0]);
                // The score arrives as text and has to be converted to a number.
                IntWritable mark = new IntWritable(Integer.parseInt(fields[1]));

                context.write(student, mark);
            }
        }
                   
      • reducer類
        /**
         * Reducer for the average job: emits the integer average (fraction
         * discarded) of all scores received for a name.
         *
         * @Author zhangyong
         * @Date 2020/4/3 23:43
         * @Version 1.0
         */
        public class AverageReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            /**
             * Averages the scores of one name.
             *
             * @param name    the name the scores belong to
             * @param scores  every score emitted for that name by the map phase
             * @param context sink for the {@code (name, average)} pair
             * @throws IOException          if writing to the context fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            @Override
            public void reduce(Text name, Iterable<IntWritable> scores, Context context) throws IOException, InterruptedException {
                // Sum in a long: many large int scores could overflow an int accumulator
                // and silently produce a wrong (even negative) average.
                long total = 0L;
                int count = 0;
                for (IntWritable data : scores) {
                    total += data.get();
                    count++;
                }
                // NOTE(review): assumes reduce is only invoked for keys that have at
                // least one value, so count is never 0 here; confirm.
                // The average of int values always fits in an int, so the cast is safe.
                int average = (int) (total / count);
                context.write(name, new IntWritable(average));
            }
        }
                   
      • Driver類
        /**
         * Driver for the average job: configures the job and submits it.
         *
         * @Author zhangyong
         * @Date 2020/4/3 23:41
         * @Version 1.0
         */
        public class AverageDriver {
            /**
             * Configures the average job, runs it and blocks until it finishes.
             * The JVM exits with status 0 when the job succeeds and 1 when it fails.
             *
             * @param args command-line arguments (unused; the HDFS paths are hard-coded)
             * @throws IOException            if the job cannot be created or submitted
             * @throws ClassNotFoundException if a job class cannot be loaded
             * @throws InterruptedException   if waiting for the job is interrupted
             */
            public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);

                // Entry class, used to locate the job jar.
                job.setJarByClass(AverageDriver.class);

                // Mapper and Reducer implementations.
                job.setMapperClass(AverageMapper.class);
                job.setReducerClass(AverageReducer.class);

                // Types produced by the Mapper.
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);

                // Types produced by the Reducer.
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);

                // Input directory to analyse and output directory for the result (HDFS).
                FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/average"));
                FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/average"));

                // Submit the job and report its outcome as the process exit status.
                System.exit(job.waitForCompletion(true) ? 0 : 1);
            }
        }
                   
    3. 運作結果:
      alex 117
      jary 73
      tom	73
                 

三、求溫度最高值:

  • 要求:求出每個年份的最高溫度
  1. 測試資料:
    2329999919500515070000
    9909999919500515120022
    9909999919500515180011
    9509999919490324120111
    6509999919490324180078
    9909999919370515070001
    9909999919370515120002
    9909999919450515180001
    6509999919450324120002
    8509999919450324180078
               
  2. 編寫代碼:
    • mapper類
      /**
       * Mapper for the maximum-temperature job: cuts the year and the temperature
       * out of a fixed-width record and emits {@code (year, temperature)}.
       *
       * @author 17616
       */
      public class HeightMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String record = value.toString();

              // Characters 8..11 of the record hold the year.
              String yearField = record.substring(8, 12);

              // Characters 18..21 hold the temperature, which is converted to a number.
              String temperatureField = record.substring(18, 22);
              int temperature = Integer.parseInt(temperatureField);

              Text outKey = new Text(yearField);
              LongWritable outValue = new LongWritable(temperature);
              context.write(outKey, outValue);
          }
      }
                 
    • reducer類
      /**
       * Reducer for the maximum-temperature job: emits the highest temperature
       * seen for each year.
       *
       * @author 17616
       */
      public class HeightReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
          /**
           * Finds the maximum of all temperatures reported for one year.
           *
           * @param year    the year the temperatures belong to
           * @param t       every temperature emitted for that year by the map phase
           * @param context sink for the {@code (year, maximum)} pair
           * @throws IOException          if writing to the context fails
           * @throws InterruptedException if the task is interrupted while writing
           */
          @Override
          public void reduce(Text year, Iterable<LongWritable> t, Context context) throws IOException, InterruptedException {
              // Start from the smallest possible value, not 0: a year whose readings
              // are all below zero would otherwise be reported with a maximum of 0.
              long max = Long.MIN_VALUE;
              for (LongWritable data : t) {
                  max = Math.max(max, data.get());
              }
              context.write(year, new LongWritable(max));
          }
      }
                 
    • Driver類
      /**
       * Driver for the maximum-temperature job: configures the job and submits it.
       *
       * @author 17616
       */
      public class HeightDriver {
          /**
           * Configures the maximum-temperature job, runs it and blocks until it finishes.
           * The JVM exits with status 0 when the job succeeds and 1 when it fails.
           *
           * @param args command-line arguments (unused; the HDFS paths are hard-coded)
           * @throws IOException            if the job cannot be created or submitted
           * @throws ClassNotFoundException if a job class cannot be loaded
           * @throws InterruptedException   if waiting for the job is interrupted
           */
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              // Start from the default configuration.
              Configuration conf = new Configuration();

              // Job object representing this MapReduce run.
              Job job = Job.getInstance(conf);

              // Entry class, used to locate the job jar.
              job.setJarByClass(HeightDriver.class);

              // Mapper and Reducer implementations.
              job.setMapperClass(HeightMapper.class);
              job.setReducerClass(HeightReducer.class);

              // Types produced by the Mapper.
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(LongWritable.class);

              // Types produced by the Reducer.
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(LongWritable.class);

              // Input directory to analyse and output directory for the result (HDFS).
              FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/wendu/"));
              FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/wendu"));

              // Submit the job and report its outcome as the process exit status.
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
                 
  3. 運作結果:
    1937	2
    1945	78
    1949	111
    1950	22
               

四、資料去重:

  • 要求:去重一下ip位址
  1. 測試資料:
    192.168.234.21
    192.168.234.22
    192.168.234.21
    192.168.234.21
    192.168.234.23
    192.168.234.21
    192.168.234.21
    192.168.234.21
    192.168.234.25
    192.168.234.21
    192.168.234.21
    192.168.234.26
    192.168.234.21
    192.168.234.27
    192.168.234.21
    192.168.234.27
    192.168.234.21
    192.168.234.29
    192.168.234.21
    192.168.234.26
    192.168.234.21
    192.168.234.25
    192.168.234.25
    192.168.234.25
    192.168.234.21
    192.168.234.22
    192.168.234.21
               
  2. 編寫代碼:
    • mapper類
      /**
       * Mapper for the de-duplication job: emits every input line as a key with
       * an empty value.
       *
       * @Author zhangyong
       * @Date 2020/4/7 19:53
       * @Version 1.0
       */
      public class DisMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // The line content (value) is what gets de-duplicated, so it is written
              // out in the key position; the value slot carries no information.
              final NullWritable nothing = NullWritable.get();
              context.write(value, nothing);
          }
      }
                 
    • reducer類
      /**
       * Reducer for the de-duplication job: writes each key it receives exactly
       * once, regardless of how many values came with it.
       *
       * @Author zhangyong
       * @Date 2020/4/7 21:21
       * @Version 1.0
       */
      public class DisReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
          @Override
          public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
              // The values are deliberately not iterated: one output record per key.
              final NullWritable nothing = NullWritable.get();
              context.write(key, nothing);
          }
      }
                 
    • Driver類
      /**
       * Driver for the de-duplication job: configures the job and submits it.
       *
       * @Author zhangyong
       * @Date 2020/4/7 21:32
       * @Version 1.0
       */
      public class DisDriver {
          /**
           * Configures the de-duplication job, runs it and blocks until it finishes.
           * The JVM exits with status 0 when the job succeeds and 1 when it fails.
           *
           * @param args command-line arguments (unused; the HDFS paths are hard-coded)
           * @throws IOException            if the job cannot be created or submitted
           * @throws ClassNotFoundException if a job class cannot be loaded
           * @throws InterruptedException   if waiting for the job is interrupted
           */
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration();
              Job job = Job.getInstance(conf);

              // Entry class, used to locate the job jar; the driver itself, consistent
              // with the other drivers (previously DisReducer was passed here).
              job.setJarByClass(DisDriver.class);

              // Mapper and Reducer implementations.
              job.setMapperClass(DisMapper.class);
              job.setReducerClass(DisReducer.class);

              // Types produced by the Mapper.
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(NullWritable.class);

              // Types produced by the Reducer; declared explicitly so the job's
              // declared output types match what DisReducer actually writes.
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(NullWritable.class);

              // Input directory to analyse and output directory for the result (HDFS).
              FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/distinct"));
              FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/distinct"));

              // Submit the job and report its outcome as the process exit status.
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
                 
  3. 運作結果:
    192.168.234.21
    192.168.234.22
    192.168.234.23
    192.168.234.25
    192.168.234.26
    192.168.234.27
    192.168.234.29
               

五、流量統計:

  • 要求:統計一下手機号碼使用的流量數
  1. 測試資料:
    13901000123 zs bj 343
    13202111011 ww sh 456
    13901000123 zs bj 1024
    13207551234 ls sz 758
               
  2. 編寫代碼:
    • Bean類
      /**
       * Value object for the traffic-statistics job: one subscriber record made of
       * phone number, name, address and traffic amount, serializable via
       * {@code Writable}.
       *
       * @Author zhangyong
       * @Date 2020/4/10 8:01
       * @Version 1.0
       */
      public class FlowBean implements Writable {
          private String phone;
          private String name;
          private String addr;
          private long flow;

          // ---- accessors ----

          public String getPhone() {
              return this.phone;
          }

          public void setPhone(String phone) {
              this.phone = phone;
          }

          public String getName() {
              return this.name;
          }

          public void setName(String name) {
              this.name = name;
          }

          public String getAddr() {
              return this.addr;
          }

          public void setAddr(String addr) {
              this.addr = addr;
          }

          public long getFlow() {
              return this.flow;
          }

          public void setFlow(long flow) {
              this.flow = flow;
          }

          // ---- serialization ----

          /**
           * Serializes the fields in the order phone, name, addr, flow.
           *
           * @param dataOutput stream the fields are written to
           * @throws IOException if writing fails
           */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF(this.phone);
              dataOutput.writeUTF(this.name);
              dataOutput.writeUTF(this.addr);
              dataOutput.writeLong(this.flow);
          }

          /**
           * Restores the fields in the same order {@link #write} stored them.
           *
           * @param dataInput stream the fields are read from
           * @throws IOException if reading fails
           */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              phone = dataInput.readUTF();
              name = dataInput.readUTF();
              addr = dataInput.readUTF();
              flow = dataInput.readLong();
          }

          @Override
          public String toString() {
              StringBuilder text = new StringBuilder("FlowBean{");
              text.append("phone='").append(phone).append('\'');
              text.append(", name='").append(name).append('\'');
              text.append(", addr='").append(addr).append('\'');
              text.append(", flow=").append(flow);
              text.append('}');
              return text.toString();
          }
      }
                 
    • mapper類
      /**
       * Mapper for the traffic-statistics job: parses a
       * {@code "phone name addr flow"} line into a {@link FlowBean} and emits it
       * keyed by the subscriber name.
       *
       * @Author zhangyong
       * @Date 2020/4/10 8:10
       * @Version 1.0
       */
      public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
          /**
           * Converts one input line into a {@code (name, FlowBean)} pair.
           *
           * @param key     position of the line in the input (unused)
           * @param value   the text of the line, four space-separated fields
           * @param context sink for the {@code (name, FlowBean)} pair
           * @throws IOException          if writing to the context fails
           * @throws InterruptedException if the task is interrupted while writing
           */
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String line = value.toString();
              // Example: "13901000123 zs bj 343" splits into
              //   info[0] = phone, info[1] = name, info[2] = addr, info[3] = flow
              String[] info = line.split(" ");
              FlowBean flowBean = new FlowBean();

              flowBean.setPhone(info[0]);
              flowBean.setName(info[1]);
              flowBean.setAddr(info[2]);
              // FlowBean stores the flow as a long, so parse it as a long; parsing as
              // int would reject any value above Integer.MAX_VALUE.
              flowBean.setFlow(Long.parseLong(info[3]));

              // NOTE(review): records are grouped by name, not by phone number, so two
              // subscribers sharing a name would be merged; confirm this is intended.
              context.write(new Text(flowBean.getName()), flowBean);
          }
      }
                 
    • reducer類
      /**
       * Reducer for the traffic-statistics job: merges all records of one key into
       * a single {@link FlowBean} whose flow is the sum of the individual flows.
       *
       * @Author zhangyong
       * @Date 2020/4/10 8:23
       * @Version 1.0
       */
      public class FlowReducer extends Reducer<Text, FlowBean, FlowBean, NullWritable> {
          @Override
          public void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
              FlowBean summary = new FlowBean();
              long totalFlow = 0L;

              for (FlowBean record : values) {
                  // Identity fields are simply taken over; the last record seen wins.
                  summary.setPhone(record.getPhone());
                  summary.setName(record.getName());
                  summary.setAddr(record.getAddr());
                  // Only the traffic amount is accumulated.
                  totalFlow += record.getFlow();
              }
              summary.setFlow(totalFlow);

              context.write(summary, NullWritable.get());
          }
      }
                 
    • Driver類
      /**
       * Driver for the traffic-statistics job: configures the job and submits it.
       *
       * @Author zhangyong
       * @Date 2020/4/10 8:28
       * @Version 1.0
       */
      public class FlowDriver {
          /**
           * Configures the traffic-statistics job, runs it and blocks until it finishes.
           * The JVM exits with status 0 when the job succeeds and 1 when it fails.
           *
           * @param args command-line arguments (unused; the HDFS paths are hard-coded)
           * @throws IOException            if the job cannot be created or submitted
           * @throws ClassNotFoundException if a job class cannot be loaded
           * @throws InterruptedException   if waiting for the job is interrupted
           */
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration();
              Job job = Job.getInstance(conf);
              // Entry class, used to locate the job jar.
              job.setJarByClass(FlowDriver.class);

              // Mapper and Reducer implementations.
              job.setMapperClass(FlowMapper.class);
              job.setReducerClass(FlowReducer.class);

              // Key/value types produced by the Mapper.
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(FlowBean.class);

              // Key/value types produced by the Reducer.
              job.setOutputKeyClass(FlowBean.class);
              job.setOutputValueClass(NullWritable.class);

              // Input directory to analyse and output directory for the result (HDFS).
              FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/flow"));
              FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/flow"));

              // Submit the job and report its outcome as the process exit status.
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
                 
  3. 運作結果:
    FlowBean{phone='13207551234', name='ls', addr='sz', flow=758}
    FlowBean{phone='13202111011', name='ww', addr='sh', flow=456}
    FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}
               

六、電影的排行榜:

  • 要求:按照降序排列一下電影的熱度
  1. 測試資料:
    中國機長 72
    機械師2 83
    奇異博士 87
    流浪地球 79
    複仇者聯盟4:終局之戰 94
    驚奇隊長 68
    蜘蛛俠:英雄遠征 80
    長城 56
    奪路而逃 69 
    神奇動物在哪裡 57
    驢得水 59
    我不是潘金蓮 55
    速度與激情:特别行動 77
    哪吒之魔童降世 96
    捉迷藏 78
    上海堡壘 9
    葉問4 75
    勇士之門 35
    羅曼蒂克消亡史 67
    阿麗塔:戰鬥天使 89
               
  2. 編寫代碼:
    • Bean類
      /**
       * Sortable key for the movie-ranking job: a movie name with its popularity
       * score. Instances order by descending popularity.
       *
       * @Author zhangyong
       * @Date 2020/4/13 8:42
       * @Version 1.0
       */
      public class MovieBean implements WritableComparable<MovieBean> {
          private String name;
          private int hot;

          /**
           * Orders movies from hottest to least hot.
           *
           * @param o the movie to compare with
           * @return a negative value if this movie is hotter than {@code o}, zero if
           *         both are equally hot, a positive value otherwise
           */
          @Override
          public int compareTo(MovieBean o) {
              // Integer.compare instead of "o.hot - this.hot": the subtraction
              // overflows for operands of opposite sign with large magnitude and
              // would then return a result with the wrong sign.
              return Integer.compare(o.hot, this.hot);
          }

          /**
           * Serializes the fields in the order name, hot.
           *
           * @param dataOutput stream the fields are written to
           * @throws IOException if writing fails
           */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF(name);
              dataOutput.writeInt(hot);
          }

          /**
           * Restores the fields in the same order {@link #write} stored them.
           *
           * @param dataInput stream the fields are read from
           * @throws IOException if reading fails
           */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              this.name = dataInput.readUTF();
              this.hot = dataInput.readInt();
          }

          public void setName(String name) {
              this.name = name;
          }

          public int getHot() {
              return hot;
          }

          public void setHot(int hot) {
              this.hot = hot;
          }

          @Override
          public String toString() {
              return "MovieBean{" +
                      "name='" + name + '\'' +
                      ", hot=" + hot +
                      '}';
          }
      }
                 
    • mapper類
      /**
       * Mapper for the movie-ranking job: parses a {@code "name hot"} line into a
       * {@link MovieBean} and emits it as the key with an empty value.
       *
       * @Author zhangyong
       * @Date 2020/4/13 8:52
       * @Version 1.0
       */
      public class MovieMapper extends Mapper<LongWritable, Text, MovieBean, NullWritable> {

          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Two space-separated fields per line: movie name, then popularity.
              String[] fields = value.toString().split(" ");
              String title = fields[0];
              int popularity = Integer.parseInt(fields[1]);

              // Wrap both fields in the key object.
              MovieBean movie = new MovieBean();
              movie.setName(title);
              movie.setHot(popularity);

              context.write(movie, NullWritable.get());
          }
      }
                 
    • reducer類:本案例沒有自訂 Reducer(Driver 中未設定 Reducer 類),排序依靠 MovieBean 的 compareTo 方法完成。
    • Driver類
      /**
       * Driver for the movie-ranking job: configures a job with only a Mapper
       * (no Reducer class is set) and submits it.
       *
       * @Author zhangyong
       * @Date 2020/4/13 9:19
       * @Version 1.0
       */
      public class MovieDriver {
          /**
           * Configures the movie-ranking job, runs it and blocks until it finishes.
           * The JVM exits with status 0 when the job succeeds and 1 when it fails.
           *
           * @param args command-line arguments (unused; the HDFS paths are hard-coded)
           * @throws IOException            if the job cannot be created or submitted
           * @throws ClassNotFoundException if a job class cannot be loaded
           * @throws InterruptedException   if waiting for the job is interrupted
           */
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration();
              Job job = Job.getInstance(conf);

              // Entry class, used to locate the job jar.
              job.setJarByClass(MovieDriver.class);

              job.setMapperClass(MovieMapper.class);

              // Key/value types produced by the Mapper.
              job.setMapOutputKeyClass(MovieBean.class);
              job.setMapOutputValueClass(NullWritable.class);

              // No Reducer class is set, so the records reach the output with the
              // Mapper's types; declare them as the job's final output types too.
              job.setOutputKeyClass(MovieBean.class);
              job.setOutputValueClass(NullWritable.class);

              // Input directory to analyse and output directory for the result (HDFS).
              FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/sort"));
              FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/sort"));

              // Submit the job and report its outcome as the process exit status.
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
                 
  3. 運作結果:
    MovieBean{name='哪吒之魔童降世', hot=96}
    MovieBean{name='複仇者聯盟4:終局之戰', hot=94}
    MovieBean{name='阿麗塔:戰鬥天使', hot=89}
    MovieBean{name='奇異博士', hot=87}
    MovieBean{name='機械師2', hot=83}
    MovieBean{name='蜘蛛俠:英雄遠征', hot=80}
    MovieBean{name='流浪地球', hot=79}
    MovieBean{name='捉迷藏', hot=78}
    MovieBean{name='速度與激情:特别行動', hot=77}
    MovieBean{name='葉問4', hot=75}
    MovieBean{name='中國機長', hot=72}
    MovieBean{name='奪路而逃', hot=69}
    MovieBean{name='驚奇隊長', hot=68}
    MovieBean{name='羅曼蒂克消亡史', hot=67}
    MovieBean{name='驢得水', hot=59}
    MovieBean{name='神奇動物在哪裡', hot=57}
    MovieBean{name='長城', hot=56}
    MovieBean{name='我不是潘金蓮', hot=55}
    MovieBean{name='勇士之門', hot=35}
    MovieBean{name='上海堡壘', hot=9}
               

七、多個檔案統計成績:

  • 要求:根據三張表統計每個同學的各課成績的總和
  1. 測試資料:

    chinese.txt

    1 alex 89
    2 alex 73
    3 alex 67
    1 romeo 49
    2 romeo 83
    3 romeo 27
    1 lee 77
    2 lee 66
    3 lee 89
               
    english.txt
    1 alex 55
    2 alex 69
    3 alex 75
    1 romeo 44
    2 romeo 64
    3 romeo 86
    1 lee 76
    2 lee 84
    3 lee 93
               
    math.txt
    1 alex 85
    2 alex 59
    3 alex 95
    1 romeo 74
    2 romeo 67
    3 romeo 96
    1 lee 45
    2 lee 76
    3 lee 67
               
  2. 編寫代碼:
    • Bean類
      /**
       * Value object for the multi-file score job: a student name with the
       * Chinese, math and English scores, serializable via {@code Writable}.
       *
       * @Author zhangyong
       * @Date 2020/4/10 10:00
       * @Version 1.0
       */
      public class ScoreBean implements Writable {
          private String name;
          private int chinese;
          private int math;
          private int english;

          // ---- accessors ----

          public String getName() {
              return this.name;
          }

          public void setName(String name) {
              this.name = name;
          }

          public int getChinese() {
              return this.chinese;
          }

          public void setChinese(int chinese) {
              this.chinese = chinese;
          }

          public int getMath() {
              return this.math;
          }

          public void setMath(int math) {
              this.math = math;
          }

          public int getEnglish() {
              return this.english;
          }

          public void setEnglish(int english) {
              this.english = english;
          }

          // ---- serialization ----

          /** Serializes the fields in the order name, chinese, math, english. */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF(this.name);
              dataOutput.writeInt(this.chinese);
              dataOutput.writeInt(this.math);
              dataOutput.writeInt(this.english);
          }

          /** Restores the fields in the same order {@link #write} stored them. */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              name = dataInput.readUTF();
              chinese = dataInput.readInt();
              math = dataInput.readInt();
              english = dataInput.readInt();
          }

          @Override
          public String toString() {
              StringBuilder text = new StringBuilder("StuScore{");
              text.append("name='").append(name).append('\'');
              text.append(", chinese=").append(chinese);
              text.append(", math=").append(math);
              text.append(", english=").append(english);
              text.append('}');
              return text.toString();
          }
      }
                 
    • mapper類
      /**
       * Mapper for the multi-file score job: reads an {@code "id name score"}
       * line, stores the score in the subject slot selected by the input file
       * name, and emits {@code (name, ScoreBean)}.
       *
       * @Author zhangyong
       * @Date 2020/4/10 10:03
       * @Version 1.0
       */
      public class ScoreMapper extends Mapper<LongWritable, Text, Text, ScoreBean> {
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

              // One record per line: fields[1] is the name, fields[2] the score.
              String[] fields = value.toString().split(" ");

              ScoreBean bean = new ScoreBean();
              bean.setName(fields[1]);

              // The split handled by this map task tells which file, and therefore which
              // subject, the line came from. FileSplit must be the one from
              // org.apache.hadoop.mapreduce.lib.input, not the old mapred API.
              FileSplit currentSplit = (FileSplit) context.getInputSplit();
              String fileName = currentSplit.getPath().getName();

              switch (fileName) {
                  case "chinese.txt":
                      bean.setChinese(Integer.parseInt(fields[2]));
                      break;
                  case "math.txt":
                      bean.setMath(Integer.parseInt(fields[2]));
                      break;
                  case "english.txt":
                      bean.setEnglish(Integer.parseInt(fields[2]));
                      break;
                  default:
                      // Any other file: no subject score is filled in.
                      break;
              }

              context.write(new Text(bean.getName()), bean);
          }
      }
                 
    • reducer類
      /**
       * Reducer for the multi-file score job: sums the Chinese, math and English
       * scores of one student into a single {@link ScoreBean}.
       *
       * @Author zhangyong
       * @Date 2020/4/10 10:05
       * @Version 1.0
       */
      public class ScoreReducer extends Reducer<Text, ScoreBean, Text, ScoreBean> {
          @Override
          protected void reduce(Text key, Iterable<ScoreBean> values, Context context) throws IOException, InterruptedException {

              int chineseTotal = 0;
              int mathTotal = 0;
              int englishTotal = 0;

              // Each incoming bean carries the score of one subject; the others are 0.
              for (ScoreBean partial : values) {
                  chineseTotal += partial.getChinese();
                  mathTotal += partial.getMath();
                  englishTotal += partial.getEnglish();
              }

              ScoreBean summary = new ScoreBean();
              // The key holds just the student name, as emitted by the map phase.
              summary.setName(key.toString());
              summary.setChinese(chineseTotal);
              summary.setMath(mathTotal);
              summary.setEnglish(englishTotal);

              context.write(key, summary);
          }
      }
                 
    • Driver類
      /**
       * Driver for the multi-file score job: configures the job and submits it.
       *
       * @Author zhangyong
       * @Date 2020/4/10 10:10
       * @Version 1.0
       */
      public class ScoreDriver {
          /**
           * Configures the score job, runs it and blocks until it finishes.
           * The JVM exits with status 0 when the job succeeds and 1 when it fails.
           *
           * @param args command-line arguments (unused; the HDFS paths are hard-coded)
           * @throws Exception if the job cannot be created, submitted or awaited
           */
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              Job job = Job.getInstance(conf);
              // Entry class, used to locate the job jar.
              job.setJarByClass(ScoreDriver.class);
              // Mapper and Reducer implementations.
              job.setMapperClass(ScoreMapper.class);
              job.setReducerClass(ScoreReducer.class);
              // Key/value types produced by the Mapper.
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(ScoreBean.class);

              // Key/value types produced by the Reducer.
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(ScoreBean.class);

              // Input directory (holds the three subject files) and output directory (HDFS).
              FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/score"));
              FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/score"));

              // Submit the job and report its outcome as the process exit status.
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
                 
  3. 運作結果:
    alex StuScore{name='alex', chinese=229, math=239, english=199}
    lee	StuScore{name='lee', chinese=232, math=188, english=253}
    romeo StuScore{name='romeo', chinese=159, math=237, english=194}
               

八、Job鍊處理資料:

  • 要求:統計每個同學的總分
  1. 測試資料:
    1|zhang 100
    2|wang 200
    3|zhang 150
    4|lisi 190
    5|wang 50
    6|zhang 80
    7|lisi 50
               
  2. 編寫代碼:
    • Bean類
      /**
       * Sortable key for the second job of the job chain: a name with its total
       * count. Instances order by descending count.
       *
       * @Author zhangyong
       * @Date 2020/4/14 15:06
       * @Version 1.0
       */
      public class CountBean implements WritableComparable<CountBean> {
          private String name;
          private int count;

          /**
           * Orders beans from the largest count to the smallest.
           *
           * @param o the bean to compare with
           * @return a negative value if this count is larger than {@code o}'s, zero
           *         if both are equal, a positive value otherwise
           */
          @Override
          public int compareTo(CountBean o) {
              // Integer.compare instead of "o.count - this.count": the subtraction
              // overflows for operands of opposite sign with large magnitude and
              // would then return a result with the wrong sign.
              return Integer.compare(o.count, this.count);
          }

          /**
           * Serializes the fields in the order name, count.
           *
           * @param dataOutput stream the fields are written to
           * @throws IOException if writing fails
           */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF(name);
              dataOutput.writeInt(count);
          }

          /**
           * Restores the fields in the same order {@link #write} stored them.
           *
           * @param dataInput stream the fields are read from
           * @throws IOException if reading fails
           */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              this.name = dataInput.readUTF();
              this.count = dataInput.readInt();
          }

          public String getName() {
              return name;
          }

          public void setName(String name) {
              this.name = name;
          }

          public int getCount() {
              return count;
          }

          public void setCount(int count) {
              this.count = count;
          }

          @Override
          public String toString() {
              return "CountBean{" +
                      "name='" + name + '\'' +
                      ", count=" + count +
                      '}';
          }
      }
                 
    • mapper類

      OneCountMapper類

      /**
       * Mapper of the first job in the chain: parses an {@code "id|name amount"}
       * line and emits {@code (name, amount)}.
       *
       * @Author zhangyong
       * @Date 2020/4/14 8:54
       * @Version 1.0
       */
      public class OneCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Splitting on '|' gives the sequence number at index 0 and
              // "name amount" at index 1; the latter is split again on a space.
              String[] byPipe = value.toString().split("\\|");
              String[] nameAndAmount = byPipe[1].split(" ");

              String who = nameAndAmount[0];
              int amount = Integer.parseInt(nameAndAmount[1]);

              context.write(new Text(who), new IntWritable(amount));
          }
      }
                 
      TwoCountMapper類
      /**
       * Mapper of the second job in the chain: parses a tab-separated
       * {@code "name<TAB>count"} line into a {@link CountBean} and emits it as
       * the key with an empty value.
       *
       * @Author zhangyong
       * @Date 2020/4/14 15:27
       * @Version 1.0
       */
      public class TwoCountMapper extends Mapper<LongWritable, Text, CountBean, NullWritable> {
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // This mapper reads the result file of the first MR job, whose key and
              // value are separated by a tab, hence the "\t" delimiter.
              String[] columns = value.toString().split("\t");
              String who = columns[0];
              int total = Integer.parseInt(columns[1]);

              CountBean outKey = new CountBean();
              outKey.setName(who);
              outKey.setCount(total);

              context.write(outKey, NullWritable.get());
          }
      }
                 
    • reducer類
      /**
       * First-stage reducer: adds up all amounts received for one name and emits
       * {@code (name, total)}.
       *
       * @Author zhangyong
       * @Date 2020/4/14 8:54
       * @Version 1.0
       */
      public class OneCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              // Running total of the profit for this key.
              int total = 0;
              for (IntWritable amount : values) {
                  total = total + amount.get();
              }
              IntWritable result = new IntWritable(total);
              context.write(key, result);
          }
      }
                 
    • Driver類
      /**
       * Driver that chains two MapReduce jobs.
       *
       * <p>Job 1 ({@code OneCountMapper} / {@code OneCountReducer}) sums the amounts per name.
       * Job 2 ({@code TwoCountMapper}, no custom reducer) reads job 1's output and re-emits
       * every line as a {@code CountBean} key, so the shuffle orders the records by the bean's
       * own comparison (defined in {@code CountBean}).
       *
       * <p>The JVM exits with status 1 when either job fails.
       *
       * @Author zhangyong
       * @Date 2020/4/14 8:54
       * @Version 1.0
       */
      public class CountDriver {
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();

              // ---- first job: per-name totals ----
              Job job = Job.getInstance(conf);
              job.setJarByClass(CountDriver.class);
              job.setMapperClass(OneCountMapper.class);
              job.setReducerClass(OneCountReducer.class);
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(IntWritable.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/count"));
              FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/count"));

              boolean succeeded = job.waitForCompletion(true);
              if (succeeded) {
                  // ---- second job: only started once the first one has succeeded ----
                  Job job2 = Job.getInstance(conf);
                  // Each Job works on its own copy of the configuration, so the jar registered
                  // on the first job is not inherited and must be set again here.
                  job2.setJarByClass(CountDriver.class);
                  job2.setMapperClass(TwoCountMapper.class);
                  job2.setMapOutputKeyClass(CountBean.class);
                  job2.setMapOutputValueClass(NullWritable.class);
                  // No reducer class is configured, so the final output carries the same types
                  // as the map output; declare them explicitly instead of relying on defaults.
                  job2.setOutputKeyClass(CountBean.class);
                  job2.setOutputValueClass(NullWritable.class);

                  // The input of the second job is the output directory of the first job.
                  // Both paths must be registered on job2: registering them on the first job
                  // by mistake fails at run time with a "path does not exist" error.
                  FileInputFormat.setInputPaths(job2, new Path("hdfs://anshun115:9000/result/count"));
                  FileOutputFormat.setOutputPath(job2, new Path("hdfs://anshun115:9000/result/count2"));

                  // Submit job2 (not the first job) and wait for it.
                  succeeded = job2.waitForCompletion(true);
              }

              if (!succeeded) {
                  // Report the failure to the calling shell or scheduler.
                  System.exit(1);
              }
          }

      }
                 
  3. 運作結果:

    count.txt

    lisi	240
    wang	250
    zhang	330
               
    count2.txt
    CountBean{name='zhang', count=330}
    CountBean{name='wang', count=250}
    CountBean{name='lisi', count=240}
               

九、簡單分區案例:

  • 要求:分區顯示手機使用流量的總和
    1. 測試資料:
      13901000123 zs bj 343
      13202111011 ww sh 456
      13901000123 zs bj 1024
      13207551234 ls sz 758
                 
    2. 編寫代碼:
    
    • Partitioner類
         /**
          * Routes every record to a reduce partition according to the address stored in
          * its {@code PartFlowBean} value: {@code "bj"} goes to 0, {@code "sh"} to 1 and
          * every other address to 2.
          *
          * <p>The chosen index is folded into the range {@code [0, numPartitioner)} so the
          * partitioner still returns a legal partition when the job runs with fewer than
          * three reduce tasks.
          *
          * @Author zhangyong
          * @Date 2020/4/10 10:42
          * @Version 1.0
          */
         public class AddPartitioner extends Partitioner<Text, PartFlowBean> {
             /**
              * @param text           map output key (not used for routing)
              * @param flowBean       map output value whose address selects the partition
              * @param numPartitioner number of partitions configured for the job
              * @return partition index in {@code [0, numPartitioner)}
              */
             @Override
             public int getPartition(Text text, PartFlowBean flowBean, int
                     numPartitioner) {
                 String addr = flowBean.getAddr();
                 int partition;
                 // Constant-first equals: a missing address falls into the "other" bucket
                 // instead of raising a NullPointerException.
                 if ("bj".equals(addr)) {
                     partition = 0;
                 } else if ("sh".equals(addr)) {
                     partition = 1;
                 } else {
                     partition = 2;
                 }
                 return partition % numPartitioner;
             }
         }
               
    • Bean類
      /**
       * Hadoop {@code Writable} holding one traffic record: phone number, user name,
       * address code and the amount of traffic used.
       *
       * @Author zhangyong
       * @Date 2020/4/10 10:01
       * @Version 1.0
       */
      public class PartFlowBean implements Writable {
          private String phone;
          private String name;
          private String addr;
          private long flow;

          public String getPhone() {
              return this.phone;
          }

          public void setPhone(String phone) {
              this.phone = phone;
          }

          public String getName() {
              return this.name;
          }

          public void setName(String name) {
              this.name = name;
          }

          public String getAddr() {
              return this.addr;
          }

          public void setAddr(String addr) {
              this.addr = addr;
          }

          public long getFlow() {
              return this.flow;
          }

          public void setFlow(long flow) {
              this.flow = flow;
          }

          /**
           * Serializes the fields in the order phone, name, addr, flow.
           *
           * @param dataOutput sink the fields are written to
           * @throws IOException if writing fails
           */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF(this.phone);
              dataOutput.writeUTF(this.name);
              dataOutput.writeUTF(this.addr);
              dataOutput.writeLong(this.flow);
          }

          /**
           * Deserializes the fields in exactly the order used by {@link #write(DataOutput)}.
           *
           * @param dataInput source the fields are read from
           * @throws IOException if reading fails
           */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              phone = dataInput.readUTF();
              name = dataInput.readUTF();
              addr = dataInput.readUTF();
              flow = dataInput.readLong();
          }

          @Override
          public String toString() {
              StringBuilder text = new StringBuilder("FlowBean{");
              text.append("phone='").append(phone).append('\'');
              text.append(", name='").append(name).append('\'');
              text.append(", addr='").append(addr).append('\'');
              text.append(", flow=").append(flow);
              text.append('}');
              return text.toString();
          }
      }
                 
    • mapper類
      /**
       * Parses one space-separated traffic record into a {@code PartFlowBean} and emits it
       * keyed by the user name.
       *
       * @Author zhangyong
       * @Date 2020/4/10 10:51
       * @Version 1.0
       */
      public class PartFlowMapper extends Mapper<LongWritable, Text, Text, PartFlowBean> {
          @Override
          public void map(LongWritable key, Text value, Context context) throws
                  IOException, InterruptedException {
              String line = value.toString();
              /*
               * Example: "13901000123 zs bj 343" splits into
               *   info[0] = phone = 13901000123
               *   info[1] = name  = zs
               *   info[2] = addr  = bj
               *   info[3] = flow  = 343
               */
              String[] info = line.split(" ");
              PartFlowBean flowBean = new PartFlowBean();
              flowBean.setPhone(info[0]);
              flowBean.setName(info[1]);
              flowBean.setAddr(info[2]);
              // The bean stores the flow as a long, so parse it as a long: parsing it as an
              // int would reject values above Integer.MAX_VALUE.
              flowBean.setFlow(Long.parseLong(info[3]));
              context.write(new Text(flowBean.getName()), flowBean);
          }
      }
                 
    • reducer類
      /**
       * Merges all traffic records of one user name into a single {@code PartFlowBean}
       * whose flow is the sum of the individual flows, and emits that bean as the key.
       *
       * @Author zhangyong
       * @Date 2020/4/10 10:23
       * @Version 1.0
       */
      public class PartFlowReducer extends Reducer<Text, PartFlowBean, PartFlowBean,
              NullWritable> {
          @Override
          public void reduce(Text key, Iterable<PartFlowBean> values, Context
                  context) throws IOException, InterruptedException {
              PartFlowBean result = new PartFlowBean();
              for (PartFlowBean value : values) {
                  // Identity fields are copied from each record (the last one wins);
                  // only the flow is accumulated.
                  result.setPhone(value.getPhone());
                  result.setName(value.getName());
                  result.setAddr(value.getAddr());
                  result.setFlow(result.getFlow() + value.getFlow());
              }
              context.write(result, NullWritable.get());
          }
      }
                 
    • Driver類
      /**
       * Driver of the partitioning example: one job whose output is spread over three
       * reduce tasks by {@code AddPartitioner}.
       *
       * @Author zhangyong
       * @Date 2020/4/11 11:17
       * @Version 1.0
       */
      public class PartFlowDriver {
          public static void main(String[] args) throws Exception {
              Job job = Job.getInstance(new Configuration());
              job.setJarByClass(PartFlowDriver.class);

              // Map side: class and output types.
              job.setMapperClass(PartFlowMapper.class);
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(PartFlowBean.class);

              // Reduce side: class and final output types.
              job.setReducerClass(PartFlowReducer.class);
              job.setOutputKeyClass(PartFlowBean.class);
              job.setOutputValueClass(NullWritable.class);

              // Without the following two settings the custom partitioning does not take
              // effect: the partitioner class and the number of partitions (reduce tasks).
              job.setPartitionerClass(AddPartitioner.class);
              job.setNumReduceTasks(3);

              Path input = new Path("hdfs://anshun115:9000/partition");
              Path output = new Path("hdfs://anshun115:9000/result/partition");
              FileInputFormat.setInputPaths(job, input);
              FileOutputFormat.setOutputPath(job, output);

              job.waitForCompletion(true);
          }
      }
                 
  3. 運作結果:

    part-r-00000

    FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}
               
    part-r-00001
    FlowBean{phone='13202111011', name='ww', addr='sh', flow=456}
               
    part-r-00002
    FlowBean{phone='13207551234', name='ls', addr='sz', flow=758}
               

十、分區并全排序:

  • 要求:把以下資料按照兩位數、三位數、四位數以上進行分區,并且按照大小排序
  1. 測試資料:
    82 239 231
    23 22 213
    123 232 124
    213 3434 232
    4546 565 123
    231 231
    2334 231
    1123 5656 657
    12313 4324 213
    123 2 232 32
    343 123 4535
    12321 3442 453
    1233 342 453
    1231 322 452
    232 343 455
    3123 3434 3242
               
  2. 編寫代碼:
    • Partitioner類
      /**
       * Range partitioner for the total-sort example. Numbers are spread over three result
       * files by magnitude:
       * <ul>
       *   <li>values below 100 go to partition 0</li>
       *   <li>100-999 go to partition 1</li>
       *   <li>1000 and above go to partition 2</li>
       * </ul>
       *
       * <p>The ranges are compared numerically. Negative keys therefore land in the lowest
       * partition, which keeps the concatenation of the partition files globally sorted.
       * When the job runs with fewer than three reduce tasks the index is capped at the
       * last available partition, which also preserves the ordering between partitions.
       *
       * @Author zhangyong
       * @Date 2020/4/14 9:39
       * @Version 1.0
       */
      public class AutoPartitioner extends Partitioner<IntWritable, IntWritable> {
          /**
           * @param key           the number being sorted
           * @param value         occurrence count (not used for routing)
           * @param numPartitions number of partitions configured for the job
           * @return index of the partition responsible for the key's value range
           */
          @Override
          public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
              int number = key.get();
              int bucket;
              if (number < 100) {
                  bucket = 0;
              } else if (number < 1000) {
                  bucket = 1;
              } else {
                  bucket = 2;
              }
              return Math.min(bucket, numPartitions - 1);
          }
      }
                 
    • mapper類
      /**
       * Emits {@code (number, 1)} for every space-separated number of an input line.
       *
       * <p>Empty tokens, produced by blank lines or by consecutive, leading or trailing
       * spaces, are skipped instead of being handed to {@code Integer.parseInt}. A token
       * that is not a valid integer still raises {@code NumberFormatException}.
       *
       * @Author zhangyong
       * @Date 2020/4/14 9:44
       * @Version 1.0
       */
      public class NumSortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
          // Output objects are reused across records (the idiom of the official Hadoop
          // WordCount example) rather than allocated once per token.
          private final IntWritable number = new IntWritable();
          private final IntWritable one = new IntWritable(1);

          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String line = value.toString();
              for (String token : line.split(" ")) {
                  if (token.isEmpty()) {
                      continue;
                  }
                  number.set(Integer.parseInt(token));
                  context.write(number, one);
              }
          }
      }
                 
    • reducer類
      /**
       * Sums the occurrence counts received for one number and emits
       * {@code (number, occurrences)}.
       *
       * @Author zhangyong
       * @Date 2020/4/14 9:39
       * @Version 1.0
       */
      public class NumSortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
          @Override
          protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              int occurrences = 0;
              for (IntWritable partial : values) {
                  occurrences += partial.get();
              }
              IntWritable total = new IntWritable(occurrences);
              context.write(key, total);
          }
      }
                 
    • Driver類
      /**
       * Driver of the "partition and total sort" example: a single job with three reduce
       * tasks whose input is routed by {@code AutoPartitioner}.
       *
       * @Author zhangyong
       * @Date 2020/4/14 9:39
       * @Version 1.0
       */
      public class NumSortDriver {
          public static void main(String[] args) throws Exception {
              Job job = Job.getInstance(new Configuration());
              job.setJarByClass(NumSortDriver.class);

              // Map side.
              job.setMapperClass(NumSortMapper.class);
              job.setMapOutputKeyClass(IntWritable.class);
              job.setMapOutputValueClass(IntWritable.class);

              // Three result files are wanted, so the partitioner class and the number of
              // reduce tasks are specified here.
              job.setPartitionerClass(AutoPartitioner.class);
              job.setNumReduceTasks(3);

              // Reduce side.
              job.setReducerClass(NumSortReducer.class);
              job.setOutputKeyClass(IntWritable.class);
              job.setOutputValueClass(IntWritable.class);

              Path input = new Path("hdfs://anshun115:9000/numcount/");
              Path output = new Path("hdfs://anshun115:9000/result/numcount");
              FileInputFormat.setInputPaths(job, input);
              FileOutputFormat.setOutputPath(job, output);

              job.waitForCompletion(true);
          }
      }
                 
  3. 運作結果:

    part-r-00000

    2	1
    22	1
    23	1
    32	1
    82	1
               
    part-r-00001
    123	4
    124	1
    213	3
    231	4
    232	4
    239	1
    322	1
    342	1
    343	2
    452	1
    453	2
    455	1
    565	1
    657	1
               
    part-r-00002
    1123	1
    1231	1
    1233	1
    2334	1
    3123	1
    3242	1
    3434	2
    3442	1
    4324	1
    4535	1
    4546	1
    5656	1
    12313	1
    12321	1
               

十一、Combine提高運作效率:

  • 要求:使用Combine類統計一下單詞出現的次數
    1. 測試資料:
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
                 
    2. 編寫代碼:
      • mapper類
        /**
         * Word-count mapper: emits {@code (word, 1)} for every space-separated word of a line.
         *
         * <p>Empty tokens, produced by blank lines or by consecutive or leading spaces, are
         * skipped so that the empty string is never counted as a word.
         *
         * @Author zhangyong
         * @Date 2020/4/15 7:30
         * @Version 1.0
         */
        public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            // Output objects are reused across records (the idiom of the official Hadoop
            // WordCount example) rather than allocated once per word.
            private final Text word = new Text();
            private final IntWritable one = new IntWritable(1);

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                for (String token : line.split(" ")) {
                    if (token.isEmpty()) {
                        continue;
                    }
                    word.set(token);
                    context.write(word, one);
                }
            }
        }
                   
      • Combine類
        /**
         * Combiner for the word-count job: adds up the counts it receives for one word and
         * emits a single {@code (word, partialCount)} pair.
         *
         * @Author zhangyong
         * @Date 2020/4/15 7:34
         * @Version 1.0
         */
        public class WcCombine extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int partialCount = 0;
                for (IntWritable occurrence : values) {
                    partialCount = partialCount + occurrence.get();
                }
                IntWritable merged = new IntWritable(partialCount);
                context.write(key, merged);
            }
        }
                   
      • reducer類
        /**
         * Final reducer of the word-count job: sums the counts per word and emits
         * {@code (word, total)}. Every incoming value is also traced to {@code System.err}
         * as {@code word:value}.
         *
         * @Author zhangyong
         * @Date 2020/4/15 7:34
         * @Version 1.0
         */
        public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int total = 0;
                for (IntWritable partial : values) {
                    total = total + partial.get();
                    // Trace each value that reaches the reducer.
                    System.err.println(key + ":" + partial);
                }
                IntWritable result = new IntWritable(total);
                context.write(key, result);
            }
        }
                   
      • Driver類
        /**
         * Driver of the word-count job that uses {@code WcCombine} as combiner.
         *
         * @Author zhangyong
         * @Date 2020/4/15 7:42
         * @Version 1.0
         */
        public class WcDriver {
            public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Job job = Job.getInstance(new Configuration());
                job.setJarByClass(WcDriver.class);

                // Map side.
                job.setMapperClass(WcMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);

                // Register the combiner; without this call no combine phase is executed.
                // The combiner performs the merge once early, during the map task, to reduce
                // the workload of the reduce phase. It only pre-merges data and must not
                // change the final result.
                job.setCombinerClass(WcCombine.class);

                // Reduce side.
                job.setReducerClass(WcReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);

                Path input = new Path("hdfs://anshun115:9000/mapreduce");
                Path output = new Path("hdfs://anshun115:9000/result/wccombine");
                FileInputFormat.setInputPaths(job, input);
                FileOutputFormat.setOutputPath(job, output);

                job.waitForCompletion(true);
            }
        }
                   
    3. 運作結果:
      zhangqin	20
      zhangrui	20
      zhangyong	20
                 

十二、推薦認識好友:

  • 要求:找出以下朋友的潛在朋友(一度二度朋友關系鍊)
  1. 測試資料:
    tom rose
    tom jim
    tom smith
    tom lucy
    rose tom
    rose lucy
    rose smith
    jim tom
    jim lucy
    jim smith
    smith jim
    smith tom
    smith rose
               
  2. 編寫代碼:
    • 第一個mapper類
      /**
       * First-round mapper of the friend recommendation example: reads a
       * {@code "name friend"} line and emits {@code (name, friend)}.
       *
       * @Author 張勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:08
       */
      public class OneFriendMapper extends Mapper<LongWritable, Text, Text, Text> {
          /**
           * The input key/value types follow from the input file; the output types are both
           * {@code Text} because the job emits name-friend relations.
           */
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Column 0 is the person, column 1 is one of that person's friends.
              String[] columns = value.toString().split(" ");
              Text person = new Text(columns[0]);
              Text friend = new Text(columns[1]);
              context.write(person, friend);
          }
      }
                 
    • 第一個reducer類
      /**
       * First-round reducer of the friend recommendation example.
       *
       * <p>For one person and the list of that person's friends it emits:
       * <ul>
       *   <li>{@code (a-b, 1)} for every direct friendship between the person and a friend,</li>
       *   <li>{@code (a-b, 2)} for every pair of distinct friends of the person, i.e. two
       *       people who share this person as a common friend.</li>
       * </ul>
       * In every pair the lexicographically smaller name is written first.
       *
       * @Author 張勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:28
       */
      public class OneFriendReducer extends Reducer<Text, Text, Text, IntWritable> {
          /**
           * The input key/value types must match the mapper output. The output value is 1
           * for a direct friendship and 2 for a pair that is only connected through the key.
           */
          @Override
          protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
              ArrayList<String> friends = new ArrayList<>();

              // Direct friendships: remember each friend and emit the ordered pair with depth 1.
              for (Text value : values) {
                  String person = key.toString();
                  String friend = value.toString();
                  friends.add(friend);

                  String pair = person.compareTo(friend) < 0
                          ? person + "-" + friend
                          : friend + "-" + person;
                  context.write(new Text(pair), new IntWritable(1));
              }

              // Possible acquaintances: every ordered combination of two different friends.
              int size = friends.size();
              for (int i = 0; i < size; i++) {
                  String first = friends.get(i);
                  for (int j = 0; j < size; j++) {
                      String second = friends.get(j);
                      if (first.compareTo(second) >= 0) {
                          continue;
                      }
                      context.write(new Text(first + "-" + second), new IntWritable(2));
                  }
              }
          }
      }
                 
    • 第二個mapper類
      /**
       * Second-round mapper: parses a {@code "pair<TAB>depth"} line written by the first
       * round and emits {@code (pair, depth)}.
       *
       * @Author 張勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:32
       */
      public class TwoFriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Column 0: the friend pair; column 1: the depth of the relation.
              String[] columns = value.toString().split("\t");
              String pair = columns[0];
              int depth = Integer.parseInt(columns[1]);
              context.write(new Text(pair), new IntWritable(depth));
          }
      }
                 
    • 第二個reducer類
      /**
       * Second-round reducer: emits a pair only when none of its depth values is 1, i.e.
       * when the two people are not already direct friends and are therefore candidates
       * for a recommendation.
       *
       * @Author 張勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:34
       */
      public class TwoFriendReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              // The task asks for people who may know each other, so a pair qualifies only
              // while every depth seen is 2; a single depth of 1 marks an existing friendship.
              boolean possibleFriends = true;
              for (IntWritable value : values) {
                  if (value.get() == 1) {
                      possibleFriends = false;
                      // The outcome is decided; the remaining values cannot change it.
                      break;
                  }
              }
              if (possibleFriends) {
                  context.write(key, NullWritable.get());
              }
          }
      }
                 
    • Driver類
      /**
       * Driver of the friend recommendation example, which chains two MapReduce rounds.
       *
       * <p>Round 1 ({@code OneFriendMapper} / {@code OneFriendReducer}) writes every pair
       * with depth 1 (direct friends) or 2 (share a common friend). Round 2
       * ({@code TwoFriendMapper} / {@code TwoFriendReducer}) reads that output and keeps
       * the pairs that never occur with depth 1.
       *
       * <p>The JVM exits with status 1 when either round fails.
       *
       * @Author 張勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:36
       */
      public class FriendDriver {
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration();

              // ---- round 1: processing classes and input/output ----
              Job job1 = Job.getInstance(conf);

              job1.setJarByClass(FriendDriver.class);

              job1.setMapperClass(OneFriendMapper.class);
              job1.setReducerClass(OneFriendReducer.class);

              job1.setMapOutputKeyClass(Text.class);
              job1.setMapOutputValueClass(Text.class);

              job1.setOutputKeyClass(Text.class);
              job1.setOutputValueClass(IntWritable.class);

              // Input and result paths of round 1.
              FileInputFormat.setInputPaths(job1, new Path("hdfs://anshun115:9000/friend"));
              FileOutputFormat.setOutputPath(job1, new Path("hdfs://anshun115:9000/result/friend"));

              // Round 2 is only configured and started once round 1 has succeeded.
              boolean succeeded = job1.waitForCompletion(true);
              if (succeeded) {
                  Job job2 = Job.getInstance(conf);
                  // Each Job works on its own copy of the configuration, so the jar registered
                  // on job1 is not inherited and must be set again here.
                  job2.setJarByClass(FriendDriver.class);

                  // Mapper of the second job.
                  job2.setMapperClass(TwoFriendMapper.class);
                  job2.setMapOutputKeyClass(Text.class);
                  job2.setMapOutputValueClass(IntWritable.class);

                  // Reducer of the second job.
                  job2.setReducerClass(TwoFriendReducer.class);
                  job2.setOutputKeyClass(Text.class);
                  job2.setOutputValueClass(NullWritable.class);

                  // The input of the second job is the output directory of the first job.
                  // Both paths must be registered on job2: registering them on job1 by
                  // mistake fails at run time with a "path does not exist" error.
                  FileInputFormat.setInputPaths(job2, new Path("hdfs://anshun115:9000/result/friend"));
                  FileOutputFormat.setOutputPath(job2, new Path("hdfs://anshun115:9000/result/friend2"));

                  // Submit job2 (not job1) and wait for it.
                  succeeded = job2.waitForCompletion(true);
              }

              if (!succeeded) {
                  // Report the failure to the calling shell or scheduler.
                  System.exit(1);
              }
          }
      }
                 
  3. 運作結果:

    friend

    jim-smith	1
    jim-lucy	1
    jim-tom	1
    smith-tom	2
    lucy-smith	2
    lucy-tom	2
    rose-smith	1
    lucy-rose	1
    rose-tom	1
    smith-tom	2
    lucy-smith	2
    lucy-tom	2
    rose-smith	1
    smith-tom	1
    jim-smith	1
    rose-tom	2
    jim-rose	2
    jim-tom	2
    lucy-tom	1
    smith-tom	1
    jim-tom	1
    rose-tom	1
    lucy-smith	2
    lucy-rose	2
    jim-lucy	2
    jim-smith	2
    jim-rose	2
    rose-smith	2
               
    friend2
    jim-rose
    lucy-smith