天天看點

MapReduce的自制Writable分組輸出及組内排序

問題描述:

輸入檔案格式如下:

name1    2

name3    4

name1    6

name1    1

name3    3

name1    0

要求輸出的檔案格式如下:

name1    0,1,2,6

name3    3,4

要求是按照第一列分組,name1與name3也是按照順序排列的,組内升序排序。

思路:

正常的輸出,無法排序key所對應的多個值的順序。為了排序組内中的值,需要将key與value放在同一個組。Job中有兩個方法setGroupingComparatorClass和setSortComparatorClass,可以利用這兩個方法來實作組内排序。但是這些排序都是基于key的,則就要将key和value定義成組合鍵。

但是必須要保證第一列相同的全部都放在同一個分區中,則就需要自定義分區,分區的時候隻考慮第一列的值。由于partitioner僅僅能保證每一個reducer接受同一個name的所有記錄,但是reducer仍然是通過鍵進行分組的分區,也就說該分區中還是按照鍵來分成不同的組,還需要分組隻參考name值

先按照name分組,再在name中内部進行排序。

解決方法:

運用自定義組合鍵的政策,将name和1定義為一個組合鍵。在分區的時候隻參考name的值,即繼承partitioner。

 由于要按照name分組,則就需要定義分組政策,然後設定setGroupingComparatorClass。

setGroupingComparatorClass主要定義哪些key可以放置在一組,分組的時候會對組合鍵進行比較,由于這裡隻需要考慮組合鍵中的一個值,則定義實作一個WritableComparator,設定比較政策。

對于組内的排序,可以利用setSortComparatorClass來實作,

這個方法主要用于定義key如何進行排序在它們傳遞給reducer之前,

這裡就可以來進行組内排序。

具體代碼:

     Hadoop版本号:hadoop1.1.2

自定義組合鍵

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

<code>package</code> <code>whut;</code>

<code>import</code> <code>java.io.DataInput;</code>

<code>import</code> <code>java.io.DataOutput;</code>

<code>import</code> <code>java.io.IOException;</code>

<code>import</code> <code>org.apache.hadoop.io.IntWritable;</code>

<code>import</code> <code>org.apache.hadoop.io.Text;</code>

<code>import</code> <code>org.apache.hadoop.io.WritableComparable;</code>

<code>//自定義組合鍵政策</code>

<code>//java基本類型資料</code>

