WordCount執行個體
主程式
package com.lzz.hadoop.mapreduce.wordcount;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the WordCount MapReduce job: configures the cluster
 * connection, wires up mapper/reducer classes, and submits the job.
 */
public class MRRunJob {
    // FIX: the entry point was named "main1", so the JVM could never launch
    // this driver; renamed to "main".
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // NameNode entry point (HDFS address)
        conf.set("fs.defaultFS", "hdfs://192.168.211.110:8020");
        // Local scratch directory for the client
        conf.set("hadoop.tmp.dir", "D:\\tmp\\hadoop-70452");
        Job job = Job.getInstance(conf, "mywc");
        // Jar is located from the driver class
        job.setJarByClass(MRRunJob.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Map and reduce emit the same (Text, IntWritable) pair, so the
        // shared output setters cover both phases.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input directory on HDFS
        FileInputFormat.addInputPath(job, new Path("/wcinput/"));
        // NOTE: submission fails if the output directory already exists
        FileOutputFormat.setOutputPath(job, new Path("/wcoutput/"));
        // FIX: the success flag was silently discarded; propagate it as the
        // process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Mapper
package com.lzz.hadoop.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Tokenizes each input line and emits (token, 1) for every word.
 * Input key is the byte offset of the line; value is the line text.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused output objects: Hadoop serializes the pair inside write(), so
    // reusing them is safe and avoids two allocations per token.
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // FIX: splitting on a single space produced empty tokens for
        // consecutive/leading spaces, which were then counted as words;
        // split on whitespace runs and skip any empty leading token.
        for (String token : value.toString().split("\\s+")) {
            if (token.isEmpty()) {
                continue;
            }
            word.set(token);
            context.write(word, ONE);
        }
    }
}
Reducer
package com.lzz.hadoop.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// After the shuffle the mapper output "a 1 b 1 a 1 c 1" arrives grouped as
// (a, [1, 1]), (b, [1]), (c, [1]); this reducer emits "a 2 b 1 c 1".
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value — Hadoop serializes inside write(), so reuse is
    // safe and avoids one allocation per key.
    private final IntWritable result = new IntWritable();

    /** Sums the counts for one word and writes (word, total). */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
排序統計用例
主程式
package com.lzz.hadoop.mapreduce.airsort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the weather top-N job: secondary sort by (year, month) with
 * temperatures descending, one reducer per year.
 */
public class RunJob {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // NameNode entry point (HDFS address)
        conf.set("fs.defaultFS", "hdfs://192.168.211.110:8020");
        // Jar shipped to the cluster ("mapred.jar" is the legacy key name;
        // Hadoop 2+ translates it to "mapreduce.job.jar")
        conf.set("mapred.jar", "/soft/hadoop-0.0.1-SNAPSHOT.jar");
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf, "weather");
        job.setJarByClass(RunJob.class);
        job.setMapperClass(MyMapper.class);
        // Each input line is parsed as key<TAB>value Text pairs
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setReducerClass(MyReducer.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setSortComparatorClass(MySort.class);
        job.setGroupingComparatorClass(MyGroup.class);
        // One reducer per year of data (1949-1951)
        job.setNumReduceTasks(3);
        // FIX: the map output types (MyKey, Text) differ from the reducer's
        // final output types (NullWritable, Text — see MyReducer), so the
        // map-specific setters are required. The original set only
        // setOutputKeyClass(MyKey.class), which makes the reducer's
        // NullWritable key fail Hadoop's output type check at runtime.
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Input directory on HDFS
        FileInputFormat.addInputPath(job, new Path("/wcinput"));
        // Delete a pre-existing output directory, otherwise submission fails
        Path path = new Path("/wcoutput");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        boolean f = job.waitForCompletion(true);
        System.out.println("f:" + f);
    }
}
Mapper
package com.lzz.hadoop.mapreduce.airsort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// Input key/value are both Text (via KeyValueTextInputFormat): the key is a
// timestamp beginning "year-month-..." and the value is the temperature.
// Emits a composite MyKey (year, month, temperature) so the shuffle can
// sort and group, with the original "timestamp<TAB>temperature" record as
// the value the reducer will write out.
public class MyMapper extends Mapper<Text, Text, MyKey, Text> {
    // Reused output objects — Hadoop serializes the pair inside write(),
    // so reuse is safe and avoids per-record allocation.
    private final MyKey outKey = new MyKey();
    private final Text outValue = new Text();

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // The timestamp starts "year-month-day ..."; split on '-' to pull
        // out the first two fields.
        String[] parts = key.toString().split("-");
        outKey.setYear(Integer.parseInt(parts[0]));
        outKey.setMonth(Integer.parseInt(parts[1]));
        outKey.setAir(Double.parseDouble(value.toString()));
        // Tab-separate timestamp and temperature in the emitted record
        outValue.set(key.toString() + "\t" + value);
        context.write(outKey, outValue);
    }
}
Reducer
package com.lzz.hadoop.mapreduce.airsort;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Emits at most the first three records of each (year, month) group.
 * Because MySort orders temperatures descending within a group, these are
 * the three hottest records of that month.
 */
