天天看點

使用IDEA編寫的第一個MapReduce程式

一. 建立MAVEN程式

使用IDEA編寫的第一個MapReduce程式

二.添加項目依賴包

建立完成後,要給項目添加相關依賴包,否則會出錯。

點選Idea的File菜單,然後點選“Project Structure”菜單,如下圖所示:

使用IDEA編寫的第一個MapReduce程式

導入jar包:

選擇hadoop的包,我用得是hadoop3.*。把下面的依賴包都加入到工程中,否則會出現某個類找不到的錯誤。

(1)”/usr/local/hadoop/share/hadoop/common”目錄下的hadoop-common-3.2.1.jar和haoop-nfs-3.2.1.jar;

(2)/usr/local/hadoop/share/hadoop/common/lib”目錄下的所有JAR包;

(3)“/usr/local/hadoop/share/hadoop/hdfs”目錄下的haoop-hdfs-3.2.1.jar和haoop-hdfs-nfs-3.2.1.jar;

(4)“/usr/local/hadoop/share/hadoop/hdfs/lib”目錄下的所有JAR包。

可以再加上一個haoop-hdfs-client-3.2.1.jar

如果之後項目import報錯,則再配置一下pom.xml檔案中的依賴項:

使用IDEA編寫的第一個MapReduce程式

三.編寫程式

需要處理的檔案是一個日志檔案,我需要從中提取出ID、郵件發送狀态、郵件發送位址,這三個資訊。日志檔案截圖如下:

使用IDEA編寫的第一個MapReduce程式

基本思路類似于wordcount程式。ID、郵件發送狀态、郵件發送位址,觀察發現這三個資訊存在于starting和delivery開頭的行資料中,是以對行資料按空格切片,接下來在mapper中取所需内容即可,然後在reducer中對相同key值的資料的value進行拼接,基本上一個簡單的程式就完成了。

3.1先編寫mapper類:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogAnalysisMap extends Mapper<LongWritable,Text,Text,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)throws IOException,InterruptedException{

        String line = value.toString();//讀取一行資料

        String[] str = line.split(" ");//因為英文字母是以“ ”為間隔的,是以使用“ ”分隔符将一行資料切成多個單詞并存在數組中

        String[] res = new String[2];

        if (str[1].equals("starting")) {
            String[] id = str[3].split(":");
            res[0] = id[0];
            res[1] = str[8];
            context.write(new Text(res[0]),new Text(res[1]));
        }else if (str[1].equals("delivery")) {
            String[] id = str[2].split(":");
            res[0] = id[0];
            String[] status = str[3].split(":");
            res[1] = status[0];
            context.write(new Text(res[0]),new Text(res[1]));
        }
    }
}
           

3.2編寫Reduce類:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogAnalysisReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values,Context context)throws IOException,InterruptedException{
        String result = "";
        for(Text value: values) {
            if(value.find("@") == -1){
                result =  value + "\t" + result;
            }else {
                result =  result + "\t" + value;
            }
        }
        context.write(key,new Text(result));
    }
}
           

3.3編寫入口類:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.example.mapper.LogAnalysisMap;
import org.example.reducer.LogAnalysisReducer;

public class LogAnalysisRunner {
    public static void main(String[] args)throws Exception{
        Configuration conf = new Configuration();

        Job wcJob = Job.getInstance(conf);

        wcJob.setJarByClass(LogAnalysisRunner.class);

        //本job使用的mapper和reducer類
        wcJob.setMapperClass(LogAnalysisMap.class);
        wcJob.setReducerClass(LogAnalysisReducer.class);

        // 指定reduce的輸出資料kv類型
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(Text.class);

        // 指定mapper的輸出資料kv類型
        wcJob.setMapOutputKeyClass(Text.class);
        wcJob.setMapOutputValueClass(Text.class);

        //指定要處理的輸入資料存放路徑
        FileInputFormat.setInputPaths(wcJob, new Path("指定要處理的輸入資料存放路徑"));

        //指定處理結果的輸出資料存放路徑
        FileOutputFormat.setOutputPath(wcJob, new Path("指定處理結果的輸出資料存放路徑"));

        //将job送出給叢集運作
        wcJob.waitForCompletion(true);
    }
}

           

編寫完成後可直接在IDEA中運作入口類,生成檔案會存在于入口類中指定的路徑下。

到此為止,我們的程式就寫完啦!輸出檔案中的資料如下所示:

使用IDEA編寫的第一個MapReduce程式

或者也可以修改一下入口類,把整個程式打成jar包,上傳到叢集環境中運作。這個環節與上一篇中對wordcount程式的處理差不多,就不贅述啦。