<code>public</code> <code>class</code> <code>TextInt </code><code>implements</code> <code>WritableComparable{</code>

<code>    </code><code>//直接利用java的基本資料類型</code>

<code>    </code><code>private</code> <code>String firstKey;</code>

<code>    </code><code>private</code> <code>int</code> <code>secondKey;</code>

<code>    </code><code>//必須要有一個預設的構造函數</code>

<code>    </code><code>public</code> <code>String getFirstKey() {</code>

<code>        </code><code>return</code> <code>firstKey;</code>

<code>    </code><code>}</code>

<code>    </code><code>public</code> <code>void</code> <code>setFirstKey(String firstKey) {</code>

<code>        </code><code>this</code><code>.firstKey = firstKey;</code>

<code>    </code><code>public</code> <code>int</code> <code>getSecondKey() {</code>

<code>        </code><code>return</code> <code>secondKey;</code>

<code>    </code><code>public</code> <code>void</code> <code>setSecondKey(</code><code>int</code> <code>secondKey) {</code>

<code>        </code><code>this</code><code>.secondKey = secondKey;</code>

<code>                                                                                                                                                                         </code> 

<code>    </code><code>@Override</code>

<code>    </code><code>public</code> <code>void</code> <code>write(DataOutput out) </code><code>throws</code> <code>IOException {</code>

<code>        </code><code>// TODO Auto-generated method stub</code>

<code>        </code><code>out.writeUTF(firstKey);</code>

<code>        </code><code>out.writeInt(secondKey);</code>

<code>    </code><code>public</code> <code>void</code> <code>readFields(DataInput in) </code><code>throws</code> <code>IOException {</code>

<code>        </code><code>firstKey=in.readUTF();</code>

<code>        </code><code>secondKey=in.readInt();</code>

<code>    </code><code>//map的鍵的比較就是根據這個方法來進行的</code>

<code>    </code><code>public</code> <code>int</code> <code>compareTo(Object o) {</code>

<code>        </code><code>TextInt ti=(TextInt)o;</code>

<code>        </code><code>//利用這個來控制升序或降序</code>

<code>        </code><code>//this本對象寫在前面代表是升序</code>

<code>        </code><code>//this本對象寫在後面代表是降序</code>

<code>        </code><code>return</code> <code>this</code><code>.getFirstKey().compareTo(ti.getFirstKey());</code>

<code>}</code>

分組政策

<code>import</code> <code>org.apache.hadoop.io.WritableComparator;</code>

<code>//主要就是對于分組進行排序,分組隻按照組建鍵中的一個值進行分組</code>

<code>public</code> <code>class</code> <code>TextComparator </code><code>extends</code> <code>WritableComparator {</code>

<code>    </code><code>//必須要調用父類的構造器</code>

<code>    </code><code>protected</code> <code>TextComparator() {</code>

<code>        </code><code>super</code><code>(TextInt.</code><code>class</code><code>,</code><code>true</code><code>);</code><code>//注冊comparator</code>

<code>    </code><code>public</code> <code>int</code> <code>compare(WritableComparable a, WritableComparable b) {</code>

<code>        </code><code>TextInt ti1=(TextInt)a;</code>

<code>        </code><code>TextInt ti2=(TextInt)b;</code>

<code>        </code><code>return</code> <code>ti1.getFirstKey().compareTo(ti2.getFirstKey());</code>

組内排序政策

<code>//分組内部進行排序,按照第二個字段進行排序</code>

<code>public</code> <code>class</code> <code>TextIntComparator </code><code>extends</code> <code>WritableComparator {</code>

<code>    </code><code>public</code> <code>TextIntComparator()</code>

<code>    </code><code>{</code>

<code>        </code><code>super</code><code>(TextInt.</code><code>class</code><code>,</code><code>true</code><code>);</code>

<code>    </code><code>//這裡可以進行排序的方式管理</code>

<code>    </code><code>//必須保證是同一個分組的</code>

<code>    </code><code>//a與b進行比較</code>

<code>    </code><code>//如果a在前b在後,則會産生升序</code>

<code>    </code><code>//如果a在後b在前,則會産生降序</code>

<code>        </code><code>//首先要保證是同一個組内,同一個組的辨別就是第一個字段相同</code>

<code>        </code><code>if</code><code>(!ti1.getFirstKey().equals(ti2.getFirstKey()))</code>

<code>           </code><code>return</code> <code>ti1.getFirstKey().compareTo(ti2.getFirstKey());</code>

<code>        </code><code>else</code>

<code>           </code><code>return</code> <code>ti2.getSecondKey()-ti1.getSecondKey();</code><code>//0,-1,1</code>

<code>                                                                                                                                                         </code> 

分區政策

<code>import</code> <code>org.apache.hadoop.mapreduce.Partitioner;</code>

<code>//參數為map的輸出類型</code>

<code>public</code> <code>class</code> <code>KeyPartitioner </code><code>extends</code> <code>Partitioner&lt;TextInt, IntWritable&gt; {</code>

<code>    </code><code>public</code> <code>int</code> <code>getPartition(TextInt key, IntWritable value, </code><code>int</code> <code>numPartitions) {</code>

<code>        </code><code>return</code> <code>(key.getFirstKey().hashCode()&amp;Integer.MAX_VALUE)%numPartitions;</code>

MapReduce政策

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

<code>import</code> <code>org.apache.hadoop.conf.Configuration;</code>

<code>import</code> <code>org.apache.hadoop.conf.Configured;</code>

<code>import</code> <code>org.apache.hadoop.fs.Path;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Job;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Mapper;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Reducer;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Mapper.Context;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.FileInputFormat;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;</code>

<code>import</code> <code>org.apache.hadoop.util.Tool;</code>

<code>import</code> <code>org.apache.hadoop.util.ToolRunner;</code>

<code>//需要對資料進行分組以及組内排序的時候</code>

<code>public</code> <code>class</code> <code>SortMain </code><code>extends</code> <code>Configured </code><code>implements</code> <code>Tool{</code>

<code>    </code><code>//這裡設定輸入文格式為KeyValueTextInputFormat</code>

<code>    </code><code>//name1 5</code>

<code>    </code><code>//預設輸入格式都是Text,Text</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>GroupMapper </code><code>extends</code>

<code>       </code><code>Mapper&lt;Text, Text, TextInt, IntWritable&gt;  {</code>

<code>        </code><code>public</code> <code>IntWritable second=</code><code>new</code> <code>IntWritable();</code>

<code>        </code><code>public</code> <code>TextInt tx=</code><code>new</code> <code>TextInt();</code>

<code>        </code><code>@Override</code>

<code>        </code><code>protected</code> <code>void</code> <code>map(Text key, Text value, Context context)</code>

<code>                </code><code>throws</code> <code>IOException, InterruptedException {</code>

<code>            </code><code>String lineKey=key.toString();</code>

<code>            </code><code>String lineValue=value.toString();</code>

<code>            </code><code>int</code> <code>lineInt=Integer.parseInt(lineValue);</code>

<code>            </code><code>tx.setFirstKey(lineKey);</code>

<code>            </code><code>tx.setSecondKey(lineInt);</code>

<code>            </code><code>second.set(lineInt);</code>

<code>            </code><code>context.write(tx, second);</code>

<code>        </code><code>}</code>

<code>    </code><code>//設定reduce</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>GroupReduce </code><code>extends</code> <code>Reducer&lt;TextInt, IntWritable, Text, Text&gt;</code>

<code>        </code><code>protected</code> <code>void</code> <code>reduce(TextInt key, Iterable&lt;IntWritable&gt; values,</code>

<code>               </code><code>Context context)</code>

<code>            </code><code>StringBuffer sb=</code><code>new</code> <code>StringBuffer();</code>

<code>            </code><code>for</code><code>(IntWritable val:values)</code>

<code>            </code><code>{</code>

<code>                </code><code>sb.append(val+</code><code>","</code><code>);</code>

<code>            </code><code>}</code>

<code>            </code><code>if</code><code>(sb.length()&gt;</code><code>0</code><code>)</code>

<code>                </code><code>sb.deleteCharAt(sb.length()-</code><code>1</code><code>);</code>

<code>            </code><code>context.write(</code><code>new</code> <code>Text(key.getFirstKey()), </code><code>new</code> <code>Text(sb.toString()));</code>

<code>                                                                                                                                       </code> 

<code>    </code><code>public</code> <code>int</code> <code>run(String[] args) </code><code>throws</code> <code>Exception {</code>

<code>        </code><code>Configuration conf=getConf();</code>

<code>        </code><code>Job job=</code><code>new</code> <code>Job(conf,</code><code>"SecondarySort"</code><code>);</code>

<code>        </code><code>job.setJarByClass(SortMain.</code><code>class</code><code>);</code>

<code>        </code><code>// 設定輸入檔案的路徑,已經上傳在HDFS</code>

<code>        </code><code>FileInputFormat.addInputPath(job, </code><code>new</code> <code>Path(args[</code><code>0</code><code>]));</code>

<code>        </code><code>// 設定輸出檔案的路徑,輸出檔案也存在HDFS中,但是輸出目錄不能已經存在</code>

<code>        </code><code>FileOutputFormat.setOutputPath(job, </code><code>new</code> <code>Path(args[</code><code>1</code><code>]));</code>

<code>                                                                                                                                           </code> 

<code>        </code><code>job.setMapperClass(GroupMapper.</code><code>class</code><code>);</code>

<code>        </code><code>job.setReducerClass(GroupReduce.</code><code>class</code><code>);</code>

<code>        </code><code>//設定分區方法</code>

<code>        </code><code>job.setPartitionerClass(KeyPartitioner.</code><code>class</code><code>);</code>

<code>        </code><code>//下面這兩個都是針對map端的</code>

<code>        </code><code>//設定分組的政策,哪些key可以放置到一組中</code>

<code>        </code><code>job.setGroupingComparatorClass(TextComparator.</code><code>class</code><code>);</code>

<code>        </code><code>//設定key如何進行排序在傳遞給reducer之前.</code>

<code>        </code><code>//這裡就可以設定對組内如何排序的方法</code>

<code>        </code><code>/*************關鍵點**********/</code>

<code>        </code><code>job.setSortComparatorClass(TextIntComparator.</code><code>class</code><code>);</code>

<code>        </code><code>//設定輸入檔案格式</code>

<code>        </code><code>job.setInputFormatClass(KeyValueTextInputFormat.</code><code>class</code><code>);</code>

<code>        </code><code>//使用預設的輸出格式即TextInputFormat</code>

<code>        </code><code>//設定map的輸出key和value類型</code>

<code>        </code><code>job.setMapOutputKeyClass(TextInt.</code><code>class</code><code>);</code>

<code>        </code><code>job.setMapOutputValueClass(IntWritable.</code><code>class</code><code>);</code>

<code>        </code><code>//設定reduce的輸出key和value類型</code>

<code>        </code><code>//job.setOutputFormatClass(TextOutputFormat.class);</code>

<code>        </code><code>job.setOutputKeyClass(Text.</code><code>class</code><code>);</code>

<code>        </code><code>job.setOutputValueClass(Text.</code><code>class</code><code>);</code>

<code>        </code><code>job.waitForCompletion(</code><code>true</code><code>);</code>

<code>        </code><code>int</code> <code>exitCode=job.isSuccessful()?</code><code>0</code><code>:</code><code>1</code><code>;</code>

<code>        </code><code>return</code> <code>exitCode;</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args)  </code><code>throws</code> <code>Exception</code>

<code>       </code><code>int</code> <code>exitCode=ToolRunner.run(</code><code>new</code> <code>SortMain(), args);</code>

<code>       </code><code>System.exit(exitCode);</code>

注意事項

   1,設定分組排序按照升序還是降序是在自定義WritableComparable中的compareTo()方法實作的,具體升序或者降序的設定在代碼中已經注釋說明

   2,設定組内值進行升序還是降序的排序是在組内排序政策中的compare()方法注釋說明的。

   3,這裡同時最重要的一點是,将第二列即放在組合鍵中,又作為value,這樣對于組合鍵排序也就相當于對于value進行排序了。

   4,在自定義組合鍵的時候,對于組合鍵中的資料的基本類型可以采用Java的基本類型也可以采用Hadoop的基本資料類型,對于Hadoop的基本資料類型一定要記得初始化new一個基本資料類型對象。對于組合鍵類,必須要有預設的構造方法。

本文轉自 zhao_xiao_long 51CTO部落格,原文連結:http://blog.51cto.com/computerdragon/1287721