MapReduce的幾個企業級經典面試案例
一、官方統計案例:
- 要求:統計一下單詞出現的次數
- 測試資料:
zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin
- 編寫代碼:
- mapper類
/** * @author 17616 */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 首先擷取一行資料 String line = value.toString (); // 将行内的單詞進行切分,使用一個數組進行儲存,切分資料時根據源資料得知可以使用空格的方式切分。 String[] arr = line.split (" "); for (String str : arr) { context.write (new Text (str), new LongWritable (1)); } } }
- reducer類
/** * @author 17616 */ public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { // 定義變量記錄單詞出現的次數 long sum = 0; for (LongWritable val : values) { // 記錄總次數 sum += val.get (); } // 輸出資料,key就是單詞,value就是在map階段這個單詞出現的總次數 context.write (key, new LongWritable (sum)); } }
- Driver類
/** * @author 17616 * 官方案例,計算統計 */ public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 擷取目前的預設配置 Configuration conf = new Configuration (); // 擷取代表目前mr作業的job對象 Job job = Job.getInstance (conf); // 指定一下目前程式的入口類 job.setJarByClass (WordCountDriver.class); //指定目前Mapper、Reducer任務的類 job.setMapperClass (WordCountMapper.class); job.setReducerClass (WordCountReducer.class); //設定Mapper的結果類型 job.setMapOutputKeyClass (Text.class); job.setMapOutputValueClass (LongWritable.class); // 設定Reducer的結果類型 job.setOutputKeyClass (Text.class); job.setOutputValueClass (LongWritable.class); //設定待分析的檔案夾路徑(linux的路徑位址) FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/mapreduce")); FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/mapreduce")); if (!job.waitForCompletion (true)) { return; } } }
- mapper類
- 運作結果:
zhangqin 20 zhangrui 20 zhangyong 20
二、計算平均值:
- 要求:計算一下資料的平均值
- 測試資料:
tom 69 tom 84 tom 68 jary 89 jary 90 jary 81 jary 35 alex 23 alex 100 alex 230
- 編寫代碼:
- mapper類
/** * @Author zhangyong * @Date 2020/4/3 23:43 * @Version 1.0 */ public class AverageMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //擷取每行的資料内容 String line = value.toString (); //按照空格去切會擷取到多個資料,是以用數組的方式存儲 String[] data = line.split (" "); String name = data[0]; //Integer做一個資料類型的強制轉換。 int score = Integer.parseInt (data[1]); //輸出資料 context.write (new Text (name), new IntWritable (score)); } }
- reducer類
/**
 * Average-score reducer: averages all scores seen for one name.
 * Note: integer division truncates (e.g. 353 / 3 -> 117), which matches the
 * sample output in this document.
 *
 * @Author zhangyong
 * @Date 2020/4/3 23:43
 * @Version 1.0
 */
public class AverageReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text name, Iterable<IntWritable> scores, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        int total = 0;
        for (IntWritable s : scores) {
            total += s.get();
            count++;
        }
        // reduce() is only invoked when at least one value exists, so count
        // is never zero here.
        int average = total / count;
        context.write(name, new IntWritable(average));
    }
}
- Driver類
/** * @Author zhangyong * @Date 2020/4/3 23:41 * @Version 1.0 * 計算平均值 */ public class AverageDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration (); Job job = Job.getInstance (conf); //驅動類,入口類 job.setJarByClass (AverageDriver.class); //設定Mapper和Reducer的類 job.setMapperClass (AverageMapper.class); job.setReducerClass (AverageReducer.class); //設定Mapper的結果類型 job.setMapOutputKeyClass (Text.class); job.setMapOutputValueClass (IntWritable.class); //設定Reduce的結果類型 job.setOutputKeyClass (Text.class); job.setOutputValueClass (IntWritable.class); //設定待分析的檔案夾路徑(linux的路徑位址) FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/average")); FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/average")); //送出到job job.waitForCompletion (true); } }
- mapper類
- 運作結果:
alex 117 jary 73 tom 73
- 測試資料:
三、求溫度最高值:
- 要求:求出一下年限的時間的最高溫度
- 測試資料:
2329999919500515070000 9909999919500515120022 9909999919500515180011 9509999919490324120111 6509999919490324180078 9909999919370515070001 9909999919370515120002 9909999919450515180001 6509999919450324120002 8509999919450324180078
- 編寫代碼:
- mapper類
/** * @author 17616 */ public class HeightMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //擷取一段資料 String line = value.toString (); //擷取年份 String year = line.substring (8, 12); //擷取溫度(強制轉換一下) int t = Integer.parseInt (line.substring (18, 22)); context.write (new Text (year),new LongWritable (t)); } }
- reducer類
/**
 * Max-temperature reducer: emits the highest temperature seen for each year.
 *
 * @author 17616
 */
