一個龐大的資料,少則幾十兆多則上TB,我們來統計文檔的資料就得使用hadoop來進行了。
本文章通過一個案例的講解,帶大家了解使用mapreducer的方法。
現在我們有一個檔案如圖所示
可以看出每一行通過空格來劃分,第一個為時間,第二個為提示類型,第三個為提示資訊來源…
我們現在想統計一下提示類型分别有幾種,以及各種提示資訊來源有幾種。
首先建一個sortCount的類
package pack;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class sortCount {
public static void main(String [] args) throws IOException
{
Configuration conf =new Configuration();
Job job=new Job(conf);
job.setJarByClass(sortCount.class);
FileInputFormat.addInputPath(job, new Path("/anaconda.syslog"));
//必須有,這就是示例檔案
FileOutputFormat.setOutputPath(job,new Path("/output") );
//必須沒有,輸出的結果
job.setMapperClass(myMapper.class);
//設定Map類
job.setReducerClass(myReducer.class);
//設定Reducer類
job.setCombinerClass(myReducer.class);
//預先為reducer處理資料,見下文詳解
job.setNumReduceTasks(2);
//設定兩個Reduce來執行任務
job.setPartitionerClass(myPartitioner.class);
job.setOutputKeyClass(Text.class);設定輸出的鍵的類型
job.setOutputValueClass(IntWritable.class);設定輸出的值的類型
try {
System.exit(job.waitForCompletion(true) ? 0 : 1); //執行語句
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
上面用到combiner,現對其進行分析
假如現在有兩個maptask,combiner的角色就相當于代收作業的班長來減輕老師的負擔一樣。Maptask1和Maptask2為兩個班,combiner是各個班的班長,班長提前收好作業,老師reducer的負擔就減輕了。
類myMapper
package pack;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class myMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
protected void map(LongWritable key ,Text value ,Context context) throws IOException, InterruptedException
{
//Text value 是一整行的資料 MAp源碼會自動循環調用此函數隻到把每一行都讀入進來。
String [] str =value.toString().split(" ");
//for(String ss:str)
//{
//context.write(new Text(ss),new IntWritable(1)); //不知道把結果發給誰,用上下文
//}
//}//此注釋代碼用于wordCount示例
Text ss = new Text();
ss.set(new StringBuffer("loglevel::").append(str[1]).toString());//添加一個頭部,用于區分
Text ss2 = new Text();
ss2.set(new StringBuffer("logresource::").append(str[2]).toString());
context.write(ss,new IntWritable(1));
context.write(ss2,new IntWritable(1));
//統計anaconda.syslog檔案的各種提示出現次數
}}
類myReducer
package pack;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
public class myReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
//到reduce之前combiner已經自動把相同值(key)的數放到一起,現在隻需要将其個數相加就可以了。
protected void reduce(Text key,Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum=0;
Iterator<IntWritable> it = values.iterator();
while(it.hasNext())
{
sum+=it.next().get();
}
context.write(key, new IntWritable(sum));
}
}
//ctrl+shift+t 查找類——>小技巧
類myPartitioner
package pack;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class myPartitioner extends Partitioner<Text, IntWritable>{
@Override
public int getPartition(Text key, IntWritable value, int arg2) //arg2為設定的reducer數量{
// TODO Auto-generated method stub
if(key.toString().startsWith("loglevel::"))
return 0;//分發到第一個reducer
else if(key.toString().startsWith("logresource::"))
return 1;//分發到第二個reducer 0,1要以次寫出來
return 0;
}
}
打包執行,見下圖說明操作成功
(●’◡’●**)歡迎指教!**