map的源碼:
package com.sfd.wordcount;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 自定義的map函數需要繼承mapreduce架構的規範,繼承Mapper類,這個類有
* 四個泛型,他規定了map的輸入,輸出類型,map的輸入,輸出都是鍵值對的
* 形式,是以 ,四個泛型的前兩個分别表示的是map輸入的鍵和值的格式,後兩個
* 分别表示map的輸出的鍵和值格式。
* 之是以使用架構自定義的LongWritable 和Text類型而不是Long和String類型是因為
* 後者中包裝了大量的與傳輸資料無關的内容(如:方法。。。),會影響架構的執行
* 效率。
*
* @author sfd
*/
public class WCMap extends Mapper<LongWritable,Text,Text,LongWritable>{
/**
* 每當讀取一行資料是調用此方法,其中key表示的是這行資料的初始偏移量,
* value則是我們在方法中寫的業務代碼所要使用的資料
*/
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//将Text類型的資料轉化成String類型的資料并對其進行拆分得到單詞數組
String line=value.toString();
String[] words=StringUtils.split(line, " ");
//周遊數組并把資料以鍵值對的形式傳遞給reduce
for(String word:words){
context.write(new Text(word), new LongWritable());
}
}
}
reduce的源碼
package com.sfd.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* *自定義的Reduce函數需要繼承mapreduce架構的規範,繼承Reduce類,這個類有
* 四個泛型,他規定了Reduce的輸入,輸出類型,Reduce的輸入,輸出都是鍵值對的
* 形式,是以 ,四個泛型的前兩個分别表示的是Reduce輸入的鍵和值的格式,後兩個
* 分别表示Reduce的輸出的鍵和值格式。
* @author sfd
*
*/
public class WCReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
/**
* 架構處理完map之後将輸出的鍵值對緩存起來,進行分組,形成<key,values{}>形式的鍵值對
* 在将這個鍵值對傳遞給reduce下面的函數,(第二個參數可以看成是一個List集合)
* 如:<"hello",{1,1,1,1......}>
*/
@Override
protected void reduce(Text word, Iterable<LongWritable> values,
Context context)
throws IOException, InterruptedException {
//定義一個計數器,用來計算單詞的個數
long count=;
//周遊集合求出單詞個數
for(LongWritable value:values){
count+=value.get();
}
//輸出這個單詞的統計結果
context.write(word, new LongWritable(count));
}
}
作業的排程類
package com.sfd.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 用來描述一個特定的作業
* 例如:使用哪個類作為邏輯處理的map那個類作為邏輯處理的reduce
* 還可指定作業所要處理的資料的輸入路徑和處理結果輸出路徑
* @author sfd
*
*/
public class WCRunner {
public static void main(String[] args) throws Exception {
//得到作業執行個體對象
Configuration conf=new Configuration();
Job wcJob=Job.getInstance(conf);
//設定作業所需的類在哪個jar包
wcJob.setJarByClass(WCRunner.class);
//指定哪個類作為邏輯處理的map那個類作為邏輯處理的reduce
wcJob.setMapperClass(WCMap.class);
wcJob.setReducerClass(WCReduce.class);
//指定map和reduce輸出的鍵值對的資料類型
wcJob.setOutputKeyClass(Text.class);
wcJob.setOutputValueClass(LongWritable.class);
//指定map輸出的鍵值對的輸出類型,這個方法将會覆寫上面設定的map的
//設定
wcJob.setMapOutputKeyClass(Text.class);
wcJob.setMapOutputValueClass(LongWritable.class);
//指定業務所處裡資料的來源
FileInputFormat.setInputPaths(wcJob, new Path("/wc/input"));
//指定業務處理後産生結果所存方的路徑
FileOutputFormat.setOutputPath(wcJob, new Path("/wc/output"));
//将作頁交給叢集運作
wcJob.waitForCompletion(true);
}
}