public class HeightReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text year, Iterable<LongWritable> t, Context context)
            throws IOException, InterruptedException {
        // Fix: seed with Long.MIN_VALUE instead of 0 so years whose readings
        // are all negative still report their true maximum (seeding with 0
        // would wrongly return 0 for such years).
        long max = Long.MIN_VALUE;
        for (LongWritable data : t) {
            if (data.get() > max) {
                max = data.get();
            }
        }
        context.write(year, new LongWritable(max));
    }
}
- Driver類
/** * @author 17616 * -求最大值 */ public class HeightDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 擷取目前的預設配置 Configuration conf = new Configuration (); // 擷取代表目前mr作業的job對象 Job job = Job.getInstance (conf); // 指定一下目前程式的入口類 job.setJarByClass (HeightDriver.class); //指定目前Mapper、Reducer任務的類 job.setMapperClass (HeightMapper.class); job.setReducerClass (HeightReducer.class); //設定Mapper的結果類型 job.setMapOutputKeyClass (Text.class); job.setMapOutputValueClass (LongWritable.class); // 設定Reducer的結果類型 job.setOutputKeyClass (Text.class); job.setOutputValueClass (LongWritable.class); //設定待分析的檔案夾路徑(linux的路徑位址) FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/wendu/")); FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/wendu")); job.waitForCompletion (true); } }
- mapper類
- 運作結果:
1937 2 1945 78 1949 111 1950 22
四、資料去重:
- 要求:去重一下ip位址
- 測試資料:
192.168.234.21 192.168.234.22 192.168.234.21 192.168.234.21 192.168.234.23 192.168.234.21 192.168.234.21 192.168.234.21 192.168.234.25 192.168.234.21 192.168.234.21 192.168.234.26 192.168.234.21 192.168.234.27 192.168.234.21 192.168.234.27 192.168.234.21 192.168.234.29 192.168.234.21 192.168.234.26 192.168.234.21 192.168.234.25 192.168.234.25 192.168.234.25 192.168.234.21 192.168.234.22 192.168.234.21
- 編寫代碼:
- mapper類
/**
 * Dedup mapper: forwards the whole input line as the key so that the shuffle
 * phase groups identical lines together; the value carries no data.
 *
 * @Author zhangyong
 * @Date 2020/4/7 19:53
 * @Version 1.0
 */
public class DisMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The line itself becomes the output key; duplicates collapse into
        // one reduce group.
        context.write(value, NullWritable.get());
    }
}
- reducer類
/**
 * Dedup reducer: each distinct line arrives as one key, so writing the key
 * once (ignoring the values) yields the de-duplicated output.
 *
 * @Author zhangyong
 * @Date 2020/4/7 21:21
 * @Version 1.0
 */
public class DisReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // One emit per unique key — the values are irrelevant.
        context.write(key, NullWritable.get());
    }
}
- Driver類
/** * @Author zhangyong * @Date 2020/4/7 21:32 * @Version 1.0 * 資料去重 */ public class DisDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration (); Job job = Job.getInstance (conf); //設定Drive類 job.setJarByClass (DisReducer.class); //設定Mapper、Reduce類 job.setMapperClass (DisMapper.class); job.setReducerClass (DisReducer.class); //Mapper的輸出 job.setMapOutputKeyClass (Text.class); job.setMapOutputValueClass (NullWritable.class); //位址 FileInputFormat.setInputPaths (job,new Path ("hdfs://anshun115:9000/distinct")); FileOutputFormat.setOutputPath (job,new Path ("hdfs://anshun115:9000/result/distinct")); job.waitForCompletion (true); } }
- mapper類
- 運作結果:
192.168.234.21 192.168.234.22 192.168.234.23 192.168.234.25 192.168.234.26 192.168.234.27 192.168.234.29
五、流量統計:
- 要求:統計一下手機号碼使用的流量數
- 測試資料:
13901000123 zs bj 343 13202111011 ww sh 456 13901000123 zs bj 1024 13207551234 ls sz 758
- 編寫代碼:
- Bean類
/**
 * Writable bean carrying one traffic record: phone number, owner name,
 * address and the amount of flow used.
 *
 * @Author zhangyong
 * @Date 2020/4/10 8:01
 * @Version 1.0
 */
public class FlowBean implements Writable {

    private String phone;
    private String name;
    private String addr;
    private long flow;

