
A MapReduce Demo That Sorts Transaction Logs (MR Secondary Sort)

1. Source log file (the columns are: account, income, expenses, date)

[email protected]    6000    0    2014-02-20
[email protected]    2000    0    2014-02-20
[email protected]    0    100    2014-02-20
[email protected]    3000    0    2014-02-20
[email protected]    9000    0    2014-02-20
[email protected]    0    200    2014-02-20      

Desired result (compute each account's total income and total expenses; sort by income in descending order, and when incomes are equal put the row with the lower expenses first):

[email protected]    9000    0    9000
[email protected]    9000    200    8800
[email protected]    2000    100    1900      

2. The code:

InfoBean.java — encapsulates an account's numeric fields (income, expenses, and the derived surplus) in a single bean object

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// This bean is compared against other InfoBeans, so the generic type T is InfoBean.
public class InfoBean implements WritableComparable<InfoBean> {

    private String account;
    private double income;
    private double expenses;
    private double surplus;

    /*
     * Without this method, you would have to call a separate setter for
     * each property every time you populate an InfoBean.
     */
    public void set(String account, double income, double expenses) {
        this.account = account;
        this.income = income;
        this.expenses = expenses;
        this.surplus = income - expenses;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(account);
        out.writeDouble(income);
        out.writeDouble(expenses);
        out.writeDouble(surplus);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.account = in.readUTF();
        this.income = in.readDouble();
        this.expenses = in.readDouble();
        this.surplus = in.readDouble();
    }

    // Sort by income descending; when incomes are equal, lower expenses first.
    // (Never returning 0 also keeps accounts with identical totals from being
    // merged into a single reduce group.)
    @Override
    public int compareTo(InfoBean o) {
        if (this.income == o.getIncome()) {
            return this.expenses > o.getExpenses() ? 1 : -1;
        } else {
            return this.income > o.getIncome() ? -1 : 1;
        }
    }

    // toString() should match the format of the source file trade_info.txt,
    // with fields separated by tabs. SumReducer already writes k3 (the account)
    // via context.write(key, v), so there is no need to output it again here.
    @Override
    public String toString() {
        return this.income + "\t" + this.expenses + "\t" + this.surplus;
    }

    public double getIncome() {
        return income;
    }

    public void setIncome(double income) {
        this.income = income;
    }

    public double getExpenses() {
        return expenses;
    }

    public void setExpenses(double expenses) {
        this.expenses = expenses;
    }

    public double getSurplus() {
        return surplus;
    }

    public void setSurplus(double surplus) {
        this.surplus = surplus;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

}

 SumStep.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SumStep {

    public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        private Text k = new Text();
        private InfoBean bean = new InfoBean();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, InfoBean>.Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = line.split("\t");
            String account = fields[0];
            double income = Double.parseDouble(fields[1]);
            double expenses = Double.parseDouble(fields[2]);
            k.set(account);
            bean.set(account, income, expenses);
            context.write(k, bean);
        }
    }

    public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean> {
        private InfoBean v = new InfoBean();

        @Override
        protected void reduce(Text key, Iterable<InfoBean> values, Reducer<Text, InfoBean, Text, InfoBean>.Context context)
                throws IOException, InterruptedException {
            double sum_in = 0;
            double sum_out = 0;
            for (InfoBean bean : values) {
                sum_in += bean.getIncome();
                sum_out += bean.getExpenses();
            }
            /*
             * In the crxy traffic-statistics example, k3 and v3 were written by
             * new-ing the wrapper object inside the reduce method:
             *     TrafficWritable v3 = new TrafficWritable(t1, t2, t3, t4);
             *     context.write(k2, v3);
             * With large inputs that allocates one object per key, which creates
             * a lot of garbage and real memory pressure.
             *
             * So it is better to declare the wrapper once as a field at the top
             * of the class, as above: private InfoBean v = new InfoBean();
             * If your Java fundamentals are solid, you might object that Java
             * passes references, so each call would overwrite the previous v,
             * leaving only the last one. That is not a problem here, because
             * context.write() serializes the InfoBean immediately: the reference
             * is indeed reused, but the earlier object's values have already
             * been written out. So the whole class can safely share this one
             * InfoBean.
             */

            v.set(key.toString(), sum_in, sum_out);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SumStep.class);

        job.setMapperClass(SumMapper.class);
        // The next two lines can be omitted when the k2/v2 types are identical
        // to the k3/v3 types (compare the generics on the SumReducer class
        // declaration), which is the case here.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
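
The claim in the comment above — that reusing one InfoBean is safe because write() serializes immediately — can be checked locally. A sketch using Hadoop's DataOutputBuffer (the class name ReuseCheck is made up):

import org.apache.hadoop.io.DataOutputBuffer;

// Hypothetical check: bytes written by write() are unaffected by later set() calls.
public class ReuseCheck {
    public static void main(String[] args) throws Exception {
        DataOutputBuffer buf = new DataOutputBuffer();
        InfoBean v = new InfoBean();

        v.set("a@demo", 1000, 0);
        v.write(buf);                     // first record serialized into the buffer
        int afterFirst = buf.getLength();

        v.set("b@demo", 2000, 0);         // reuse the same object
        v.write(buf);                     // second record appended

        // The buffer now holds two distinct records; overwriting the bean's
        // fields after write() cannot change bytes that were already written.
        System.out.println(afterFirst > 0 && buf.getLength() > afterFirst);
    }
}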

Package the project as a jar, copy it to the Linux box, and upload the source log file to HDFS. Then run:

hadoop jar /root/itcastmr.jar itcastmr.SumStep /user/root/trade_info.txt /tradeout      
(screenshot: the SumStep output, still ordered by account)

But this output is not sorted by income; it is still in lexicographic order of the account (the Text key).

So we feed this job's output into a second job that sorts the records according to the InfoBean ordering.

Here is SortStep.java:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortStep {
    // This Mapper reads the HDFS file produced by the SumStep reducer.
    public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
        private InfoBean k = new InfoBean();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, InfoBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String account = fields[0];
            double income = Double.parseDouble(fields[1]);
            double expenses = Double.parseDouble(fields[2]);
            k.set(account, income, expenses);
            // We want records ordered by the rules in InfoBean (its compareTo
            // method), so the InfoBean itself is used as k2.
            // Do not pass null here; NullWritable.get() returns the NullWritable singleton.
            context.write(k, NullWritable.get());
        }
    }

    public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
        private Text k = new Text();

        @Override
        protected void reduce(InfoBean bean, Iterable<NullWritable> values, Reducer<InfoBean, NullWritable, Text, InfoBean>.Context context)
                throws IOException, InterruptedException {
            String account = bean.getAccount();
            k.set(account);
            context.write(k, bean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortStep.class);

        job.setMapperClass(SortMapper.class);
        // Unlike in SumStep, the map output types (InfoBean/NullWritable) here
        // differ from the final output types (Text/InfoBean), so these two
        // lines cannot be omitted.
        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

Package it as a jar and run the command below; the input is the output of SumStep.java above:

hadoop jar /root/itcastmr.jar itcastmr.SortStep /tradeout /trade_sort_out      
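
Running two hadoop jar commands by hand works, but the two jobs can also be chained in a single driver. A sketch, assuming a hypothetical TradeDriver class in the same package as SumStep and SortStep, with args[0] = raw log, args[1] = intermediate dir, args[2] = final dir:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver that runs SumStep, then SortStep on its output.
public class TradeDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job sum = Job.getInstance(conf, "trade-sum");
        sum.setJarByClass(SumStep.class);
        sum.setMapperClass(SumStep.SumMapper.class);
        sum.setReducerClass(SumStep.SumReducer.class);
        sum.setOutputKeyClass(Text.class);
        sum.setOutputValueClass(InfoBean.class);
        FileInputFormat.setInputPaths(sum, new Path(args[0]));
        FileOutputFormat.setOutputPath(sum, new Path(args[1]));
        if (!sum.waitForCompletion(true)) {
            System.exit(1); // stop if the first job fails
        }

        Job sort = Job.getInstance(conf, "trade-sort");
        sort.setJarByClass(SortStep.class);
        sort.setMapperClass(SortStep.SortMapper.class);
        sort.setReducerClass(SortStep.SortReducer.class);
        sort.setMapOutputKeyClass(InfoBean.class);
        sort.setMapOutputValueClass(NullWritable.class);
        sort.setOutputKeyClass(Text.class);
        sort.setOutputValueClass(InfoBean.class);
        FileInputFormat.setInputPaths(sort, new Path(args[1]));
        FileOutputFormat.setOutputPath(sort, new Path(args[2]));
        System.exit(sort.waitForCompletion(true) ? 0 : 1);
    }
}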

The sorted result:

(screenshot: the sorted output)

When MapReduce reads its input, files whose names start with an underscore (or a dot) are skipped, so the contents of the "_SUCCESS" marker file are never read.
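
That skipping comes from FileInputFormat's default hidden-file filter. If you ever need different behavior, a custom PathFilter can be registered on the job. A sketch (hypothetical class name) that just reproduces the default rule:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Hypothetical filter mirroring the default rule: skip files whose names
// start with "_" (e.g. _SUCCESS) or ".".
public class VisibleFilesOnly implements PathFilter {
    @Override
    public boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
    }
}

It would be registered in main with FileInputFormat.setInputPathFilter(job, VisibleFilesOnly.class).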

To sort by a custom class:

1. The class must implement the WritableComparable interface.

2. It must override compareTo, defining the ordering according to your own business logic.

Then simply use that class as k2 and the framework sorts automatically, as in the sketch after this list.
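
As a throwaway illustration of the same recipe (a hypothetical class, unrelated to this demo), a key that sorts words by descending length:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical key: sorts words by descending length, ties broken alphabetically.
public class WordKey implements WritableComparable<WordKey> {
    private String word = "";

    public void set(String word) {
        this.word = word;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
    }

    @Override
    public int compareTo(WordKey o) {
        if (word.length() != o.word.length()) {
            return word.length() > o.word.length() ? -1 : 1; // longer first
        }
        return word.compareTo(o.word); // tie-break alphabetically
    }
}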

The framework itself calls the key objects' compareTo method; the sorting happens during the shuffle phase.

Shuffle will be covered later. It consists of many small stages: partitioning, sorting, grouping, the combiner, and so on. Once all of those details have been covered, we will look at shuffle as a whole.