天天看点

MapReduce 中的两表 join 实例(二)

<code>package</code> <code>com.baidu.uilt;</code>

<code>import</code> <code>java.io.*;</code>

<code>import</code> <code>org.apache.hadoop.io.*;</code>

<code>public</code> <code>class</code> <code>TextPair </code><code>implements</code> <code>WritableComparable&lt;TextPair&gt; {</code>

<code>  </code><code>private</code> <code>Text first;</code>

<code>  </code><code>private</code> <code>Text second;</code>

<code>  </code> 

<code>  </code><code>public</code> <code>TextPair() {</code>

<code>    </code><code>set(</code><code>new</code> <code>Text(), </code><code>new</code> <code>Text());</code>

<code>  </code><code>}</code>

<code>  </code><code>public</code> <code>TextPair(String first, String second) {</code>

<code>    </code><code>set(</code><code>new</code> <code>Text(first), </code><code>new</code> <code>Text(second));</code>

<code>  </code><code>public</code> <code>TextPair(Text first, Text second) {</code>

<code>    </code><code>set(first, second);</code>

<code>  </code><code>public</code> <code>void</code> <code>set(Text first, Text second) {</code>

<code>    </code><code>this</code><code>.first = first;</code>

<code>    </code><code>this</code><code>.second = second;</code>

<code>  </code><code>public</code> <code>Text getFirst() {</code>

<code>    </code><code>return</code> <code>first;</code>

<code>  </code><code>public</code> <code>Text getSecond() {</code>

<code>    </code><code>return</code> <code>second;</code>

<code>  </code><code>@Override</code>

<code>  </code><code>public</code> <code>void</code> <code>write(DataOutput out) </code><code>throws</code> <code>IOException {</code>

<code>    </code><code>first.write(out);</code>

<code>    </code><code>second.write(out);</code>

<code>  </code><code>public</code> <code>void</code> <code>readFields(DataInput in) </code><code>throws</code> <code>IOException {</code>

<code>    </code><code>first.readFields(in);</code>

<code>    </code><code>second.readFields(in);</code>

<code>  </code><code>public</code> <code>int</code> <code>hashCode() {</code>

<code>    </code><code>return</code> <code>first.hashCode() * </code><code>163</code> <code>+ second.hashCode();</code>

<code>  </code><code>public</code> <code>boolean</code> <code>equals(Object o) {</code>

<code>    </code><code>if</code> <code>(o </code><code>instanceof</code> <code>TextPair) {</code>

<code>      </code><code>TextPair tp = (TextPair) o;</code>

<code>      </code><code>return</code> <code>first.equals(tp.first) &amp;&amp; second.equals(tp.second);</code>

<code>    </code><code>}</code>

<code>    </code><code>return</code> <code>false</code><code>;</code>

<code>  </code><code>public</code> <code>String toString() {</code>

<code>    </code><code>return</code> <code>first + </code><code>"\t"</code> <code>+ second;</code>

<code>  </code><code>public</code> <code>int</code> <code>compareTo(TextPair tp) {</code>

<code>    </code><code>int</code> <code>cmp = first.compareTo(tp.first);</code>

<code>    </code><code>if</code> <code>(cmp != </code><code>0</code><code>) {</code>

<code>      </code><code>return</code> <code>cmp;</code>

<code>    </code><code>return</code> <code>second.compareTo(tp.second);</code>

<code>  </code><code>public</code> <code>static</code> <code>class</code> <code>Comparator </code><code>extends</code> <code>WritableComparator {</code>

<code>    </code> 

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>Text.Comparator TEXT_COMPARATOR = </code><code>new</code> <code>Text.Comparator();</code>

<code>    </code><code>public</code> <code>Comparator() {</code>

<code>      </code><code>super</code><code>(TextPair.</code><code>class</code><code>);</code>

<code>    </code><code>@Override</code>

<code>    </code><code>public</code> <code>int</code> <code>compare(</code><code>byte</code><code>[] b1, </code><code>int</code> <code>s1, </code><code>int</code> <code>l1,</code>

<code>                       </code><code>byte</code><code>[] b2, </code><code>int</code> <code>s2, </code><code>int</code> <code>l2) {</code>

<code>      </code> 

<code>      </code><code>try</code> <code>{</code>

<code>        </code><code>int</code> <code>firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);</code>

<code>        </code><code>int</code> <code>firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);</code>

<code>        </code><code>int</code> <code>cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);</code>

<code>        </code><code>if</code> <code>(cmp != </code><code>0</code><code>) {</code>

<code>          </code><code>return</code> <code>cmp;</code>

<code>        </code><code>}</code>

<code>        </code><code>return</code> <code>TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,</code>

<code>                                       </code><code>b2, s2 + firstL2, l2 - firstL2);</code>

<code>      </code><code>} </code><code>catch</code> <code>(IOException e) {</code>

<code>        </code><code>throw</code> <code>new</code> <code>IllegalArgumentException(e);</code>

<code>      </code><code>}</code>

<code>  </code><code>static</code> <code>{</code>

<code>    </code><code>WritableComparator.define(TextPair.</code><code>class</code><code>, </code><code>new</code> <code>Comparator());</code>

<code>  </code><code>public</code> <code>static</code> <code>class</code> <code>FirstComparator </code><code>extends</code> <code>WritableComparator {</code>

<code>    </code><code>public</code> <code>FirstComparator() {</code>

<code>        </code><code>return</code> <code>TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);</code>

<code>    </code><code>public</code> <code>int</code> <code>compare(WritableComparable a, WritableComparable b) {</code>

<code>      </code><code>if</code> <code>(a </code><code>instanceof</code> <code>TextPair &amp;&amp; b </code><code>instanceof</code> <code>TextPair) {</code>

<code>        </code><code>return</code> <code>((TextPair) a).first.compareTo(((TextPair) b).first);</code>

<code>      </code><code>return</code> <code>super</code><code>.compare(a, b);</code>

<code>}</code>

<code>package</code> <code>com.baidu.loan;</code>

<code>/***</code>

<code> </code><code>* </code>

<code> </code><code>* /home/users/ouerqiang/hadoop/hadoop-client-palo/hadoop/bin/hadoop jar LoanIdeaInfoText.jar  com.baidu.loan.LoanIdeainfoJoinIterialByDAILI6  /test/fbiz/loan/ideainfo/LoanIdeainfoByDAILIUnitID_0928  /test/fbiz/loan/ideainfo/LoanIterialByDAI_0928  /test/fbiz/loan/ideainfo/LoanIdeainfoJoinIterialByDAILI6_1_0928</code>

<code> </code><code>* **/</code>

<code>import</code> <code>java.io.IOException;</code>

<code>import</code> <code>java.util.Iterator;</code>

<code>import</code> <code>org.apache.hadoop.mapred.FileOutputFormat;</code>

<code>import</code> <code>org.apache.hadoop.mapred.JobClient;</code>

<code>import</code> <code>org.apache.hadoop.mapred.JobConf;</code>

<code>import</code> <code>org.apache.hadoop.mapred.MapReduceBase;</code>

<code>import</code> <code>org.apache.hadoop.mapred.Mapper;</code>

<code>import</code> <code>org.apache.hadoop.mapred.OutputCollector;</code>

<code>import</code> <code>org.apache.hadoop.mapred.Partitioner;</code>

<code>import</code> <code>org.apache.hadoop.mapred.Reducer;</code>

<code>import</code> <code>org.apache.hadoop.mapred.Reporter;</code>

<code>import</code> <code>org.apache.hadoop.mapred.TextInputFormat;</code>

<code>import</code> <code>org.apache.hadoop.mapred.lib.MultipleInputs;</code>

<code>import</code> <code>org.apache.hadoop.util.Tool;</code>

<code>import</code> <code>org.apache.hadoop.util.ToolRunner;</code>

<code>import</code> <code>org.apache.hadoop.conf.Configured;</code>

<code>import</code> <code>org.apache.hadoop.fs.Path;</code>

<code>import</code> <code>org.apache.hadoop.io.LongWritable;</code>

<code>import</code> <code>org.apache.hadoop.io.Text;</code>

<code>import</code> <code>com.baidu.uilt.TextPair;</code>

<code>public</code> <code>class</code> <code>LoanIdeainfoJoinIterialByDAILI6 </code><code>extends</code> <code>Configured </code><code>implements</code> <code>Tool {</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>JoinUnitMapper </code><code>extends</code> <code>MapReduceBase </code><code>implements</code>

<code>            </code><code>Mapper&lt;LongWritable, Text, TextPair, Text&gt; {</code>

<code>        </code><code>public</code> <code>void</code> <code>map(LongWritable key, Text value,</code>

<code>                </code><code>OutputCollector&lt;TextPair, Text&gt; output, Reporter reporter)</code>

<code>                </code><code>throws</code> <code>IOException {</code>

<code>            </code><code>String gbkStr = value.toString();</code>

<code>            </code><code>if</code> <code>(gbkStr.split(</code><code>"\t"</code><code>).length &lt; </code><code>2</code> <code>&amp;&amp; gbkStr.split(</code><code>","</code><code>).length == </code><code>4</code><code>) {</code>

<code>                </code><code>String[] strs = gbkStr.split(</code><code>","</code><code>);</code>

<code>                </code><code>output.collect(</code><code>new</code> <code>TextPair(strs[</code><code>0</code><code>], </code><code>"0"</code><code>), value);</code>

<code>            </code><code>}</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>JoinIterialMapper </code><code>extends</code> <code>MapReduceBase </code><code>implements</code>

<code>            </code><code>if</code> <code>(gbkStr.split(</code><code>"\t"</code><code>).length &gt; </code><code>4</code><code>) {</code><code>// LoanIterial</code>

<code>                </code><code>String[] strs = gbkStr.split(</code><code>"\t"</code><code>);</code>

<code>                </code><code>output.collect(</code><code>new</code> <code>TextPair(strs[</code><code>0</code><code>], </code><code>"1"</code><code>), value);</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>JoinReducer </code><code>extends</code> <code>MapReduceBase </code><code>implements</code>

<code>            </code><code>Reducer&lt;TextPair, Text, Text, Text&gt; {</code>

<code>        </code><code>public</code> <code>void</code> <code>reduce(TextPair key, Iterator&lt;Text&gt; values,</code>

<code>                </code><code>OutputCollector&lt;Text, Text&gt; output, Reporter reporter)</code>

<code>            </code><code>Text stationName = </code><code>new</code> <code>Text(values.next());</code>

<code>            </code><code>while</code> <code>(values.hasNext()) {</code>

<code>                </code><code>Text record = values.next();</code>

<code>                </code><code>Text outValue = </code><code>new</code> <code>Text(stationName.toString() + </code><code>"\t"</code>

<code>                        </code><code>+ record.toString());</code>

<code>                </code><code>output.collect(stationName, record);</code>

<code>                </code><code>//output.collect(key.getFirst(), outValue);</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>KeyPartitioner </code><code>implements</code> <code>Partitioner&lt;TextPair, Text&gt; {</code>

<code>        </code><code>@Override</code>

<code>        </code><code>public</code> <code>void</code> <code>configure(JobConf job) {}</code>

<code>        </code> 

<code>        </code><code>public</code> <code>int</code> <code>getPartition(TextPair key, Text value, </code><code>int</code> <code>numPartitions) {</code>

<code>          </code><code>return</code> <code>(key.getFirst().hashCode() &amp; Integer.MAX_VALUE) % numPartitions;</code>

<code>    </code><code>public</code> <code>int</code> <code>run(String[] args) </code><code>throws</code> <code>Exception {</code>

<code>        </code><code>if</code> <code>(args.length != </code><code>3</code><code>) {</code>

<code>              </code><code>return</code> <code>-</code><code>1</code><code>;</code>

<code>            </code> 

<code>            </code><code>JobConf conf = </code><code>new</code> <code>JobConf(getConf(), getClass());</code>

<code>            </code><code>conf.setJobName(</code><code>"Join record with station name"</code><code>);</code>

<code>            </code><code>String strPathUnit =args[</code><code>0</code><code>];</code>

<code>            </code><code>String strPathIterial =args[</code><code>1</code><code>];</code>

<code>            </code><code>Path outputPath= </code><code>new</code> <code>Path(args[</code><code>2</code><code>]);</code>

<code>            </code><code>MultipleInputs.addInputPath(conf, </code><code>new</code> <code>Path(strPathUnit),</code>

<code>                </code><code>TextInputFormat.</code><code>class</code><code>, JoinUnitMapper.</code><code>class</code><code>);</code>

<code>            </code><code>MultipleInputs.addInputPath(conf, </code><code>new</code> <code>Path(strPathIterial),</code>

<code>                </code><code>TextInputFormat.</code><code>class</code><code>, JoinIterialMapper.</code><code>class</code><code>);</code>

<code>            </code><code>FileOutputFormat.setOutputPath(conf, outputPath);</code>

<code>            </code><code>conf.setPartitionerClass(KeyPartitioner.</code><code>class</code><code>);</code>

<code>            </code><code>conf.setOutputValueGroupingComparator(TextPair.FirstComparator.</code><code>class</code><code>);</code>

<code>            </code><code>conf.setMapOutputKeyClass(TextPair.</code><code>class</code><code>);</code>

<code>            </code><code>conf.setReducerClass(JoinReducer.</code><code>class</code><code>);</code>

<code>            </code><code>conf.setOutputKeyClass(Text.</code><code>class</code><code>);</code>

<code>            </code><code>JobClient.runJob(conf);</code>

<code>            </code><code>return</code> <code>0</code><code>;</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) </code><code>throws</code> <code>Exception {</code>

<code>        </code><code>int</code> <code>exitCode = ToolRunner.run(</code><code>new</code> <code>LoanIdeainfoJoinIterialByDAILI6(), args);</code>

<code>        </code><code>System.exit(exitCode);</code>

需要注意的是上面的代码只是针对两表的一对一(多)关系,不满足多对多关系。如果需要满足多对多关系则需要加上一下判断即可。

本文转自 梦朝思夕 51CTO博客,原文链接:http://blog.51cto.com/qiangmzsx/1560553