    /**
     * Serialization — field order here must match readFields().
     *
     * @param dataOutput sink to write the fields to
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(phone);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(addr);
        dataOutput.writeLong(flow);
    }

    /**
     * Deserialization — reads fields in exactly the order they were written.
     *
     * @param dataInput source to read the fields from
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        phone = dataInput.readUTF();
        name = dataInput.readUTF();
        addr = dataInput.readUTF();
        flow = dataInput.readLong();
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddr() {
        return addr;
    }

    public void setAddr(String addr) {
        this.addr = addr;
    }

    public long getFlow() {
        return flow;
    }

    public void setFlow(long flow) {
        this.flow = flow;
    }

    @Override
    public String toString() {
        return "FlowBean{" + "phone='" + phone + '\'' + ", name='" + name + '\''
                + ", addr='" + addr + '\'' + ", flow=" + flow + '}';
    }
}
- mapper類
/** * @Author zhangyong * @Date 2020/4/10 8:10 * @Version 1.0 */ public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //擷取行 String line = value.toString (); /** * [13901000123,zk,bj,343] * phone = 13901000123; * name = zk; * addr = bj; * flow = 343; */ String[] info = line.split (" "); FlowBean flowBean = new FlowBean (); flowBean.setPhone (info[0]); flowBean.setName (info[1]); flowBean.setAddr (info[2]); flowBean.setFlow (Integer.parseInt (info[3])); context.write (new Text (flowBean.getName ()), flowBean); } }
- reducer類
/**
 * Flow reducer: sums the flow of all records sharing one name and emits the
 * aggregated bean (last-seen phone/addr win for those fields).
 *
 * @Author zhangyong
 * @Date 2020/4/10 8:23
 * @Version 1.0
 */
public class FlowReducer extends Reducer<Text, FlowBean, FlowBean, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        FlowBean total = new FlowBean();
        for (FlowBean record : values) {
            // Non-numeric fields are simply carried over from each record.
            total.setPhone(record.getPhone());
            total.setName(record.getName());
            total.setAddr(record.getAddr());
            // Accumulate the flow.
            total.setFlow(total.getFlow() + record.getFlow());
        }
        context.write(total, NullWritable.get());
    }
}
- Driver類
/** * @Author zhangyong * @Date 2020/4/10 8:28 * @Version 1.0 * 流量統計 */ public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration (); Job job = Job.getInstance (conf); // 加載主類 job.setJarByClass (FlowDriver.class); //加載mapper、reduce類 job.setMapperClass (FlowMapper.class); job.setReducerClass (FlowReducer.class); //設定map的的key、value job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //設定輸出的的key、value job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass (NullWritable.class); //設定路徑(傳輸、結果) FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/flow")); FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/flow")); job.waitForCompletion (true); } }
- Bean類
- 運作結果:
FlowBean{phone='13207551234', name='ls', addr='sz', flow=758} FlowBean{phone='13202111011', name='ww', addr='sh', flow=456} FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}
六、電影的排行榜:
- 要求:按照降序排列一下電影的熱度
- 測試資料:
中國機長 72 機械師2 83 奇異博士 87 流浪地球 79 複仇者聯盟4:終局之戰 94 驚奇隊長 68 蜘蛛俠:英雄遠征 80 長城 56 奪路而逃 69 神奇動物在哪裡 57 驢得水 59 我不是潘金蓮 55 速度與激情:特别行動 77 哪吒之魔童降世 96 捉迷藏 78 上海堡壘 9 葉問4 75 勇士之門 35 羅曼蒂克消亡史 67 阿麗塔:戰鬥天使 89
- 編寫代碼:
- Bean類
/**
 * WritableComparable bean for movie popularity ranking; sorts descending
 * by "hot" value during the MR shuffle.
 *
 * @Author zhangyong
 * @Date 2020/4/13 8:42
 * @Version 1.0
 */
public class MovieBean implements WritableComparable<MovieBean> {

    private String name;
    private int hot;

    /**
     * Descending order by hot.
     * Fix: Integer.compare avoids the overflow that the subtraction idiom
     * (o.hot - this.hot) suffers for values near the int extremes.
     *
     * @param o the bean to compare against
     * @return negative/zero/positive per the Comparable contract
     */
    @Override
    public int compareTo(MovieBean o) {
        return Integer.compare(o.hot, this.hot);
    }

