Several Classic Enterprise-Level MapReduce Interview Cases

I. The official word count example:

  • Requirement: count how many times each word appears
  1. Test data:
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
    zhangyong zhangrui zhangqin
               
  2. Code:
    • Mapper class
      /**
       * @author 17616
       */
      public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Get one line of input
              String line = value.toString ();
              // Split the line into words and store them in an array; the source data shows that a single space is the delimiter.
              String[] arr = line.split (" ");
              for (String str : arr) {
                  context.write (new Text (str), new LongWritable (1));
              }
          }
      }
                 
    • Reducer class
      /**
       * @author 17616
       */
      public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
          @Override
          public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
              // Variable that records how many times the word appears
              long sum = 0;
              for (LongWritable val : values) {
                  // Add to the total count
                  sum += val.get ();
              }
              // Emit the result: the key is the word and the value is the total number of times it appeared in the map phase
              context.write (key, new LongWritable (sum));
          }
      }
                 
    • Driver class
      /**
       * @author 17616
       * The official example: word count
       */
      public class WordCountDriver {
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              // Load the default configuration
              Configuration conf = new Configuration ();
      
              // Create the Job object that represents this MR job
              Job job = Job.getInstance (conf);
              // Specify the entry class of this program
              job.setJarByClass (WordCountDriver.class);
      
              // Specify the Mapper and Reducer classes
              job.setMapperClass (WordCountMapper.class);
              job.setReducerClass (WordCountReducer.class);
      
              // Set the Mapper output types
              job.setMapOutputKeyClass (Text.class);
              job.setMapOutputValueClass (LongWritable.class);
      
              // Set the Reducer output types
              job.setOutputKeyClass (Text.class);
              job.setOutputValueClass (LongWritable.class);
      
              // Set the input directory to analyze and the output directory (HDFS paths)
              FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/mapreduce"));
              FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/mapreduce"));
      
              if (!job.waitForCompletion (true)) {
                  return;
              }
          }
      }
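      A common variant of the ending above propagates the job status to the calling shell as the process exit code (this is how the stock Hadoop WordCount example finishes); a minimal sketch, using the same job object as in the Driver above:

      // Exit with 0 on success and 1 on failure so that scripts can react to the job status.
      System.exit(job.waitForCompletion(true) ? 0 : 1);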
                 
  3. Result:
    zhangqin	20
    zhangrui	20
    zhangyong	20
               

II. Computing an average:

  • Requirement: compute the average of the data below
    1. Test data:
      tom 69
      tom 84
      tom 68
      jary 89
      jary 90
      jary 81
      jary 35
      alex 23
      alex 100
      alex 230
                 
    2. Code:
      • Mapper class
        /**
         * @Author zhangyong
         * @Date 2020/4/3 23:43
         * @Version 1.0
         */
        public class AverageMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // Get the content of each line
                String line = value.toString ();
                // Splitting on spaces yields several fields, so store them in an array
                String[] data = line.split (" ");
                String name = data[0];
                // Parse the score string into an int
                int score = Integer.parseInt (data[1]);
                // Emit the record
                context.write (new Text (name), new IntWritable (score));
            }
        }
                   
      • Reducer class
        /**
         * @Author zhangyong
         * @Date 2020/4/3 23:43
         * @Version 1.0
         */
        public class AverageReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            public void reduce(Text name, Iterable<IntWritable> scores, Context context) throws IOException, InterruptedException {
                int i = 0;
                int score = 0;
                for (IntWritable data : scores) {
                    score = score + data.get ();
                    i++;
                }
                int average = score / i;
                context.write (name, new IntWritable (average));
            }
        }
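        Note that score / i above is integer division, so the fractional part is truncated (for alex, 353 / 3 yields 117, matching the result below). If fractional averages are wanted, the value can be emitted as a DoubleWritable instead; a minimal sketch of such a variant (a hypothetical AverageDoubleReducer, not part of the original job; the Driver's output value class would have to be changed accordingly):

        import java.io.IOException;
        import org.apache.hadoop.io.DoubleWritable;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Reducer;

        public class AverageDoubleReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
            @Override
            public void reduce(Text name, Iterable<IntWritable> scores, Context context) throws IOException, InterruptedException {
                int count = 0;
                int total = 0;
                for (IntWritable data : scores) {
                    total += data.get();
                    count++;
                }
                // Floating-point division keeps the fractional part (e.g. 353 / 3.0 = 117.67).
                context.write(name, new DoubleWritable((double) total / count));
            }
        }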
                   
      • Driver class
        /**
         * @Author zhangyong
         * @Date 2020/4/3 23:41
         * @Version 1.0
         * Compute the average
         */
        public class AverageDriver {
            public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration conf = new Configuration ();
                Job job = Job.getInstance (conf);
        
                // Driver (entry) class
                job.setJarByClass (AverageDriver.class);
        
                // Set the Mapper and Reducer classes
                job.setMapperClass (AverageMapper.class);
                job.setReducerClass (AverageReducer.class);
        
                // Set the Mapper output types
                job.setMapOutputKeyClass (Text.class);
                job.setMapOutputValueClass (IntWritable.class);
        
                // Set the Reducer output types
                job.setOutputKeyClass (Text.class);
                job.setOutputValueClass (IntWritable.class);
        
                // Set the input and output directories (HDFS paths)
                FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/average"));
                FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/average"));
        
                // Submit the job
                job.waitForCompletion (true);
        
            }
        }
                   
    3. Result:
      alex 117
      jary 73
      tom	73
                 

III. Finding the maximum temperature:

  • Requirement: find the highest temperature for each year in the data below
  1. Test data:
    2329999919500515070000
    9909999919500515120022
    9909999919500515180011
    9509999919490324120111
    6509999919490324180078
    9909999919370515070001
    9909999919370515120002
    9909999919450515180001
    6509999919450324120002
    8509999919450324180078
               
  2. Code:
    • Mapper class
      /**
       * @author 17616
       */
      public class HeightMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Get one record
              String line = value.toString ();
      
              // Extract the year (characters 8-11)
              String year = line.substring (8, 12);
      
              // Extract the temperature (characters 18-21) and parse it as an int
              int t = Integer.parseInt (line.substring (18, 22));
      
              context.write (new Text (year),new LongWritable (t));
      
          }
      }
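    The record layout is fixed-width: characters 8-11 hold the year and characters 18-21 hold the temperature. A quick stand-alone check of that parsing on the first sample record (a hypothetical helper class, not part of the original job):

      // Local sanity check of the fixed-width parsing used by HeightMapper.
      public class ParseCheck {
          public static void main(String[] args) {
              String record = "2329999919500515070000";
              String year = record.substring(8, 12);                          // "1950"
              int temperature = Integer.parseInt(record.substring(18, 22));   // 0
              System.out.println(year + " -> " + temperature);
          }
      }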
                 
    • Reducer class
      /**
       * @author 17616
       */
      public class HeightReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
          @Override
          public void reduce(Text year, Iterable<LongWritable> t, Context context) throws IOException, InterruptedException {
              long max = 0;
              for (LongWritable data : t) {
                  if (max < data.get ()) {
                      max = data.get ();
                  }
              }
              context.write (year, new LongWritable (max));
          }
      }
                 
    • Driver class
      /**
       * @author 17616
       * Find the maximum value
       */
      public class HeightDriver {
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              // Load the default configuration
              Configuration conf = new Configuration ();
      
              // Create the Job object that represents this MR job
              Job job = Job.getInstance (conf);
      
              // Specify the entry class of this program
              job.setJarByClass (HeightDriver.class);
      
              // Specify the Mapper and Reducer classes
              job.setMapperClass (HeightMapper.class);
              job.setReducerClass (HeightReducer.class);
      
              // Set the Mapper output types
              job.setMapOutputKeyClass (Text.class);
              job.setMapOutputValueClass (LongWritable.class);
      
              // Set the Reducer output types
              job.setOutputKeyClass (Text.class);
              job.setOutputValueClass (LongWritable.class);
      
              // Set the input and output directories (HDFS paths)
              FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/wendu/"));
              FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/wendu"));
      
              job.waitForCompletion (true);
      
          }
      }
                 
  3. Result:
    1937	2
    1945	78
    1949	111
    1950	22
               

IV. Data deduplication:

  • Requirement: deduplicate the IP addresses below
  1. Test data:
    192.168.234.21
    192.168.234.22
    192.168.234.21
    192.168.234.21
    192.168.234.23
    192.168.234.21
    192.168.234.21
    192.168.234.21
    192.168.234.25
    192.168.234.21
    192.168.234.21
    192.168.234.26
    192.168.234.21
    192.168.234.27
    192.168.234.21
    192.168.234.27
    192.168.234.21
    192.168.234.29
    192.168.234.21
    192.168.234.26
    192.168.234.21
    192.168.234.25
    192.168.234.25
    192.168.234.25
    192.168.234.21
    192.168.234.22
    192.168.234.21
               
  2. Code:
    • Mapper class
      /**
       * @Author zhangyong
       * @Date 2020/4/7 19:53
       * @Version 1.0
       */
      public class DisMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              /**
               * value is simply the whole input line; here it is emitted as the key
               */
              context.write (value,NullWritable.get ());
          }
      }
                 
    • Reducer class
      /**
       * @Author zhangyong
       * @Date 2020/4/7 21:21
       * @Version 1.0
       */
      public class DisReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
      
          @Override
          public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
              context.write (key, NullWritable.get ());
          }
      
      }
                 
    • Driver class
      /**
       * @Author zhangyong
       * @Date 2020/4/7 21:32
       * @Version 1.0
       * Data deduplication
       */
      public class DisDriver {
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration ();
              Job job = Job.getInstance (conf);
      
              // Set the driver class
              job.setJarByClass (DisDriver.class);
      
              // Set the Mapper and Reducer classes
              job.setMapperClass (DisMapper.class);
              job.setReducerClass (DisReducer.class);
      
              // Mapper output types
              job.setMapOutputKeyClass (Text.class);
              job.setMapOutputValueClass (NullWritable.class);
      
              // Input and output paths
              FileInputFormat.setInputPaths (job,new Path ("hdfs://anshun115:9000/distinct"));
              FileOutputFormat.setOutputPath (job,new Path ("hdfs://anshun115:9000/result/distinct"));
      
              job.waitForCompletion (true);
          }
      }
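    Since the Reducer ignores its values entirely, the same class can also be registered as a combiner, which collapses duplicate IPs on the map side before the shuffle; a minimal sketch (one extra line that could be added to the Driver above):

      // Optional: also collapse duplicate keys on the map side to reduce shuffle volume.
      job.setCombinerClass (DisReducer.class);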
                 
  3. Result:
    192.168.234.21
    192.168.234.22
    192.168.234.23
    192.168.234.25
    192.168.234.26
    192.168.234.27
    192.168.234.29
               

V. Traffic statistics:

  • Requirement: total the data traffic used by each phone number
  1. Test data:
    13901000123 zs bj 343
    13202111011 ww sh 456
    13901000123 zs bj 1024
    13207551234 ls sz 758
               
  2. Code:
    • Bean class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 8:01
       * @Version 1.0
       */
      public class FlowBean implements Writable {
          private String phone;
          private String name;
          private String addr;
          private long flow;
      
      
          /**
           * Serialization
           *
           * @param dataOutput
           * @throws IOException
           */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF (phone);
              dataOutput.writeUTF (name);
              dataOutput.writeUTF (addr);
              dataOutput.writeLong (flow);
          }
      
          /**
           * Deserialization
           *
           * @param dataInput
           * @throws IOException
           */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              this.phone = dataInput.readUTF ();
              this.name = dataInput.readUTF ();
              this.addr = dataInput.readUTF ();
              this.flow = dataInput.readLong ();
          }
      
          public String getPhone() {
              return phone;
          }
      
          public void setPhone(String phone) {
              this.phone = phone;
          }
      
          public String getName() {
              return name;
          }
      
          public void setName(String name) {
              this.name = name;
          }
      
          public String getAddr() {
              return addr;
          }
      
          public void setAddr(String addr) {
              this.addr = addr;
          }
      
          public long getFlow() {
              return flow;
          }
      
          public void setFlow(long flow) {
              this.flow = flow;
          }
      
          @Override
          public String toString() {
              return "FlowBean{" +
                      "phone='" + phone + '\'' +
                      ", name='" + name + '\'' +
                      ", addr='" + addr + '\'' +
                      ", flow=" + flow +
                      '}';
          }
      }
                 
    • Mapper class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 8:10
       * @Version 1.0
       */
      public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Get the line
              String line = value.toString ();
              /**
               * [13901000123,zk,bj,343]
               *  phone = 13901000123;
               *  name = zk;
               *  addr = bj;
               *  flow = 343;
               */
              String[] info = line.split (" ");
              FlowBean flowBean = new FlowBean ();
      
              flowBean.setPhone (info[0]);
              flowBean.setName (info[1]);
              flowBean.setAddr (info[2]);
              flowBean.setFlow (Integer.parseInt (info[3]));
      
              context.write (new Text (flowBean.getName ()), flowBean);
          }
      }
                 
    • Reducer class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 8:23
       * @Version 1.0
       */
      public class FlowReducer extends Reducer<Text, FlowBean, FlowBean, NullWritable> {
          @Override
          public void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
              FlowBean result = new FlowBean ();
      
              for (FlowBean value : values) {
                  result.setPhone (value.getPhone ());
                  result.setName (value.getName ());
                  result.setAddr (value.getAddr ());
                  result.setFlow (result.getFlow () + value.getFlow ());
              }
              context.write (result, NullWritable.get ());
          }
      }
                 
    • Driver class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 8:28
       * @Version 1.0
       * Traffic statistics
       */
      public class FlowDriver {
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration ();
              Job job = Job.getInstance (conf);
              // Set the main class
              job.setJarByClass (FlowDriver.class);
      
              // Set the Mapper and Reducer classes
              job.setMapperClass (FlowMapper.class);
              job.setReducerClass (FlowReducer.class);
      
              // Set the map output key/value types
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(FlowBean.class);
      
              // Set the final output key/value types
              job.setOutputKeyClass(FlowBean.class);
              job.setOutputValueClass (NullWritable.class);
      
              // Set the input and output paths
      
              FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/flow"));
              FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/flow"));
              job.waitForCompletion (true);
          }
      }
                 
  3. Result:
    FlowBean{phone='13207551234', name='ls', addr='sz', flow=758}
    FlowBean{phone='13202111011', name='ww', addr='sh', flow=456}
    FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}
               

VI. Movie ranking:

  • Requirement: rank the movies by popularity (hot) in descending order
  1. Test data:
    中国机长 72
    机械师2 83
    奇异博士 87
    流浪地球 79
    复仇者联盟4:终局之战 94
    惊奇队长 68
    蜘蛛侠:英雄远征 80
    长城 56
    夺路而逃 69 
    神奇动物在哪里 57
    驴得水 59
    我不是潘金莲 55
    速度与激情:特别行动 77
    哪吒之魔童降世 96
    捉迷藏 78
    上海堡垒 9
    叶问4 75
    勇士之门 35
    罗曼蒂克消亡史 67
    阿丽塔:战斗天使 89
               
  2. Code:
    • Bean class
      /**
       * @Author zhangyong
       * @Date 2020/4/13 8:42
       * @Version 1.0
       */
      public class MovieBean implements WritableComparable<MovieBean> {
          private String name;
          private int hot;
      
          /**
           * Sort order: descending by hot
           *
           * @param o
           * @return
           */
          @Override
          public int compareTo(MovieBean o) {
              return o.hot - this.hot;
          }
      
          /**
           * Serialization
           *
           * @param dataOutput
           * @throws IOException
           */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF (name);
              dataOutput.writeInt (hot);
          }
      
          /**
           * Deserialization
           *
           * @param dataInput
           * @throws IOException
           */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              this.name = dataInput.readUTF ();
              this.hot = dataInput.readInt ();
          }
      
      
          public void setName(String name) {
              this.name = name;
          }
      
          public int getHot() {
              return hot;
          }
      
          public void setHot(int hot) {
              this.hot = hot;
          }
      
          @Override
          public String toString() {
              return "MovieBean{" +
                      "name='" + name + '\'' +
                      ", hot=" + hot +
                      '}';
          }
      }
                 
    • Mapper class
      /**
       * @Author zhangyong
       * @Date 2020/4/13 8:52
       * @Version 1.0
       */
      public class MovieMapper extends Mapper<LongWritable, Text, MovieBean, NullWritable> {
      
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Get one line
              String line = value.toString ();
              // Split the fields
              String[] split = line.split (" ");
              // Populate the bean
              MovieBean movieBean = new MovieBean ();
              movieBean.setName (split[0]);
              movieBean.setHot (Integer.parseInt (split[1]));
      
              // Emit
              context.write (movieBean, NullWritable.get ());
      
          }
      }
                 
    • Reducer class (none in the original; see the note below)
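      No Reducer is defined in the original code, and the Driver below never calls job.setReducerClass. In that case Hadoop runs the default pass-through Reducer, and the descending order comes from MovieBean.compareTo being applied to the map output keys during the shuffle sort. If an explicit Reducer is preferred, a minimal pass-through sketch could look like this (a hypothetical MovieReducer, behavior equivalent to the default):

      import java.io.IOException;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.mapreduce.Reducer;

      public class MovieReducer extends Reducer<MovieBean, NullWritable, MovieBean, NullWritable> {
          @Override
          protected void reduce(MovieBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
              // Forward every record unchanged; the keys already arrive sorted by hot (descending).
              for (NullWritable value : values) {
                  context.write(key, value);
              }
          }
      }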
    • Driver class
      /**
       * @Author zhangyong
       * @Date 2020/4/13 9:19
       * @Version 1.0
       */
      public class MovieDriver {
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration ();
              Job job = Job.getInstance (conf);
      
              job.setJarByClass (MovieDriver.class);
      
              job.setMapperClass (MovieMapper.class);
      
              // Set the map output key and value types
              job.setMapOutputKeyClass (MovieBean.class);
              job.setMapOutputValueClass (NullWritable.class);
      
              FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/sort"));
              FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/sort"));
      
              job.waitForCompletion (true);
          }
      }
                 
  3. Result:
    MovieBean{name='哪吒之魔童降世', hot=96}
    MovieBean{name='复仇者联盟4:终局之战', hot=94}
    MovieBean{name='阿丽塔:战斗天使', hot=89}
    MovieBean{name='奇异博士', hot=87}
    MovieBean{name='机械师2', hot=83}
    MovieBean{name='蜘蛛侠:英雄远征', hot=80}
    MovieBean{name='流浪地球', hot=79}
    MovieBean{name='捉迷藏', hot=78}
    MovieBean{name='速度与激情:特别行动', hot=77}
    MovieBean{name='叶问4', hot=75}
    MovieBean{name='中国机长', hot=72}
    MovieBean{name='夺路而逃', hot=69}
    MovieBean{name='惊奇队长', hot=68}
    MovieBean{name='罗曼蒂克消亡史', hot=67}
    MovieBean{name='驴得水', hot=59}
    MovieBean{name='神奇动物在哪里', hot=57}
    MovieBean{name='长城', hot=56}
    MovieBean{name='我不是潘金莲', hot=55}
    MovieBean{name='勇士之门', hot=35}
    MovieBean{name='上海堡垒', hot=9}
               

VII. Aggregating scores from multiple files:

  • Requirement: using the three tables below, total each student's score in every subject
  1. Test data:

    chinese.txt

    1 alex 89
    2 alex 73
    3 alex 67
    1 romeo 49
    2 romeo 83
    3 romeo 27
    1 lee 77
    2 lee 66
    3 lee 89
               
    english.txt
    1 alex 55
    2 alex 69
    3 alex 75
    1 romeo 44
    2 romeo 64
    3 romeo 86
    1 lee 76
    2 lee 84
    3 lee 93
               
    math.txt
    1 alex 85
    2 alex 59
    3 alex 95
    1 romeo 74
    2 romeo 67
    3 romeo 96
    1 lee 45
    2 lee 76
    3 lee 67
               
  2. Code:
    • Bean class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 10:00
       * @Version 1.0
       */
      public class ScoreBean implements Writable {
          private String name;
          private int chinese;
          private int math;
          private int english;
      
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF(name);
              dataOutput.writeInt(chinese);
              dataOutput.writeInt(math);
              dataOutput.writeInt(english);
          }
      
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              this.name = dataInput.readUTF();
              this.chinese = dataInput.readInt();
              this.math = dataInput.readInt();
              this.english = dataInput.readInt();
          }
      
          public String getName() {
              return name;
          }
      
          public void setName(String name) {
              this.name = name;
          }
      
          public int getChinese() {
              return chinese;
          }
      
          public void setChinese(int chinese) {
              this.chinese = chinese;
          }
      
          public int getMath() {
              return math;
          }
      
          public void setMath(int math) {
              this.math = math;
          }
      
          public int getEnglish() {
              return english;
          }
      
          public void setEnglish(int english) {
              this.english = english;
          }
      
          @Override
          public String toString() {
              return "StuScore{" +
                      "name='" + name + '\'' +
                      ", chinese=" + chinese +
                      ", math=" + math +
                      ", english=" + english +
                      '}';
          }
      }
                 
    • Mapper class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 10:03
       * @Version 1.0
       */
      public class ScoreMapper extends Mapper<LongWritable, Text, Text, ScoreBean> {
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      
            // Get one line of data
              String line = value.toString();
              String[] data = line.split(" ");
      
              ScoreBean ss = new ScoreBean();
              ss.setName(data[1]);
      
              /**
             * Note: be careful to import org.apache.hadoop.mapreduce.lib.input.FileSplit (not the mapred version);
             * the file name is obtained from the input split that the current MapTask is processing.
               */
              FileSplit split = (FileSplit) context.getInputSplit();
      
              if (split.getPath().getName().equals("chinese.txt")) {
                  ss.setChinese(Integer.parseInt(data[2]));
              } else if (split.getPath().getName().equals("math.txt")) {
                  ss.setMath(Integer.parseInt(data[2]));
              } else if (split.getPath().getName().equals("english.txt")) {
                  ss.setEnglish(Integer.parseInt(data[2]));
              }
      
              context.write(new Text(ss.getName()), ss);
          }
      }
                 
    • Reducer class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 10:05
       * @Version 1.0
       */
      public class ScoreReducer extends Reducer<Text, ScoreBean, Text, ScoreBean> {
          @Override
          protected void reduce(Text key, Iterable<ScoreBean> values, Context context) throws IOException, InterruptedException {
      
              ScoreBean resultScore = new ScoreBean();
              // key.toString() holds only the name, because the map output key is just the name
              resultScore.setName(key.toString());
      
              for (ScoreBean value : values) {
                  // Chinese score
                  resultScore.setChinese(resultScore.getChinese() + value.getChinese());
                  // Math score
                  resultScore.setMath(resultScore.getMath() + value.getMath());
                  // English score
                  resultScore.setEnglish(resultScore.getEnglish() + value.getEnglish());
              }
              context.write(key, resultScore);
          }
      }
                 
    • Driver class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 10:10
       * @Version 1.0
       * Aggregate scores
       */
      public class ScoreDriver {
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration ();
              Job job = Job.getInstance (conf);
              // Set the driver class
              job.setJarByClass (ScoreDriver.class);
              // Set the Mapper and Reducer classes
              job.setMapperClass (ScoreMapper.class);
              job.setReducerClass (ScoreReducer.class);
              // Set the map output key and value types
              job.setMapOutputKeyClass (Text.class);
              job.setMapOutputValueClass (ScoreBean.class);
      
              job.setOutputKeyClass (Text.class);
              job.setOutputValueClass (ScoreBean.class);
      
              FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/score"));
              FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/score"));
      
              job.waitForCompletion (true);
          }
      }
                 
  3. Result:
    alex StuScore{name='alex', chinese=229, math=239, english=199}
    lee	StuScore{name='lee', chinese=232, math=188, english=253}
    romeo StuScore{name='romeo', chinese=159, math=237, english=194}
               

VIII. Processing data with a job chain:

  • Requirement: compute each person's total score
  1. Test data:
    1|zhang 100
    2|wang 200
    3|zhang 150
    4|lisi 190
    5|wang 50
    6|zhang 80
    7|lisi 50
               
  2. Code:
    • Bean class
      /**
       * @Author zhangyong
       * @Date 2020/4/14 15:06
       * @Version 1.0
       */
      public class CountBean implements WritableComparable<CountBean> {
          private String name;
          private int count;
      
          @Override
          public int compareTo(CountBean o) {
              return o.count - this.count;
          }
      
          /**
           * Serialization
           *
           * @param dataOutput
           * @throws IOException
           */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF (name);
              dataOutput.writeInt (count);
          }
      
          /**
           * Deserialization
           *
           * @param dataInput
           * @throws IOException
           */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              this.name = dataInput.readUTF ();
              this.count = dataInput.readInt ();
          }
      
          public String getName() {
              return name;
          }
      
          public void setName(String name) {
              this.name = name;
          }
      
          public int getCount() {
              return count;
          }
      
          public void setCount(int count) {
              this.count = count;
          }
      
          @Override
          public String toString() {
              return "CountBean{" +
                      "name='" + name + '\'' +
                      ", count=" + count +
                      '}';
          }
      }
                 
    • Mapper classes

      The OneCountMapper class

      /**
       * @Author zhangyong
       * @Date 2020/4/14 8:54
       * @Version 1.0
       */
      public class OneCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String line = value.toString ();
              /**
               * Splitting on | yields an array: index 0 is the sequence number and index 1 holds the name and the profit.
               * Splitting index 1 again on a space gives the name and the profit separately.
               */
              String name = line.split ("\\|")[1].split (" ")[0];
              int count = Integer.parseInt (line.split ("\\|")[1].split (" ")[1]);
      
              context.write (new Text (name),new IntWritable (count));
              
          }
      }
                 
      The TwoCountMapper class
      /**
       * @Author zhangyong
       * @Date 2020/4/14 15:27
       * @Version 1.0
       */
      public class TwoCountMapper extends Mapper<LongWritable, Text, CountBean, NullWritable> {
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String line = value.toString ();
              /**
               * The second Mapper reads the result file of the first MR job, and MR result files separate key and value with a TAB by default.
               * So when the second Mapper splits its input, the delimiter must be \t (the tab character).
               */
              String name = line.split ("\t")[0];
              int count = Integer.parseInt (line.split ("\t")[1]);
      
              CountBean bean = new CountBean ();
              bean.setName (name);
              bean.setCount (count);
      
              context.write (bean, NullWritable.get ());
          }
      }
                 
    • Reducer class
      /**
       * @Author zhangyong
       * @Date 2020/4/14 8:54
       * @Version 1.0
       */
      public class OneCountReducer extends Reducer<Text , IntWritable, Text, IntWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              // Sum the profit
              int sum = 0;
              for (IntWritable value : values) {
                  sum+=value.get ();
              }
              context.write (key,new IntWritable (sum));
          }
      }
                 
    • Driver class
      /**
       * @Author zhangyong
       * @Date 2020/4/14 8:54
       * @Version 1.0
       * Chained jobs
       */
      public class CountDriver {
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration ();
              Job job = Job.getInstance (conf);
              job.setJarByClass (CountDriver.class);
              job.setMapperClass (OneCountMapper.class);
              job.setReducerClass (OneCountReducer.class);
              job.setMapOutputKeyClass (Text.class);
              job.setMapOutputValueClass (IntWritable.class);
              job.setOutputKeyClass (Text.class);
              job.setOutputValueClass (IntWritable.class);
              FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/count"));
              FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/count"));
              if (job.waitForCompletion (true)) {
                  // Set up the second job
                  Job job2 = Job.getInstance (conf);
                  // Set the second job's Mapper
                  job2.setMapperClass (TwoCountMapper.class);
                  job2.setMapOutputKeyClass (CountBean.class);
                  job2.setMapOutputValueClass (NullWritable.class);
                  /**
                   * Set the input and output paths of the second job.
                   * The input path here is the output path of the first job.
                   * Note that the job passed in must be the current job, i.e. job2 as shown below.
                   * Passing the earlier job instead causes a runtime error saying the path does not exist.
                   */
                  FileInputFormat.setInputPaths (job2, new Path ("hdfs://anshun115:9000/result/count"));
                  FileOutputFormat.setOutputPath (job2, new Path ("hdfs://anshun115:9000/result/count2"));
                  // Note that job2 is the job being submitted here.
                  job2.waitForCompletion (true);
              }
          }
      
      }
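      In the Driver above, job2 only configures a Mapper; the default pass-through Reducer then emits the CountBean keys in the order given by CountBean.compareTo, which is what produces the descending listing in count2.txt. For robustness a few settings are commonly added to job2 as well; a minimal sketch (hypothetical additions, not present in the original code):

      // Ship the correct jar for the second job and declare its final output types.
      job2.setJarByClass (CountDriver.class);
      job2.setOutputKeyClass (CountBean.class);
      job2.setOutputValueClass (NullWritable.class);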
                 
  3. Result:

    count.txt

    lisi	240
    wang	250
    zhang	330
               
    count2.txt
    CountBean{name='zhang', count=330}
    CountBean{name='wang', count=250}
    CountBean{name='lisi', count=240}
               

IX. A simple partitioning case:

  • Requirement: total the traffic used by each phone and write each region to its own partition
    1. Test data:
      13901000123 zs bj 343
      13202111011 ww sh 456
      13901000123 zs bj 1024
      13207551234 ls sz 758
                 
    2. Code:
    • Partitioner class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 10:42
       * @Version 1.0
       */
      public class AddPartitioner extends Partitioner<Text, PartFlowBean> {
          @Override
          public int getPartition(Text text, PartFlowBean flowBean, int numPartitioner) {
              String addr = flowBean.getAddr();
              if (addr.equals("bj")) {
                  return 0;
              } else if (addr.equals("sh")) {
                  return 1;
              } else {
                  return 2;
              }
          }
      }
               
    • Bean class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 10:01
       * @Version 1.0
       */
      public class PartFlowBean implements Writable {
          private String phone;
          private String name;
          private String addr;
          private long flow;
      
          /**
           * Serialization
           *
           * @param dataOutput
           * @throws IOException
           */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF (phone);
              dataOutput.writeUTF (name);
              dataOutput.writeUTF (addr);
              dataOutput.writeLong (flow);
          }
          /**
           * Deserialization
           *
           * @param dataInput
           * @throws IOException
           */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              this.phone = dataInput.readUTF ();
              this.name = dataInput.readUTF ();
              this.addr = dataInput.readUTF ();
              this.flow = dataInput.readLong ();
          }
      
          public String getPhone() {
              return phone;
          }
      
          public void setPhone(String phone) {
              this.phone = phone;
          }
      
          public String getName() {
              return name;
          }
      
          public void setName(String name) {
              this.name = name;
          }
      
          public String getAddr() {
              return addr;
          }
      
          public void setAddr(String addr) {
              this.addr = addr;
          }
      
          public long getFlow() {
              return flow;
          }
      
          public void setFlow(long flow) {
              this.flow = flow;
          }
      
          @Override
          public String toString() {
              return "FlowBean{" +
                      "phone='" + phone + '\'' +
                      ", name='" + name + '\'' +
                      ", addr='" + addr + '\'' +
                      ", flow=" + flow +
                      '}';
          }
      }
                 
    • Mapper class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 10:51
       * @Version 1.0
       */
      public class PartFlowMapper extends Mapper<LongWritable, Text, Text, PartFlowBean> {
          @Override
          public void map(LongWritable key, Text value, Context context) throws
                  IOException, InterruptedException {
              String line = value.toString ();
               /**
               [13901000123,zk,bj,343]
               phone = 13901000123;
               name = zk;
               addr = bj;
               flow = 343;
               */
              String[] info = line.split (" ");
              PartFlowBean flowBean = new PartFlowBean ();
              flowBean.setPhone (info[0]);
              flowBean.setName (info[1]);
              flowBean.setAddr (info[2]);
              flowBean.setFlow (Integer.parseInt (info[3]));
              context.write (new Text (flowBean.getName ()), flowBean);
          }
      }
                 
    • Reducer class
      /**
       * @Author zhangyong
       * @Date 2020/4/10 10:23
       * @Version 1.0
       */
      public class PartFlowReducer extends Reducer<Text, PartFlowBean, PartFlowBean,
              NullWritable> {
          @Override
          public void reduce(Text key, Iterable<PartFlowBean> values, Context
                  context) throws IOException, InterruptedException {
              PartFlowBean result = new PartFlowBean ();
              for (PartFlowBean value : values) {
                  result.setPhone (value.getPhone ());
                  result.setName (value.getName ());
                  result.setAddr (value.getAddr ());
                  result.setFlow (result.getFlow () + value.getFlow ());
              }
              context.write (result, NullWritable.get ());
          }
      }
                 
    • Driver class
      /**
       * @Author zhangyong
       * @Date 2020/4/11 11:17
       * @Version 1.0
       * Partitioning case
       */
      public class PartFlowDriver {
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration ();
      
              Job job = Job.getInstance (conf);
      
              job.setJarByClass (PartFlowDriver.class);
      
              job.setMapperClass (PartFlowMapper.class);
              job.setReducerClass (PartFlowReducer.class);
              /**
               * Without the two settings below, the custom partitioning does not take effect.
               */
              // Set the partitioner class
              job.setPartitionerClass (AddPartitioner.class);
              // Set the number of reduce tasks (one per partition)
              job.setNumReduceTasks (3);
      
              job.setMapOutputKeyClass (Text.class);
              job.setMapOutputValueClass (PartFlowBean.class);
      
              job.setOutputKeyClass (PartFlowBean.class);
              job.setOutputValueClass (NullWritable.class);
      
              FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/partition"));
              FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/partition"));
      
              job.waitForCompletion (true);
          }
      }
                 
  3. Result:

    part-r-00000

    FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}
               
    part-r-00001
    FlowBean{phone='13202111011', name='ww', addr='sh', flow=456}
               
    part-r-00002
    FlowBean{phone='13207551234', name='ls', addr='sz', flow=758}
               

X. Partitioning with a total order:

  • Requirement: partition the data below into two-digit, three-digit, and four-or-more-digit numbers, sorted by value within each partition
  1. Test data:
    82 239 231
    23 22 213
    123 232 124
    213 3434 232
    4546 565 123
    231 231
    2334 231
    1123 5656 657
    12313 4324 213
    123 2 232 32
    343 123 4535
    12321 3442 453
    1233 342 453
    1231 322 452
    232 343 455
    3123 3434 3242
               
  2. Code:
    • Partitioner class
      /**
       * @Author zhangyong
       * @Date 2020/4/14 9:39
       * @Version 1.0
       * Total ordering
       * Write the numbers above into three result files by digit count:
       * 0-99 go to file 1
       * 100-999 go to file 2
       * 1000 and above go to file 3
       */
      public class AutoPartitioner extends Partitioner<IntWritable, IntWritable> {
          @Override
          public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
              String num = String.valueOf (key.get ());
              if (num.matches ("[0-9][0-9]") || num.matches ("[0-9]")) {
                  return 0;
              } else if (num.matches ("[0-9][0-9][0-9]")) {
                  return 1;
              } else {
                  return 2;
              }
          }
      }
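      The same routing can also be expressed with numeric comparisons instead of regular expressions, avoiding the int-to-String conversion; a minimal alternative sketch (a hypothetical RangePartitioner with the same buckets for non-negative keys):

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.mapreduce.Partitioner;

      public class RangePartitioner extends Partitioner<IntWritable, IntWritable> {
          @Override
          public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
              int num = key.get();
              if (num < 100) {
                  // 0-99 -> file 1
                  return 0;
              } else if (num < 1000) {
                  // 100-999 -> file 2
                  return 1;
              } else {
                  // 1000 and above -> file 3
                  return 2;
              }
          }
      }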
                 
    • Mapper class
      /**
       * @Author zhangyong
       * @Date 2020/4/14 9:44
       * @Version 1.0
       */
      public class NumSortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String line = value.toString ();
              String[] data = line.split (" ");
              for (String num : data) {
                  context.write (new IntWritable (Integer.parseInt (num)), new IntWritable (1));
              }
          }
      }
                 
    • Reducer class
      /**
       * @Author zhangyong
       * @Date 2020/4/14 9:39
       * @Version 1.0
       */
      public class NumSortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
          @Override
          protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              int result = 0;
              for (IntWritable count : values) {
                  result = result + count.get ();
              }
              context.write (key, new IntWritable (result));
          }
      }
                 
    • Driver class
      /**
       * @Author zhangyong
       * @Date 2020/4/14 9:39
       * @Version 1.0
       *
       */
      public class NumSortDriver {
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration ();
              Job job = Job.getInstance (conf);
              job.setJarByClass (NumSortDriver.class);
              job.setMapperClass (NumSortMapper.class);
              job.setMapOutputKeyClass (IntWritable.class);
              job.setMapOutputValueClass (IntWritable.class);
              /**
               * Since the results must go into three files, the partitioner class and the number of reduce tasks are set here.
               */
              job.setPartitionerClass (AutoPartitioner.class);
              job.setNumReduceTasks (3);
              job.setReducerClass (NumSortReducer.class);
              job.setOutputKeyClass (IntWritable.class);
              job.setOutputValueClass (IntWritable.class);
              FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/numcount/"));
              FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/numcount"));
              job.waitForCompletion (true);
          }
      }
                 
  3. Result:

    part-r-00000

    2	1
    22	1
    23	1
    32	1
    82	1
               
    part-r-00001
    123	4
    124	1
    213	3
    231	4
    232	4
    239	1
    322	1
    342	1
    343	2
    452	1
    453	2
    455	1
    565	1
    657	1
               
    part-r-00002
    1123	1
    1231	1
    1233	1
    2334	1
    3123	1
    3242	1
    3434	2
    3442	1
    4324	1
    4535	1
    4546	1
    5656	1
    12313	1
    12321	1
               

XI. Improving efficiency with a Combiner:

  • Requirement: count word occurrences using a Combiner class
    1. Test data:
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
      zhangyong zhangrui zhangqin
                 
    2. Code:
      • Mapper class
        /**
         * @Author zhangyong
         * @Date 2020/4/15 7:30
         * @Version 1.0
         */
        public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString ();
                String[] words = line.split (" ");
                for (String word : words) {
                    context.write (new Text (word), new IntWritable (1));
                }
            }
        }
                   
      • Combiner class
        /**
         * @Author zhangyong
         * @Date 2020/4/15 7:34
         * @Version 1.0
         */
        public class WcCombine extends Reducer<Text,IntWritable,Text,IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int count=0;
                for (IntWritable value : values) {
                    count+=value.get ();
                }
                context.write (key,new IntWritable (count));
            }
        }
                   
      • Reducer class
        /**
         * @Author zhangyong
         * @Date 2020/4/15 7:34
         * @Version 1.0
         */
        public class WcReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int count = 0;
                for (IntWritable value : values) {
                    count+=value.get ();
                    System.err.println(key + ":" + value);
                }
                context.write (key,new IntWritable (count));
            }
        }
                   
      • Driver class
        /**
         * @Author zhangyong
         * @Date 2020/4/15 7:42
         * @Version 1.0
         */
        public class WcDriver {
            public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration conf = new Configuration ();
                Job job = Job.getInstance (conf);
        
                job.setJarByClass (WcDriver.class);
        
                job.setMapperClass (WcMapper.class);
                job.setReducerClass (WcReducer.class);
        
                job.setMapOutputKeyClass (Text.class);
                job.setMapOutputValueClass (IntWritable.class);
        
                /**
                 * Set the combiner class; if it is not set, no combine step runs by default.
                 * The combiner performs an early, partial merge on the MapTask side, which lightens the workload of the Reduce stage.
                 * Note that the combiner only merges intermediate data to reduce the workload; it must not change the final result.
                 */
                job.setCombinerClass(WcCombine.class);
        
                job.setOutputKeyClass (Text.class);
                job.setOutputValueClass (IntWritable.class);
        
                FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/mapreduce"));
                FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/wccombine"));
        
                job.waitForCompletion (true);
            }
        }
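        Because word-count summation is associative and commutative, the Reducer class itself could also be registered as the combiner instead of the separate WcCombine class; a minimal sketch of that alternative wiring in the Driver (the debug println in WcReducer is the only behavioral difference):

        // Alternative: reuse the reducer as the combiner, since summing partial counts
        // gives the same final result.
        job.setCombinerClass(WcReducer.class);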
                   
    3. Result:
      zhangqin	20
      zhangrui	20
      zhangyong	20
                 

XII. Friend recommendation:

  • Requirement: find each person's potential friends (first- and second-degree friendship chains)
  1. Test data:
    tom rose
    tom jim
    tom smith
    tom lucy
    rose tom
    rose lucy
    rose smith
    jim tom
    jim lucy
    jim smith
    smith jim
    smith tom
    smith rose
               
  2. Code:
    • First Mapper class
      /**
       * @Author 张勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:08
       */
      public class OneFriendMapper extends Mapper<LongWritable, Text, Text, Text> {
          /**
           * The input key and value are determined by the file content.
           * The output key and value are the name and the friend, because the business logic emits name-friend relationships.
           */
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Get each line of data
              String line = value.toString();
              // Get the name
              String name = line.split(" ")[0];
              // Get the friend
              String friend = line.split(" ")[1];
              context.write(new Text(name), new Text(friend));
          }
      }
                 
    • First Reducer class
      /**
       * @Author 张勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:28
       */
      public class OneFriendReducer extends Reducer<Text, Text, Text, IntWritable> {
          /**
           * The input key and value must match the Mapper output.
           * Text and IntWritable:
           * direct friends are marked with 1, non-direct (possible) friends with 2.
           */
          @Override
          protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
              ArrayList<String> friendList = new ArrayList<>();
              // Emit the direct friendships
              for (Text value : values) {
                  friendList.add(value.toString());
                  if (key.toString().compareTo(value.toString()) < 0) {
                      context.write(new Text(key + "-" + value), new IntWritable(1));
                  } else {
                      context.write(new Text(value + "-" + key), new IntWritable(1));
                  }
              }
              // Emit pairs of people who may know each other
              for (int i = 0; i < friendList.size(); i++) {
                  for (int j = 0; j < friendList.size(); j++) {
                      String friend1 = friendList.get(i);
                      String friend2 = friendList.get(j);
                      if (friend1.compareTo(friend2) < 0) {
                          context.write(new Text(friend1 + "-" + friend2), new IntWritable(2));
                      }
                  }
              }
          }
      }
                 
    • Second Mapper class
      /**
       * @Author 张勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:32
       */
      public class TwoFriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Get one line of data
              String line = value.toString();
              // Get the friend-pair information
              String friendInfo = line.split("\t")[0];
              // Get the friendship degree
              int deep = Integer.parseInt(line.split("\t")[1]);
              context.write(new Text(friendInfo), new IntWritable(deep));
          }
      }
                 
    • Second Reducer class
      /**
       * @Author 张勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:34
       */
      public class TwoFriendReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              boolean flag = true;
              /**
               * Output the pair only if the flag stays true.
               * The task asks for friends who may know each other, i.e. pairs whose only marks are 2.
               * So the flag is set to false whenever a degree of 1 appears.
               */
              for (IntWritable value : values) {
                  if (value.get() == 1) {
                      flag = false;
                  }
              }
              if (flag) {
                  context.write(key, NullWritable.get());
              }
          }
      }
                 
    • Driver class
      /**
       * @Author 张勇
       * @Site www.gz708090.com
       * @Version 1.0
       * @Date 2020-04-17 12:36
       */
      public class FriendDriver {
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration();
      
              // Set up the classes and the input/output of the first MapReduce round
              Job job1 = Job.getInstance(conf);
      
              job1.setJarByClass(FriendDriver.class);
      
              job1.setMapperClass(OneFriendMapper.class);
              job1.setReducerClass(OneFriendReducer.class);
      
              job1.setMapOutputKeyClass(Text.class);
              job1.setMapOutputValueClass(Text.class);
      
              job1.setOutputKeyClass(Text.class);
              job1.setOutputValueClass(IntWritable.class);
      
              // Set the input and output paths
              FileInputFormat.setInputPaths(job1, new Path("hdfs://anshun115:9000/friend"));
              FileOutputFormat.setOutputPath(job1, new Path("hdfs://anshun115:9000/result/friend"));
      
              // Run the code below only after the first MapReduce round has completed
              if (job1.waitForCompletion(true)) {
                  Job job2 = Job.getInstance(conf);
                  // Set the second job's Mapper
                  job2.setMapperClass(TwoFriendMapper.class);
                  job2.setMapOutputKeyClass(Text.class);
                  job2.setMapOutputValueClass(IntWritable.class);
      
                  // Set the second job's Reducer
                  job2.setReducerClass(TwoFriendReducer.class);
                  job2.setOutputKeyClass(Text.class);
                  job2.setOutputValueClass(NullWritable.class);
      
                  /**
                   * Set the input and output paths of the second job.
                   * The input path here is the output path of the first job.
                   * Note that the job passed in must be the current job, i.e. job2 as shown below.
                   * Passing the earlier job instead causes a runtime error saying the path does not exist.
                   */
                  FileInputFormat.setInputPaths(job2, new Path("hdfs://anshun115:9000/result/friend"));
                  FileOutputFormat.setOutputPath(job2, new Path("hdfs://anshun115:9000/result/friend2"));
                  // Note that job2 is the job being submitted here.
                  job2.waitForCompletion(true);
              }
          }
      }
                 
  3. Result:

    friend

    jim-smith	1
    jim-lucy	1
    jim-tom	1
    smith-tom	2
    lucy-smith	2
    lucy-tom	2
    rose-smith	1
    lucy-rose	1
    rose-tom	1
    smith-tom	2
    lucy-smith	2
    lucy-tom	2
    rose-smith	1
    smith-tom	1
    jim-smith	1
    rose-tom	2
    jim-rose	2
    jim-tom	2
    lucy-tom	1
    smith-tom	1
    jim-tom	1
    rose-tom	1
    lucy-smith	2
    lucy-rose	2
    jim-lucy	2
    jim-smith	2
    jim-rose	2
    rose-smith	2
               
    friend2
    jim-rose
    lucy-smith