public class MyReducer extends Reducer<MyKey, Text, NullWritable, Text> {
    @Override
    protected void reduce(MyKey key, Iterable<Text> records, Context context)
            throws IOException, InterruptedException {
        int written = 0; // records emitted so far for this group
        for (Text record : records) {
            if (written >= 3) {
                break; // top three already emitted — skip the rest
            }
            context.write(NullWritable.get(), record);
            written++;
        }
    }
}
Group分組
package com.lzz.hadoop.mapreduce.airsort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// Sample input record: 1949-10-29 23:29:12 39
/**
 * Grouping comparator: keys that share the same (year, month) are fed to a
 * single reduce() call, regardless of their temperature field.
 */
public class MyGroup extends WritableComparator {
    public MyGroup() {
        // true => create key instances so compare() receives deserialized MyKeys
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey left = (MyKey) a;
        MyKey right = (MyKey) b;
        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear; // different years — different groups
        }
        // Same year: group by month
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}
Partitioner
package com.lzz.hadoop.mapreduce.airsort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Routes each record to a reducer by its year's offset from the first year
 * of data (1949): 1949 -> partition 0, 1950 -> 1, 1951 -> 2, ...
 */
public class MyPartitioner extends Partitioner<MyKey, Text> {
    @Override
    public int getPartition(MyKey key, Text value, int numPartitions) {
        // FIX: the original used '%', which yields a negative result for
        // years before 1949 — Hadoop rejects negative partition numbers at
        // runtime. Math.floorMod keeps the result in [0, numPartitions).
        return Math.floorMod(key.getYear() - 1949, numPartitions);
    }
}
Sort
package com.lzz.hadoop.mapreduce.airsort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Sort comparator for the shuffle: orders keys by year ascending, then
 * month ascending, then temperature DESCENDING, so the hottest records of
 * each month arrive first at the reducer.
 */
public class MySort extends WritableComparator {
    public MySort() {
        // true => create key instances so compare() receives deserialized MyKeys
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey left = (MyKey) a;
        MyKey right = (MyKey) b;
        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        // Same year and month: negate for descending temperature order
        return -Double.compare(left.getAir(), right.getAir());
    }
}
Model
package com.lzz.hadoop.mapreduce.airsort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@SuppressWarnings("rawtypes")
public class MyKey implements WritableComparable { //可序列化;可比較
private int year; //年
private int month; //月
private double air; //溫度
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public double getAir() {
return air;
}
public void setAir(double air) {
this.air = air;
}
//@Override//序列化
public void write(DataOutput out) throws IOException {
//通過write方法寫入序列化的資料流
out.writeInt(year);
out.writeInt(month);
out.writeDouble(air);
}
//@Override//反序列化
public void readFields(DataInput in) throws IOException {
//通過readFields方法從序列化的資料流中讀出進行指派
year = in.readInt();
month = in.readInt();
air = in.readDouble();
}
//@Override
//按照字典順序進行比較,傳回的值是一個int型
public int compareTo(Object o) {
return this == o ? 0 : -1;
}
}
用例
結果