    /**
     * Serialization — order must match readFields().
     *
     * @param dataOutput sink to write to
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeInt(hot);
    }

    /**
     * Deserialization — reads fields in write order.
     *
     * @param dataInput source to read from
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.hot = dataInput.readInt();
    }

    // Added for symmetry with setName(); the original bean had no getter.
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    @Override
    public String toString() {
        return "MovieBean{" + "name='" + name + '\'' + ", hot=" + hot + '}';
    }
}
- mapper類
/** * @Author zhangyong * @Date 2020/4/13 8:52 * @Version 1.0 */ public class MovieMapper extends Mapper<LongWritable, Text, MovieBean, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //擷取一行 String line = value.toString (); //截取資料 String[] split = line.split (" "); //封裝對象 MovieBean movieBean = new MovieBean (); movieBean.setName (split[0]); movieBean.setHot (Integer.parseInt (split[1])); //輸出 context.write (movieBean, NullWritable.get ()); } }
- reducer類
無
- Driver類
/** * @Author zhangyong * @Date 2020/4/13 9:19 * @Version 1.0 */ public class MovieDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration (); Job job = Job.getInstance (conf); job.setJarByClass (MovieDriver.class); job.setMapperClass (MovieMapper.class); //加載map輸出類型和value的輸出類型 job.setMapOutputKeyClass (MovieBean.class); job.setMapOutputValueClass (NullWritable.class); FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/sort")); FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/sort")); job.waitForCompletion (true); } }
- Bean類
- 運作結果:
MovieBean{name='哪吒之魔童降世', hot=96} MovieBean{name='複仇者聯盟4:終局之戰', hot=94} MovieBean{name='阿麗塔:戰鬥天使', hot=89} MovieBean{name='奇異博士', hot=87} MovieBean{name='機械師2', hot=83} MovieBean{name='蜘蛛俠:英雄遠征', hot=80} MovieBean{name='流浪地球', hot=79} MovieBean{name='捉迷藏', hot=78} MovieBean{name='速度與激情:特别行動', hot=77} MovieBean{name='葉問4', hot=75} MovieBean{name='中國機長', hot=72} MovieBean{name='奪路而逃', hot=69} MovieBean{name='驚奇隊長', hot=68} MovieBean{name='羅曼蒂克消亡史', hot=67} MovieBean{name='驢得水', hot=59} MovieBean{name='神奇動物在哪裡', hot=57} MovieBean{name='長城', hot=56} MovieBean{name='我不是潘金蓮', hot=55} MovieBean{name='勇士之門', hot=35} MovieBean{name='上海堡壘', hot=9}
七、多個檔案統計成績:
- 要求:根據三張表統計每個同學的各課成績的總和
-
測試資料:
chinese.txt:1 alex 89 2 alex 73 3 alex 67 1 romeo 49 2 romeo 83 3 romeo 27 1 lee 77 2 lee 66 3 lee 89
english.txt:1 alex 55 2 alex 69 3 alex 75 1 romeo 44 2 romeo 64 3 romeo 86 1 lee 76 2 lee 84 3 lee 93
math.txt:1 alex 85 2 alex 59 3 alex 95 1 romeo 74 2 romeo 67 3 romeo 96 1 lee 45 2 lee 76 3 lee 67
- 編寫代碼:
- Bean類
/**
 * Writable bean holding one student's chinese/math/english scores.
 *
 * @Author zhangyong
 * @Date 2020/4/10 10:00
 * @Version 1.0
 */
public class ScoreBean implements Writable {

    private String name;
    private int chinese;
    private int math;
    private int english;

    /** Serialization — field order must match readFields(). */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeInt(chinese);
        dataOutput.writeInt(math);
        dataOutput.writeInt(english);
    }

