一. 建立MAVEN程式
二.添加項目依賴包
建立完成後,要給項目添加相關依賴包,否則會出錯。
點選Idea的File菜單,然後點選“Project Structure”菜單,如下圖所示:
導入jar包:
選擇hadoop的包,我用得是hadoop3.*。把下面的依賴包都加入到工程中,否則會出現某個類找不到的錯誤。
(1)”/usr/local/hadoop/share/hadoop/common”目錄下的hadoop-common-3.2.1.jar和haoop-nfs-3.2.1.jar;
(2)/usr/local/hadoop/share/hadoop/common/lib”目錄下的所有JAR包;
(3)“/usr/local/hadoop/share/hadoop/hdfs”目錄下的haoop-hdfs-3.2.1.jar和haoop-hdfs-nfs-3.2.1.jar;
(4)“/usr/local/hadoop/share/hadoop/hdfs/lib”目錄下的所有JAR包。
可以再加上一個haoop-hdfs-client-3.2.1.jar
如果之後項目import報錯,則再配置一下pom.xml檔案中的依賴項:
三.編寫程式
需要處理的檔案是一個日志檔案,我需要從中提取出ID、郵件發送狀态、郵件發送位址,這三個資訊。日志檔案截圖如下:
基本思路類似于wordcount程式。ID、郵件發送狀态、郵件發送位址,觀察發現這三個資訊存在于starting和delivery開頭的行資料中,是以對行資料按空格切片,接下來在mapper中取所需内容即可,然後在reducer中對相同key值的資料的value進行拼接,基本上一個簡單的程式就完成了。
3.1先編寫mapper類:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogAnalysisMap extends Mapper<LongWritable,Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException,InterruptedException{
String line = value.toString();//讀取一行資料
String[] str = line.split(" ");//因為英文字母是以“ ”為間隔的,是以使用“ ”分隔符将一行資料切成多個單詞并存在數組中
String[] res = new String[2];
if (str[1].equals("starting")) {
String[] id = str[3].split(":");
res[0] = id[0];
res[1] = str[8];
context.write(new Text(res[0]),new Text(res[1]));
}else if (str[1].equals("delivery")) {
String[] id = str[2].split(":");
res[0] = id[0];
String[] status = str[3].split(":");
res[1] = status[0];
context.write(new Text(res[0]),new Text(res[1]));
}
}
}
3.2編寫Reduce類:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class LogAnalysisReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)throws IOException,InterruptedException{
String result = "";
for(Text value: values) {
if(value.find("@") == -1){
result = value + "\t" + result;
}else {
result = result + "\t" + value;
}
}
context.write(key,new Text(result));
}
}
3.3編寫入口類:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.example.mapper.LogAnalysisMap;
import org.example.reducer.LogAnalysisReducer;
public class LogAnalysisRunner {
public static void main(String[] args)throws Exception{
Configuration conf = new Configuration();
Job wcJob = Job.getInstance(conf);
wcJob.setJarByClass(LogAnalysisRunner.class);
//本job使用的mapper和reducer類
wcJob.setMapperClass(LogAnalysisMap.class);
wcJob.setReducerClass(LogAnalysisReducer.class);
// 指定reduce的輸出資料kv類型
wcJob.setOutputKeyClass(Text.class);
wcJob.setOutputValueClass(Text.class);
// 指定mapper的輸出資料kv類型
wcJob.setMapOutputKeyClass(Text.class);
wcJob.setMapOutputValueClass(Text.class);
//指定要處理的輸入資料存放路徑
FileInputFormat.setInputPaths(wcJob, new Path("指定要處理的輸入資料存放路徑"));
//指定處理結果的輸出資料存放路徑
FileOutputFormat.setOutputPath(wcJob, new Path("指定處理結果的輸出資料存放路徑"));
//将job送出給叢集運作
wcJob.waitForCompletion(true);
}
}
編寫完成後可直接在IDEA中運作入口類,生成檔案會存在于入口類中指定的路徑下。
到此為止,我們的程式就寫完啦!輸出檔案中的資料如下所示:
或者也可以修改一下入口類,把整個程式打成jar包,上傳到叢集環境中運作。這個環節與上一篇中對wordcount程式的處理差不多,就不贅述啦。