天天看點

MapReduce—平均工資

MapReduce—平均工資

        • MapReduce—平均工資
          • 1. 需求分析
          • 2. 解答思路

MapReduce—平均工資

我這裡是使用叢集去處理這個日志資料,資料在我的github上,預設使用maven去管理所有的jar包
           
  • github位址
1. 需求分析
按照所給資料檔案去統計每個部門的人數,最高工資,最低工資和平均工資
           
需求統計的日志資料如下:
MapReduce—平均工資

需要将每個部門的人數,工資進行統計。比如10号部門有3個人,最高工資是5000元,最低工資是1300元,平均工資是2916.666666666667元。則以如下形式進行顯示:

10 3 5000 1300 2916.666666666667

2. 解答思路
1.因為要統計部門的人數以及工資,那麼在最後的reduce階段,進行彙總時,可以設定一個計數器,在進行彙總時,就可以計算出部門人數,是以,我們隻需要日志資料中的兩列,分别是部門編号和工資,将部門編号作為key,工資作為value

2.在reduce輸出階段,因為要輸出人數,最高工資,最低工資和平均工資,一共四列,是以需要将計算出的結果拼接成一個Text進行輸出

3.在處理過程中我使用Partitioner将資料分開通過不同的reduce去處理

4.如果需要本地運作,記得注釋掉avgsal檔案中的23/24/25行,并将47行和50行的檔案路徑修改為自己所使用的檔案路徑

5.因為在資料扭轉的過程中,<K2, V2>和<K3, V3>的資料類型發生了變化,是以要在avgsal中設定map端所輸出的資料類型,也就是要指定<K2, V2>的資料類型
           
mapper端代碼
package com.yangqi.avgsal;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author xiaoer
 * @date 2019/11/12 12:44
 */
public class MyMapper extends Mapper<LongWritable, Text, IntWritable, DoubleWritable> {
    IntWritable num = new IntWritable();
    DoubleWritable result = new DoubleWritable();

    /**
     * 針對每一行的資料,都會執行一次下面的map方法
     *
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(",");
        String str1 = split[split.length - 1];
        String str2 = split[split.length - 3];
        num.set(Integer.parseInt(str1));
        result.set(Double.parseDouble(str2));
        context.write(num, result);
    }
}
           
reduce端代碼
package com.yangqi.avgsal;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author xiaoer
 * @date 2019/11/12 12:47
 */
public class MyReducer extends Reducer<IntWritable, DoubleWritable, IntWritable, Text> {
    Text result = new Text();

    @Override
    protected void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        // 記錄部門的人數
        int num = 0;
        // 記錄部門的工資和
        double sum = 0;
        // 記錄最大工資
        double max = Double.MIN_VALUE;
        // 記錄最小工資
        double min = Double.MAX_VALUE;
        for (DoubleWritable value : values) {
            num++;
            sum += value.get();
            if (max < value.get()) {
                max = value.get();
            }
            if (min > value.get()) {
                min = value.get();
            }
        }
        // 将結果進行拼接,拼接成Text進行輸出
        String str = "\t" + num + "" + "\t" + max + "" + "\t" + min + "\t" + (sum / num);
        result.set(str);
        // 以<K3, V3>形式進行寫出
        context.write(key, result);
    }
}
           
partitioner端代碼
package com.yangqi.avgsal;

import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author xiaoer
 * @date 2019/11/13 11:54
 */
public class MyPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numPartitions) {
        int emp = Integer.parseInt(key.toString());
        if (emp == 10 || emp == 30) {
            return 0;
        } else
            return 1;
    }
}
           
avgsal
package com.yangqi.avgsal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author xiaoer
 * @date 2019/11/12 12:50
 */
public class AvgSal {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 擷取配置對象:讀取四個預設配置檔案
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("mapred.jar", "AvgSal/target/AvgSal-1.0-SNAPSHOT.jar");
        FileSystem fs = FileSystem.get(conf);
        // 建立Job執行個體對象
        Job job = Job.getInstance(conf, "avgsal");
        // 用于指定驅動類型
        job.setJarByClass(AvgSal.class);
        // 用于指定Map階段的類型
        job.setMapperClass(MyMapper.class);
        // 用于指定Reduce階段的類型
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2);
        // 設定Partition的類型
        job.setPartitionerClass(MyPartitioner.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // 設定K3的輸出類型
        job.setOutputKeyClass(IntWritable.class);
        // 設定V3的輸出類型
        job.setOutputValueClass(Text.class);

        // 設定要統計的檔案的路徑
        FileInputFormat.addInputPath(job, new Path("/emp"));
        // FileInputFormat.addInputPath(job, new Path(args[0]));
        // 設定檔案的輸出路徑
        Path path = new Path("/output");
        // Path path = new Path(args[1]);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        // 等到作業執行,并退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
           

繼續閱讀