    /** Deserialization — reads fields in write order. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        name = dataInput.readUTF();
        chinese = dataInput.readInt();
        math = dataInput.readInt();
        english = dataInput.readInt();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getChinese() {
        return chinese;
    }

    public void setChinese(int chinese) {
        this.chinese = chinese;
    }

    public int getMath() {
        return math;
    }

    public void setMath(int math) {
        this.math = math;
    }

    public int getEnglish() {
        return english;
    }

    public void setEnglish(int english) {
        this.english = english;
    }

    @Override
    public String toString() {
        return "StuScore{" + "name='" + name + '\'' + ", chinese=" + chinese
                + ", math=" + math + ", english=" + english + '}';
    }
}
- mapper類
/** * @Author zhangyong * @Date 2020/4/10 10:03 * @Version 1.0 */ public class ScoreMapper extends Mapper<LongWritable, Text, Text, ScoreBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 擷取一行資料 String line = value.toString(); String[] data = line.split(" "); ScoreBean ss = new ScoreBean(); ss.setName(data[1]); /** * 注意,此處導包的時候不要導錯,應該導入的是org.apache.hadoop.mapreduce.lib.input.FileSplit; * 通過擷取目前map階段的MapTask處理的切片資訊來擷取檔案名。 */ FileSplit split = (FileSplit) context.getInputSplit(); if (split.getPath().getName().equals("chinese.txt")) { ss.setChinese(Integer.parseInt(data[2])); } else if (split.getPath().getName().equals("math.txt")) { ss.setMath(Integer.parseInt(data[2])); } else if (split.getPath().getName().equals("english.txt")) { ss.setEnglish(Integer.parseInt(data[2])); } context.write(new Text(ss.getName()), ss); } }
- reducer類
/** * @Author zhangyong * @Date 2020/4/10 10:05 * @Version 1.0 */ public class ScoreReducer extends Reducer<Text, ScoreBean, Text, ScoreBean> { @Override protected void reduce(Text key, Iterable<ScoreBean> values, Context context) throws IOException, InterruptedException { ScoreBean resultScore = new ScoreBean(); // 此處key.toSting中隻有name一個值,因為在map階段的輸出key隻有name resultScore.setName(key.toString()); for (ScoreBean value : values) { // result.setFlow(result.getFlow() + value.getFlow()); // 國文成績分數 resultScore.setChinese(resultScore.getChinese() + value.getChinese()); // 數學成績分 resultScore.setMath(resultScore.getMath() + value.getMath()); // 英語成績分 resultScore.setEnglish(resultScore.getEnglish() + value.getEnglish()); } context.write(key, resultScore); } }
- Driver類
/** * @Author zhangyong * @Date 2020/4/10 10:10 * @Version 1.0 * 統計成績 */ public class ScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration (); Job job = Job.getInstance (conf); //加載Drive類 job.setJarByClass (ScoreDriver.class); //加載Mapper、Reducer類 job.setMapperClass (ScoreMapper.class); job.setReducerClass (ScoreReducer.class); //加載map輸出類型和value的輸出類型 job.setMapOutputKeyClass (Text.class); job.setMapOutputValueClass (ScoreBean.class); job.setOutputKeyClass (Text.class); job.setOutputValueClass (ScoreBean.class); FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/score")); FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/score")); job.waitForCompletion (true); } }
- Bean類
- 運作結果:
alex StuScore{name='alex', chinese=229, math=239, english=199} lee StuScore{name='lee', chinese=232, math=188, english=253} romeo StuScore{name='romeo', chinese=159, math=237, english=194}
八、Job鍊處理資料:
- 要求:統計每個同學的總分
- 測試資料:
1|zhang 100 2|wang 200 3|zhang 150 4|lisi 190 5|wang 50 6|zhang 80 7|lisi 50
- 編寫代碼:
- Bean類
/**
 * WritableComparable bean for the job-chain example; sorts descending by the
 * accumulated count.
 *
 * @Author zhangyong
 * @Date 2020/4/14 15:06
 * @Version 1.0
 */
public class CountBean implements WritableComparable<CountBean> {

    private String name;
    private int count;

    /**
     * Descending order by count.
     * Fix: Integer.compare avoids the overflow that the subtraction idiom
     * (o.count - this.count) suffers for values near the int extremes.
     */
    @Override
    public int compareTo(CountBean o) {
        return Integer.compare(o.count, this.count);
    }

    /**
     * Serialization — order must match readFields().
     *
     * @param dataOutput sink to write to
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeInt(count);
    }

    /**
     * Deserialization — reads fields in write order.
     *
     * @param dataInput source to read from
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.count = dataInput.readInt();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "CountBean{" + "name='" + name + '\'' + ", count=" + count + '}';
    }
}
-
mapper類
OneCountMapper類
TwoCountMapper類/** * @Author zhangyong * @Date 2020/4/14 8:54 * @Version 1.0 */ public class OneCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString (); /** * 通過 | 進行切分時會得到一個數組,數組中的0号下标為序号,1号下标中有name和profit的資料。 * 再通過切分1号下标中的資料時可以擷取到name和profit的資料。 */ String name = line.split ("\\|")[1].split (" ")[0]; int count = Integer.parseInt (line.split ("\\|")[1].split (" ")[1]); context.write (new Text (name),new IntWritable (count)); } }
/**
 * Second-stage mapper of the job chain: reads the first job's result file
 * and wraps each "name<TAB>count" line in a CountBean key for sorting.
 *
 * @Author zhangyong
 * @Date 2020/4/14 15:27
 * @Version 1.0
 */
public class TwoCountMapper extends Mapper<LongWritable, Text, CountBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /*
         * The input here is the output of the first MR job, and MR result
         * files separate key and value with a TAB — so split on \t.
         */
        String[] fields = value.toString().split("\t");
        CountBean bean = new CountBean();
        bean.setName(fields[0]);
        bean.setCount(Integer.parseInt(fields[1]));
        context.write(bean, NullWritable.get());
    }
}
- reducer類
/** * @Author zhangyong * @Date 2020/4/14 8:54 * @Version 1.0 */ public class OneCountReducer extends Reducer<Text , IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //統計利潤 int sum = 0; for (IntWritable value : values) { sum+=value.get (); } context.write (key,new IntWritable (sum)); } }
- Driver類
/** * @Author zhangyong * @Date 2020/4/14 8:54 * @Version 1.0 * job鍊操作資料 */ public class CountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration (); Job job = Job.getInstance (conf); job.setJarByClass (CountDriver.class); job.setMapperClass (OneCountMapper.class); job.setReducerClass (OneCountReducer.class); job.setMapOutputKeyClass (Text.class); job.setMapOutputValueClass (IntWritable.class); job.setOutputKeyClass (Text.class); job.setOutputValueClass (IntWritable.class); FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/count")); FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/count")); if (job.waitForCompletion (true)) { // 設定第二個Job任務 Job job2 = Job.getInstance (conf); // 設定第二個Job任務的Mapper job2.setMapperClass (TwoCountMapper.class); job2.setMapOutputKeyClass (CountBean.class); job2.setMapOutputValueClass (NullWritable.class); /** * 設定第二個Job任務是輸入輸出路徑。 * 此處的輸入路徑是第一個job任務的輸出路徑 * 注意設定路徑時,裡面傳入的job應該是目前的job任務,如下所示,應該是job2。 * 如果寫成前面的job任務名稱,在運作時則會爆出錯誤,提示路徑不存在。 */ FileInputFormat.setInputPaths (job2, new Path ("hdfs://anshun115:9000/result/count")); FileOutputFormat.setOutputPath (job2, new Path ("hdfs://anshun115:9000/result/count2")); // 此處送出任務時,注意用的是job2。 job2.waitForCompletion (true); } } }
- Bean類
-
運作結果:
count.txt
count2.txtlisi 240 wang 250 zhang 330
CountBean{name='zhang', count=330} CountBean{name='wang', count=250} CountBean{name='lisi', count=240}
九、簡單分區案例:
- 要求:分區顯示手機使用流量的總和
- 測試資料:
13901000123 zs bj 343 13202111011 ww sh 456 13901000123 zs bj 1024 13207551234 ls sz 758
2. 編寫代碼: - Partitioner類 ```java /** * @Author zhangyong * @Date 2020/4/10 10:42 * @Version 1.0 */ public class AddPartitioner extends Partitioner<Text, PartFlowBean> { @Override public int getPartition(Text text, PartFlowBean flowBean, int numPartitioner) { String addr = flowBean.getAddr(); if (addr.equals("bj")) { return 0; } else if (addr.equals("sh")) { return 1; } else { return 2; } } }
- Bean類
/**
 * Writable bean for the partitioning example: phone number, owner name,
 * address and flow used. The address field drives AddPartitioner.
 *
 * @Author zhangyong
 * @Date 2020/4/10 10:01
 * @Version 1.0
 */
public class PartFlowBean implements Writable {

    private String phone;
    private String name;
    private String addr;
    private long flow;

    /**
     * Serialization — field order must match readFields().
     *
     * @param dataOutput sink to write to
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(phone);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(addr);
        dataOutput.writeLong(flow);
    }

    /**
     * Deserialization — reads fields in write order.
     *
     * @param dataInput source to read from
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        phone = dataInput.readUTF();
        name = dataInput.readUTF();
        addr = dataInput.readUTF();
        flow = dataInput.readLong();
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddr() {
        return addr;
    }

    public void setAddr(String addr) {
        this.addr = addr;
    }

    public long getFlow() {
        return flow;
    }

    public void setFlow(long flow) {
        this.flow = flow;
    }

    @Override
    public String toString() {
        return "FlowBean{" + "phone='" + phone + '\'' + ", name='" + name + '\''
                + ", addr='" + addr + '\'' + ", flow=" + flow + '}';
    }
}
- mapper類
/**
 * Partition-example mapper: parses "phone name addr flow" records into a
 * PartFlowBean and emits (name, bean).
 *
 * @Author zhangyong
 * @Date 2020/4/10 10:51
 * @Version 1.0
 */
public class PartFlowMapper extends Mapper<LongWritable, Text, Text, PartFlowBean> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Record layout, e.g. [13901000123, zk, bj, 343]:
        //   [0] phone, [1] name, [2] addr, [3] flow
        String[] fields = value.toString().split(" ");
        PartFlowBean bean = new PartFlowBean();
        bean.setPhone(fields[0]);
        bean.setName(fields[1]);
        bean.setAddr(fields[2]);
        bean.setFlow(Integer.parseInt(fields[3]));
        context.write(new Text(bean.getName()), bean);
    }
}
- reducer類
/**
 * Partition-example reducer: sums the flow of all records sharing one name
 * and emits the aggregated bean.
 *
 * @Author zhangyong
 * @Date 2020/4/10 10:23
 * @Version 1.0
 */
public class PartFlowReducer extends Reducer<Text, PartFlowBean, PartFlowBean, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<PartFlowBean> values, Context context)
            throws IOException, InterruptedException {
        PartFlowBean result = new PartFlowBean();
        for (PartFlowBean value : values) {
            // Fix: the original called setPhone twice in a row; once is enough.
            result.setPhone(value.getPhone());
            result.setName(value.getName());
            result.setAddr(value.getAddr());
            // Accumulate the flow across all records of this name.
            result.setFlow(result.getFlow() + value.getFlow());
        }
        context.write(result, NullWritable.get());
    }
}
- Driver類
/** * @Author zhangyong * @Date 2020/4/11 11:17 * @Version 1.0 * 分區案例 */ public class PartFlowDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration (); Job job = Job.getInstance (conf); job.setJarByClass (PartFlowDriver.class); job.setMapperClass (PartFlowMapper.class); job.setReducerClass (PartFlowReducer.class); /** * 下面的兩個類如果不寫的話,那麼就不會生效。 */ // 設定分區類 job.setPartitionerClass (AddPartitioner.class); // 設定分區數量 job.setNumReduceTasks (3); job.setMapOutputKeyClass (Text.class); job.setMapOutputValueClass (PartFlowBean.class); job.setOutputKeyClass (PartFlowBean.class); job.setOutputValueClass (NullWritable.class); FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/partition")); FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/partition")); job.waitForCompletion (true); } }
- 測試資料:
-
運作結果:
part-r-00000
part-r-00001FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}
part-r-00002FlowBean{phone='13202111011', name='ww', addr='sh', flow=456}
FlowBean{phone='13207551234', name='ls', addr='sz', flow=758}
十、分區并全排序:
- 要求:把一下資料按照兩位數、三位數、四位數以上進行分區,并且按照大小排序
- 測試資料:
82 239 231 23 22 213 123 232 124 213 3434 232 4546 565 123 231 231 2334 231 1123 5656 657 12313 4324 213 123 2 232 32 343 123 4535 12321 3442 453 1233 342 453 1231 322 452 232 343 455 3123 3434 3242
- 編寫代碼:
- Partitioner類
/**
 * Partitions numbers by digit count for the total-order sort:
 *   0-99    -> partition 0 (file 1)
 *   100-999 -> partition 1 (file 2)
 *   1000+   -> partition 2 (file 3)
 *
 * @Author zhangyong
 * @Date 2020/4/14 9:39
 * @Version 1.0
 */
public class AutoPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        String num = String.valueOf(key.get());
        // \d{1,2} covers both the one- and two-digit cases the original
        // matched with two separate patterns.
        if (num.matches("\\d{1,2}")) {
            return 0;
        }
        if (num.matches("\\d{3}")) {
            return 1;
        }
        return 2;
    }
}
- mapper類
/**
 * Number-sort mapper: emits (number, 1) for every space-separated number in
 * the line; IntWritable keys sort numerically during the shuffle.
 *
 * @Author zhangyong
 * @Date 2020/4/14 9:44
 * @Version 1.0
 */
public class NumSortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split(" ")) {
            context.write(new IntWritable(Integer.parseInt(token)), new IntWritable(1));
        }
    }
}
- reducer類
/**
 * Number-sort reducer: totals the occurrence count of each number.
 *
 * @Author zhangyong
 * @Date 2020/4/14 9:39
 * @Version 1.0
 */
public class NumSortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int occurrences = 0;
        for (IntWritable count : values) {
            occurrences += count.get();
        }
        context.write(key, new IntWritable(occurrences));
    }
}
- Driver類
/**
 * Driver for the partitioned total-order sort job.
 *
 * @Author zhangyong
 * @Date 2020/4/14 9:39
 * @Version 1.0
 */
public class NumSortDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(NumSortDriver.class);
        job.setMapperClass(NumSortMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        /*
         * Three result files are expected, so the partitioner class and the
         * reduce-task count must both be set here.
         */
        job.setPartitionerClass(AutoPartitioner.class);
        job.setNumReduceTasks(3);
        job.setReducerClass(NumSortReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/numcount/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/numcount"));
        job.waitForCompletion(true);
    }
}
- Partitioner類
-
運作結果:
part-r-00000
part-r-000012 1 22 1 23 1 32 1 82 1
part-r-00002123 4 124 1 213 3 231 4 232 4 239 1 322 1 342 1 343 2 452 1 453 2 455 1 565 1 657 1
1123 1 1231 1 1233 1 2334 1 3123 1 3242 1 3434 2 3442 1 4324 1 4535 1 4546 1 5656 1 12313 1 12321 1
十一、Combine提高運作效率:
- 要求:使用Combine類統計一下單詞出現的次數
- 測試資料:
zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin
- 編寫代碼:
- mapper類
/**
 * Combiner-example mapper: splits each line on spaces and emits (word, 1).
 *
 * @Author zhangyong
 * @Date 2020/4/15 7:30
 * @Version 1.0
 */
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
- Combine類
/**
 * Combiner: pre-aggregates (word, 1) pairs on the map side so less data is
 * shuffled; the logic is the same summation the reducer performs.
 *
 * @Author zhangyong
 * @Date 2020/4/15 7:34
 * @Version 1.0
 */
public class WcCombine extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int subtotal = 0;
        for (IntWritable value : values) {
            subtotal += value.get();
        }
        context.write(key, new IntWritable(subtotal));
    }
}
- reducer類
/**
 * Word-count reducer: sums the (possibly combiner-pre-aggregated) counts
 * for each word and writes the final total.
 *
 * @author zhangyong
 */
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * @param key     a word
     * @param values  all partial counts for that word
     * @param context sink for the (word, total) pair
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
            // NOTE: the original printed every (key, value) to System.err here;
            // that per-record debug logging floods task logs and slows the job,
            // so it has been removed.
        }
        context.write(key, new IntWritable(count));
    }
}
- Driver類
/**
 * Driver for the combiner-enabled word-count job.
 *
 * @author zhangyong
 */
public class WcDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WcDriver.class);

        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        /*
         * Set the combiner class; without this, no combine phase runs.
         * The combiner performs an early, map-side merge of counts, reducing
         * the data shuffled to the reduce phase and therefore its workload.
         * The combiner only pre-merges — it does not change the final result.
         */
        job.setCombinerClass(WcCombine.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/mapreduce"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/wccombine"));

        // Propagate job success/failure instead of discarding it.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
- mapper類
- 運作結果:
zhangqin 20 zhangrui 20 zhangyong 20
- 測試資料:
十二、推薦認識好友:
- 要求:找出一下朋友的潛在朋友(一度二度朋友關系鍊)
- 測試資料:
tom rose tom jim tom smith tom lucy rose tom rose lucy rose smith jim tom jim lucy jim smith smith jim smith tom smith rose
- 編寫代碼:
- 第一個mapper類
/** * @Author 張勇 * @Site www.gz708090.com * @Version 1.0 * @Date 2020-04-17 12:08 */ public class OneFriendMapper extends Mapper<LongWritable, Text, Text, Text> { /** * 輸入的key和value是根據檔案内容來确定。 * 輸出的key和value是因為在業務邏輯中設定的輸出是name-friend好友關系。 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 擷取每行的資料 String line = value.toString(); // 擷取姓名 String name = line.split(" ")[0]; // 擷取好友 String friend = line.split(" ")[1]; context.write(new Text(name), new Text(friend)); } }
- 第一個reducer類
/** * @Author 張勇 * @Site www.gz708090.com * @Version 1.0 * @Date 2020-04-17 12:28 */ public class OneFriendReducer extends Reducer<Text, Text, Text, IntWritable> { /** * 輸入key和value要和mapper的輸出保持一緻。 * Text和IntWritable: * 如果是好友-1,如果不是好友就用-2。 */ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { ArrayList<String> friendList = new ArrayList<>(); //處理好友關系 for (Text value : values) { friendList.add(value.toString()); if (key.toString().compareTo(value.toString()) < 0) { context.write(new Text(key + "-" + value), new IntWritable(1)); } else { context.write(new Text(value + "-" + key), new IntWritable(1)); } } // 處理可能相識的好友。 for (int i = 0; i < friendList.size(); i++) { for (int j = 0; j < friendList.size(); j++) { String friend1 = friendList.get(i); String friend2 = friendList.get(j); if (friend1.compareTo(friend2) < 0) { context.write(new Text(friend1 + "-" + friend2), new IntWritable(2)); } } } } }
- 第二個mapper類
/** * @Author 張勇 * @Site www.gz708090.com * @Version 1.0 * @Date 2020-04-17 12:32 */ public class TwoFriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 擷取一行資料 String line = value.toString(); // 擷取朋友關系的資訊 String friendInfo = line.split("\t")[0]; // 擷取朋友關系的深度 int deep = Integer.parseInt(line.split("\t")[1]); context.write(new Text(friendInfo), new IntWritable(deep)); } }
- 第二個reducer類
/**
 * Second-pass reducer: keeps only pairs that are NOT direct friends.
 * A pair is output when every recorded depth is 2 (friend-of-friend);
 * any depth-1 record means the two already know each other, so the pair
 * is suppressed.
 *
 * @author 張勇
 * @see <a href="http://www.gz708090.com">www.gz708090.com</a>
 */
public class TwoFriendReducer extends Reducer<Text, IntWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Primitive boolean — the original used boxed Boolean for a local
        // flag, which adds needless boxing with no benefit.
        boolean potentialOnly = true;
        for (IntWritable value : values) {
            if (value.get() == 1) {
                // Direct friendship found; no need to scan remaining depths.
                potentialOnly = false;
                break;
            }
        }
        if (potentialOnly) {
            context.write(key, NullWritable.get());
        }
    }
}
- Driver類
/** * @Author 張勇 * @Site www.gz708090.com * @Version 1.0 * @Date 2020-04-17 12:36 */ public class FriendDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); //設定第一輪MapReduce的相應處理類與輸入輸出 Job job1 = Job.getInstance(conf); job1.setJarByClass(FriendDriver.class); job1.setMapperClass(OneFriendMapper.class); job1.setReducerClass(OneFriendReducer.class); job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(Text.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(IntWritable.class); //設定路徑(傳輸、結果) FileInputFormat.setInputPaths(job1, new Path("hdfs://anshun115:9000/friend")); FileOutputFormat.setOutputPath(job1, new Path("hdfs://anshun115:9000/result/friend")); //如果第一輪MapReduce完成再做這裡的代碼 if (job1.waitForCompletion(true)) { Job job2 = Job.getInstance(conf); // 設定第二個Job任務的Mapper job2.setMapperClass(TwoFriendMapper.class); job2.setMapOutputKeyClass(Text.class); job2.setMapOutputValueClass(IntWritable.class); // 設定第二個Job任務的Reducer job2.setReducerClass(TwoFriendReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(NullWritable.class); /** * 設定第二個Job任務是輸入輸出路徑。 * 此處的輸入路徑是第一個job任務的輸出路徑 * 注意設定路徑時,裡面傳入的job應該是目前的job任務,如下所示,應該是job2。 * 如果寫成前面的job任務名稱,在運作時則會爆出錯誤,提示路徑不存在。 */ FileInputFormat.setInputPaths(job2, new Path("hdfs://anshun115:9000/result/friend")); FileOutputFormat.setOutputPath(job2, new Path("hdfs://anshun115:9000/result/friend2")); // 此處送出任務時,注意用的是job2。 job2.waitForCompletion(true); } } }
- 第一個mapper類
-
運作結果:
friend
friend2
jim-smith 1 jim-lucy 1 jim-tom 1 smith-tom 2 lucy-smith 2 lucy-tom 2 rose-smith 1 lucy-rose 1 rose-tom 1 smith-tom 2 lucy-smith 2 lucy-tom 2 rose-smith 1 smith-tom 1 jim-smith 1 rose-tom 2 jim-rose 2 jim-tom 2 lucy-tom 1 smith-tom 1 jim-tom 1 rose-tom 1 lucy-smith 2 lucy-rose 2 jim-lucy 2 jim-smith 2 jim-rose 2 rose-smith 2
jim-rose lucy-smith