問題描述:
一個trade table表
product1"trade1
product2"trade2
product3"trade3
一個pay table表
product1"pay1
product2"pay2
product2"pay3
product1"pay4
product3"pay5
product3"pay6
建立兩個表之間的連接配接,該兩表是一對多關系的
如下:
trade1pay1
trade1pay4
trade2pay2
...
思路:
為了将兩個表整合到一起,由于有相同的第一列,且第一個表與第二個表是一對多關系的。
這裡依然采用分組,以及組内排序,隻要保證一方最先到達reduce端,則就可以進行疊代處理了。
為了保證第一個表先到達reduce端,可以為定義一個組合鍵,包含兩個值,第一個值為product,第二個值為0或者1,來分别代表第一個表和第二個表,隻要按照組内升序排列即可。
具體代碼:
自定義組合鍵政策
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<code>package</code> <code>whut.onetomany;</code>
<code>import</code> <code>java.io.DataInput;</code>
<code>import</code> <code>java.io.DataOutput;</code>
<code>import</code> <code>java.io.IOException;</code>
<code>import</code> <code>org.apache.hadoop.io.WritableComparable;</code>
<code>public</code> <code>class</code> <code>TextIntPair </code><code>implements</code> <code>WritableComparable{</code>
<code> </code><code>//product1 0/1</code>
<code> </code><code>private</code> <code>String firstKey;</code><code>//product1</code>
<code> </code><code>private</code> <code>int</code> <code>secondKey;</code><code>//0,1;0代表是trade表,1代表是pay表</code>
<code> </code><code>//隻需要保證trade表在pay表前面就行,則隻需要對組順序排列</code>
<code> </code>
<code> </code><code>public</code> <code>String getFirstKey() {</code>
<code> </code><code>return</code> <code>firstKey;</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>void</code> <code>setFirstKey(String firstKey) {</code>
<code> </code><code>this</code><code>.firstKey = firstKey;</code>
<code> </code><code>public</code> <code>int</code> <code>getSecondKey() {</code>
<code> </code><code>return</code> <code>secondKey;</code>
<code> </code><code>public</code> <code>void</code> <code>setSecondKey(</code><code>int</code> <code>secondKey) {</code>
<code> </code><code>this</code><code>.secondKey = secondKey;</code>
<code> </code><code>@Override</code>
<code> </code><code>public</code> <code>void</code> <code>write(DataOutput out) </code><code>throws</code> <code>IOException {</code>
<code> </code><code>out.writeUTF(firstKey);</code>
<code> </code><code>out.writeInt(secondKey);</code>
<code> </code><code>public</code> <code>void</code> <code>readFields(DataInput in) </code><code>throws</code> <code>IOException {</code>
<code> </code><code>// TODO Auto-generated method stub</code>
<code> </code><code>firstKey=in.readUTF();</code>
<code> </code><code>secondKey=in.readInt();</code>
<code> </code><code>public</code> <code>int</code> <code>compareTo(Object o) {</code>
<code> </code><code>TextIntPair tip=(TextIntPair)o;</code>
<code> </code><code>return</code> <code>this</code><code>.getFirstKey().compareTo(tip.getFirstKey());</code>
<code>}</code>
分組政策
<code>import</code> <code>org.apache.hadoop.io.WritableComparator;</code>
<code>public</code> <code>class</code> <code>TextComparator </code><code>extends</code> <code>WritableComparator{</code>
<code> </code><code>protected</code> <code>TextComparator() {</code>
<code> </code><code>super</code><code>(TextIntPair.</code><code>class</code><code>,</code><code>true</code><code>);</code><code>//注冊比較器</code>
<code> </code><code>public</code> <code>int</code> <code>compare(WritableComparable a, WritableComparable b) {</code>
<code> </code><code>TextIntPair tip1=(TextIntPair)a;</code>
<code> </code><code>TextIntPair tip2=(TextIntPair)b;</code>
<code> </code><code>return</code> <code>tip1.getFirstKey().compareTo(tip2.getFirstKey());</code>
組内排序政策:目的是保證第一個表比第二個表先到達
<code>//分組内部進行排序,按照第二個字段進行排序</code>
<code>public</code> <code>class</code> <code>TextIntComparator </code><code>extends</code> <code>WritableComparator {</code>
<code> </code><code>public</code> <code>TextIntComparator()</code>
<code> </code><code>{</code>
<code> </code><code>super</code><code>(TextIntPair.</code><code>class</code><code>,</code><code>true</code><code>);</code>
<code> </code><code>//這裡可以進行排序的方式管理</code>
<code> </code><code>//必須保證是同一個分組的</code>
<code> </code><code>//a與b進行比較</code>
<code> </code><code>//如果a在前b在後,則會産生升序</code>
<code> </code><code>//如果a在後b在前,則會産生降序</code>
<code> </code><code>TextIntPair ti1=(TextIntPair)a;</code>
<code> </code><code>TextIntPair ti2=(TextIntPair)b;</code>
<code> </code><code>//首先要保證是同一個組内,同一個組的辨別就是第一個字段相同</code>
<code> </code><code>if</code><code>(!ti1.getFirstKey().equals(ti2.getFirstKey()))</code>
<code> </code><code>return</code> <code>ti1.getFirstKey().compareTo(ti2.getFirstKey());</code>
<code> </code><code>else</code>
<code> </code><code>return</code> <code>ti1.getSecondKey()-ti2.getSecondKey();</code><code>//0,-1,1</code>
<code> </code>
分區政策:
<code>import</code> <code>org.apache.hadoop.io.Text;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Partitioner;</code>
<code>public</code> <code>class</code> <code>PartitionByText </code><code>extends</code> <code>Partitioner<TextIntPair, Text> {</code>
<code> </code><code>public</code> <code>int</code> <code>getPartition(TextIntPair key, Text value, </code><code>int</code> <code>numPartitions) {</code>
<code> </code><code>return</code> <code>(key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;</code>
MapReduce
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
<code>import</code> <code>java.util.Iterator;</code>
<code>import</code> <code>org.apache.hadoop.conf.Configuration;</code>
<code>import</code> <code>org.apache.hadoop.conf.Configured;</code>
<code>import</code> <code>org.apache.hadoop.fs.Path;</code>
<code>import</code> <code>org.apache.hadoop.io.LongWritable;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Job;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Mapper;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Mapper.Context;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Reducer;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.FileInputFormat;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.FileSplit;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.MultipleInputs;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;</code>
<code>import</code> <code>org.apache.hadoop.util.GenericOptionsParser;</code>
<code>import</code> <code>org.apache.hadoop.util.Tool;</code>
<code>import</code> <code>org.apache.hadoop.util.ToolRunner;</code>
<code>public</code> <code>class</code> <code>JoinMain </code><code>extends</code> <code>Configured </code><code>implements</code> <code>Tool {</code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>JoinMapper </code><code>extends</code> <code>Mapper<LongWritable, Text, TextIntPair, Text></code>
<code> </code><code>private</code> <code>TextIntPair tp=</code><code>new</code> <code>TextIntPair();</code>
<code> </code><code>private</code> <code>Text val=</code><code>new</code> <code>Text();</code>
<code> </code><code>@Override</code>
<code> </code><code>protected</code> <code>void</code> <code>map(LongWritable key, Text value, Context context)</code>
<code> </code><code>throws</code> <code>IOException, InterruptedException {</code>
<code> </code><code>// TODO Auto-generated method stub</code>
<code> </code><code>//擷取要處理的檔案的名稱</code>
<code> </code><code>FileSplit file=(FileSplit)context.getInputSplit();</code>
<code> </code><code>String fileName=file.getPath().toString();</code>
<code> </code><code>//擷取輸入行分隔</code>
<code> </code><code>String line=value.toString();</code>
<code> </code><code>String[] lineKeyValue=line.split(</code><code>"\""</code><code>);</code>
<code> </code><code>String lineKey=lineKeyValue[</code><code>0</code><code>];</code>
<code> </code><code>String lineValue=lineKeyValue[</code><code>1</code><code>];</code>
<code> </code><code>tp.setFirstKey(lineKey);</code>
<code> </code><code>//判斷是否是trade檔案</code>
<code> </code><code>if</code><code>(fileName.indexOf(</code><code>"trade"</code><code>)>=</code><code>0</code><code>)</code>
<code> </code><code>{</code>
<code> </code><code>tp.setSecondKey(</code><code>0</code><code>);</code>
<code> </code><code>val.set(lineValue);</code>
<code> </code><code>}</code>
<code> </code><code>//判斷是否是pay檔案</code>
<code> </code><code>else</code> <code>if</code><code>(fileName.indexOf(</code><code>"pay"</code><code>)>=</code><code>0</code><code>)</code>
<code> </code><code>tp.setSecondKey(</code><code>1</code><code>);</code>
<code> </code><code>context.write(tp, val);</code>
<code> </code><code>}</code>
<code> </code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>JoinReducer </code><code>extends</code> <code>Reducer<TextIntPair, Text, Text, Text></code>
<code> </code><code>protected</code> <code>void</code> <code>reduce(TextIntPair key, Iterable<Text> values,</code>
<code> </code><code>Context context)</code><code>throws</code> <code>IOException, InterruptedException {</code>
<code> </code><code>Iterator<Text> valList=values.iterator();</code>
<code> </code><code>//注意這裡一定要寫成string不可變,寫成Text有問題</code>
<code> </code><code>//Text trade=valList.next();</code>
<code> </code><code>String tradeName=valList.next().toString();</code>
<code> </code><code>while</code><code>(valList.hasNext())</code>
<code> </code><code>Text pay=valList.next();</code>
<code> </code><code>context.write(</code><code>new</code> <code>Text(tradeName), pay);</code>
<code> </code><code>public</code> <code>int</code> <code>run(String[] args) </code><code>throws</code> <code>Exception</code>
<code> </code><code>Configuration conf=getConf();</code>
<code> </code><code>Job job=</code><code>new</code> <code>Job(conf,</code><code>"JoinJob"</code><code>);</code>
<code> </code><code>job.setJarByClass(JoinMain.</code><code>class</code><code>);</code>
<code> </code><code>//ToolRunner已經利用GenericOptionsParser解析了指令行中的參數</code>
<code> </code><code>//并且将其存放在數組中,傳遞給該run()方法了</code>
<code> </code><code>FileInputFormat.addInputPath(job, </code><code>new</code> <code>Path(args[</code><code>0</code><code>]));</code>
<code> </code><code>FileInputFormat.addInputPath(job, </code><code>new</code> <code>Path(args[</code><code>1</code><code>]));</code>
<code> </code><code>//輸入檔案必須以,隔開</code>
<code> </code><code>//FileInputFormat.addInputPaths(job, args[0]);</code>
<code> </code><code>FileOutputFormat.setOutputPath(job, </code><code>new</code> <code>Path(args[</code><code>2</code><code>]));</code>
<code> </code>
<code> </code><code>job.setMapperClass(JoinMapper.</code><code>class</code><code>);</code>
<code> </code><code>job.setReducerClass(JoinReducer.</code><code>class</code><code>);</code>
<code> </code><code>//設定分區方法</code>
<code> </code><code>job.setPartitionerClass(PartitionByText.</code><code>class</code><code>);</code>
<code> </code><code>//設定分組排序</code>
<code> </code><code>job.setGroupingComparatorClass(TextComparator.</code><code>class</code><code>);</code>
<code> </code><code>job.setSortComparatorClass(TextIntComparator.</code><code>class</code><code>);</code>
<code> </code><code>job.setMapOutputKeyClass(TextIntPair.</code><code>class</code><code>);</code>
<code> </code><code>job.setMapOutputValueClass(Text.</code><code>class</code><code>);</code>
<code> </code><code>job.setOutputKeyClass(Text.</code><code>class</code><code>);</code>
<code> </code><code>job.setOutputValueClass(Text.</code><code>class</code><code>);</code>
<code> </code><code>job.waitForCompletion(</code><code>true</code><code>);</code>
<code> </code><code>int</code> <code>exitCode=job.isSuccessful()?</code><code>0</code><code>:</code><code>1</code><code>;</code>
<code> </code><code>return</code> <code>exitCode;</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args)</code><code>throws</code> <code>Exception</code>
<code> </code><code>int</code> <code>code=ToolRunner.run(</code><code>new</code> <code>JoinMain(), args);</code>
<code> </code><code>System.exit(code);</code>
注意:
一般有些地方沒有定義組内排序政策,但是經過多次測試,發現無法保證第一個表在第二個表之前到達,則這裡就自定義了組内排序政策。版本号為Hadoop1.1.2
本文轉自 zhao_xiao_long 51CTO部落格,原文連結:http://blog.51cto.com/computerdragon/1287744