天天看點

MapReduce的一對多連接配接操作

問題描述:

一個trade table表

product1"trade1

product2"trade2

product3"trade3

一個pay table表

product1"pay1

product2"pay2

product2"pay3

product1"pay4

product3"pay5

product3"pay6

建立兩個表之間的連接配接,該兩表是一對多關系的

如下:

trade1pay1

trade1pay4

trade2pay2

...

思路:

       為了将兩個表整合到一起,由于有相同的第一列,且第一個表與第二個表是一對多關系的。

這裡依然采用分組,以及組内排序,隻要保證一方最先到達reduce端,則就可以進行疊代處理了。

為了保證第一個表先到達reduce端,可以為定義一個組合鍵,包含兩個值,第一個值為product,第二個值為0或者1,來分别代表第一個表和第二個表,隻要按照組内升序排列即可。

具體代碼:

自定義組合鍵政策

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

<code>package</code> <code>whut.onetomany;</code>

<code>import</code> <code>java.io.DataInput;</code>

<code>import</code> <code>java.io.DataOutput;</code>

<code>import</code> <code>java.io.IOException;</code>

<code>import</code> <code>org.apache.hadoop.io.WritableComparable;</code>

<code>public</code> <code>class</code> <code>TextIntPair </code><code>implements</code> <code>WritableComparable{</code>

<code>    </code><code>//product1 0/1</code>

<code>    </code><code>private</code> <code>String firstKey;</code><code>//product1</code>

<code>    </code><code>private</code> <code>int</code> <code>secondKey;</code><code>//0,1;0代表是trade表,1代表是pay表</code>

<code>    </code><code>//隻需要保證trade表在pay表前面就行,則隻需要對組順序排列</code>

<code>                                                          </code> 

<code>    </code><code>public</code> <code>String getFirstKey() {</code>

<code>        </code><code>return</code> <code>firstKey;</code>

<code>    </code><code>}</code>

<code>    </code><code>public</code> <code>void</code> <code>setFirstKey(String firstKey) {</code>

<code>        </code><code>this</code><code>.firstKey = firstKey;</code>

<code>    </code><code>public</code> <code>int</code> <code>getSecondKey() {</code>

<code>        </code><code>return</code> <code>secondKey;</code>

<code>    </code><code>public</code> <code>void</code> <code>setSecondKey(</code><code>int</code> <code>secondKey) {</code>

<code>        </code><code>this</code><code>.secondKey = secondKey;</code>

<code>    </code><code>@Override</code>

<code>    </code><code>public</code> <code>void</code> <code>write(DataOutput out) </code><code>throws</code> <code>IOException {</code>

<code>        </code><code>out.writeUTF(firstKey);</code>

<code>        </code><code>out.writeInt(secondKey);</code>

<code>    </code><code>public</code> <code>void</code> <code>readFields(DataInput in) </code><code>throws</code> <code>IOException {</code>

<code>        </code><code>// TODO Auto-generated method stub</code>

<code>        </code><code>firstKey=in.readUTF();</code>

<code>        </code><code>secondKey=in.readInt();</code>

<code>    </code><code>public</code> <code>int</code> <code>compareTo(Object o) {</code>

<code>        </code><code>TextIntPair tip=(TextIntPair)o;</code>

<code>        </code><code>return</code> <code>this</code><code>.getFirstKey().compareTo(tip.getFirstKey());</code>

<code>}</code>

分組政策

<code>import</code> <code>org.apache.hadoop.io.WritableComparator;</code>

<code>public</code> <code>class</code> <code>TextComparator </code><code>extends</code> <code>WritableComparator{</code>

<code>    </code><code>protected</code> <code>TextComparator() {</code>

<code>        </code><code>super</code><code>(TextIntPair.</code><code>class</code><code>,</code><code>true</code><code>);</code><code>//注冊比較器</code>

<code>    </code><code>public</code> <code>int</code> <code>compare(WritableComparable a, WritableComparable b) {</code>

<code>        </code><code>TextIntPair tip1=(TextIntPair)a;</code>

<code>        </code><code>TextIntPair tip2=(TextIntPair)b;</code>

<code>        </code><code>return</code> <code>tip1.getFirstKey().compareTo(tip2.getFirstKey());</code>

組内排序政策:目的是保證第一個表比第二個表先到達

<code>//分組内部進行排序,按照第二個字段進行排序</code>

<code>public</code> <code>class</code> <code>TextIntComparator </code><code>extends</code> <code>WritableComparator {</code>

<code>    </code><code>public</code> <code>TextIntComparator()</code>

<code>    </code><code>{</code>

<code>        </code><code>super</code><code>(TextIntPair.</code><code>class</code><code>,</code><code>true</code><code>);</code>

<code>    </code><code>//這裡可以進行排序的方式管理</code>

<code>    </code><code>//必須保證是同一個分組的</code>

<code>    </code><code>//a與b進行比較</code>

<code>    </code><code>//如果a在前b在後,則會産生升序</code>

<code>    </code><code>//如果a在後b在前,則會産生降序</code>

<code>        </code><code>TextIntPair ti1=(TextIntPair)a;</code>

<code>        </code><code>TextIntPair ti2=(TextIntPair)b;</code>

<code>        </code><code>//首先要保證是同一個組内,同一個組的辨別就是第一個字段相同</code>

<code>        </code><code>if</code><code>(!ti1.getFirstKey().equals(ti2.getFirstKey()))</code>

<code>           </code><code>return</code> <code>ti1.getFirstKey().compareTo(ti2.getFirstKey());</code>

<code>        </code><code>else</code>

<code>           </code><code>return</code> <code>ti1.getSecondKey()-ti2.getSecondKey();</code><code>//0,-1,1</code>

<code>                                     </code> 

分區政策:

<code>import</code> <code>org.apache.hadoop.io.Text;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Partitioner;</code>

<code>public</code> <code>class</code> <code>PartitionByText </code><code>extends</code> <code>Partitioner&lt;TextIntPair, Text&gt; {</code>

<code>    </code><code>public</code> <code>int</code> <code>getPartition(TextIntPair key, Text value, </code><code>int</code> <code>numPartitions) {</code>

<code>        </code><code>return</code> <code>(key.getFirstKey().hashCode()&amp;Integer.MAX_VALUE)%numPartitions;</code>

MapReduce

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

<code>import</code> <code>java.util.Iterator;</code>

<code>import</code> <code>org.apache.hadoop.conf.Configuration;</code>

<code>import</code> <code>org.apache.hadoop.conf.Configured;</code>

<code>import</code> <code>org.apache.hadoop.fs.Path;</code>

<code>import</code> <code>org.apache.hadoop.io.LongWritable;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Job;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Mapper;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Mapper.Context;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Reducer;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.FileInputFormat;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.FileSplit;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.MultipleInputs;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;</code>

<code>import</code> <code>org.apache.hadoop.util.GenericOptionsParser;</code>

<code>import</code> <code>org.apache.hadoop.util.Tool;</code>

<code>import</code> <code>org.apache.hadoop.util.ToolRunner;</code>

<code>public</code> <code>class</code> <code>JoinMain </code><code>extends</code> <code>Configured </code><code>implements</code> <code>Tool {</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>JoinMapper </code><code>extends</code> <code>Mapper&lt;LongWritable, Text, TextIntPair, Text&gt;</code>

<code>        </code><code>private</code> <code>TextIntPair tp=</code><code>new</code> <code>TextIntPair();</code>

<code>        </code><code>private</code> <code>Text val=</code><code>new</code> <code>Text();</code>

<code>        </code><code>@Override</code>

<code>        </code><code>protected</code> <code>void</code> <code>map(LongWritable key, Text value, Context context)</code>

<code>                </code><code>throws</code> <code>IOException, InterruptedException {</code>

<code>            </code><code>// TODO Auto-generated method stub</code>

<code>            </code><code>//擷取要處理的檔案的名稱</code>

<code>            </code><code>FileSplit file=(FileSplit)context.getInputSplit();</code>

<code>            </code><code>String fileName=file.getPath().toString();</code>

<code>            </code><code>//擷取輸入行分隔</code>

<code>            </code><code>String line=value.toString();</code>

<code>            </code><code>String[] lineKeyValue=line.split(</code><code>"\""</code><code>);</code>

<code>            </code><code>String lineKey=lineKeyValue[</code><code>0</code><code>];</code>

<code>            </code><code>String lineValue=lineKeyValue[</code><code>1</code><code>];</code>

<code>            </code><code>tp.setFirstKey(lineKey);</code>

<code>            </code><code>//判斷是否是trade檔案</code>

<code>            </code><code>if</code><code>(fileName.indexOf(</code><code>"trade"</code><code>)&gt;=</code><code>0</code><code>)</code>

<code>            </code><code>{</code>

<code>                </code><code>tp.setSecondKey(</code><code>0</code><code>);</code>

<code>                </code><code>val.set(lineValue);</code>

<code>            </code><code>}</code>

<code>            </code><code>//判斷是否是pay檔案</code>

<code>            </code><code>else</code> <code>if</code><code>(fileName.indexOf(</code><code>"pay"</code><code>)&gt;=</code><code>0</code><code>)</code>

<code>                </code><code>tp.setSecondKey(</code><code>1</code><code>);</code>

<code>            </code><code>context.write(tp, val);</code>

<code>        </code><code>}</code>

<code>                      </code> 

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>JoinReducer </code><code>extends</code> <code>Reducer&lt;TextIntPair, Text, Text, Text&gt;</code>

<code>        </code><code>protected</code> <code>void</code> <code>reduce(TextIntPair key, Iterable&lt;Text&gt; values,</code>

<code>                </code><code>Context context)</code><code>throws</code> <code>IOException, InterruptedException {</code>

<code>            </code><code>Iterator&lt;Text&gt; valList=values.iterator();</code>

<code>            </code><code>//注意這裡一定要寫成string不可變,寫成Text有問題</code>

<code>            </code><code>//Text trade=valList.next();</code>

<code>            </code><code>String tradeName=valList.next().toString();</code>

<code>            </code><code>while</code><code>(valList.hasNext())</code>

<code>                </code><code>Text pay=valList.next();</code>

<code>                </code><code>context.write(</code><code>new</code> <code>Text(tradeName), pay);</code>

<code>    </code><code>public</code> <code>int</code> <code>run(String[] args) </code><code>throws</code> <code>Exception</code>

<code>        </code><code>Configuration conf=getConf();</code>

<code>        </code><code>Job job=</code><code>new</code> <code>Job(conf,</code><code>"JoinJob"</code><code>);</code>

<code>        </code><code>job.setJarByClass(JoinMain.</code><code>class</code><code>);</code>

<code>        </code><code>//ToolRunner已經利用GenericOptionsParser解析了指令行中的參數</code>

<code>        </code><code>//并且将其存放在數組中,傳遞給該run()方法了</code>

<code>        </code><code>FileInputFormat.addInputPath(job, </code><code>new</code> <code>Path(args[</code><code>0</code><code>]));</code>

<code>        </code><code>FileInputFormat.addInputPath(job, </code><code>new</code> <code>Path(args[</code><code>1</code><code>]));</code>

<code>        </code><code>//輸入檔案必須以,隔開</code>

<code>        </code><code>//FileInputFormat.addInputPaths(job, args[0]);</code>

<code>        </code><code>FileOutputFormat.setOutputPath(job, </code><code>new</code> <code>Path(args[</code><code>2</code><code>]));</code>

<code>                          </code> 

<code>        </code><code>job.setMapperClass(JoinMapper.</code><code>class</code><code>);</code>

<code>        </code><code>job.setReducerClass(JoinReducer.</code><code>class</code><code>);</code>

<code>        </code><code>//設定分區方法</code>

<code>        </code><code>job.setPartitionerClass(PartitionByText.</code><code>class</code><code>);</code>

<code>        </code><code>//設定分組排序</code>

<code>        </code><code>job.setGroupingComparatorClass(TextComparator.</code><code>class</code><code>);</code>

<code>        </code><code>job.setSortComparatorClass(TextIntComparator.</code><code>class</code><code>);</code>

<code>        </code><code>job.setMapOutputKeyClass(TextIntPair.</code><code>class</code><code>);</code>

<code>        </code><code>job.setMapOutputValueClass(Text.</code><code>class</code><code>);</code>

<code>        </code><code>job.setOutputKeyClass(Text.</code><code>class</code><code>);</code>

<code>        </code><code>job.setOutputValueClass(Text.</code><code>class</code><code>);</code>

<code>        </code><code>job.waitForCompletion(</code><code>true</code><code>);</code>

<code>        </code><code>int</code> <code>exitCode=job.isSuccessful()?</code><code>0</code><code>:</code><code>1</code><code>;</code>

<code>        </code><code>return</code> <code>exitCode;</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args)</code><code>throws</code> <code>Exception</code>

<code>        </code><code>int</code> <code>code=ToolRunner.run(</code><code>new</code> <code>JoinMain(), args);</code>

<code>        </code><code>System.exit(code);</code>

注意:

     一般有些地方沒有定義組内排序政策,但是經過多次測試,發現無法保證第一個表在第二個表之前到達,則這裡就自定義了組内排序政策。版本号為Hadoop1.1.2

本文轉自 zhao_xiao_long 51CTO部落格,原文連結:http://blog.51cto.com/computerdragon/1287744