天天看點

Pig自定義過濾UDF和加載UDF

     Pig是一種資料流程式設計語言,由一系列操作和變換構成,每一個操作或者變換都對輸入進行處理,然後産生輸出結果,整體操作表示一個資料流。Pig的執行環境将資料流翻譯為可執行的内部表示,在Pig内部,這些變換操作被轉換為一系列的MapReduce作業。

      Pig自身有許多個方法,有時候需要我們自己定制特定的處理方法即UDF。

      UDF具體的步驟如下:

第一步,繼承計算類或者過濾類或者加載類或者存儲類,重寫裡面的需要實作的方法,将寫好的類進行打包生成jar檔案。諸如命名為example.jar

第二步,進入Pig的grunt中,利用register将打包的檔案注冊進入Pig中。進入Pig的grunt中,目前本地路徑就是使用者輸入Pig時候所在的路徑。打封包件一定要加上它所在的路徑。如register example.jar。

第三步,直接使用該自定義的UDF,在使用的過程中需要加上該類的權限定包名,如果這裡example.jar的包結構為com.whut.FilterFunct。則引用的時候就是com.whut.FilterFunct(參數)。注意類的名稱就是使用時候的方法名,必須要區分大小寫。

第四步,為自己的UDF定義别名,這樣使用的時候就不許要加包名了,如

define Goog com.whut.FilterFunct()。這樣使用的時候就直接利用Goog了。

自定義過濾UDF:

        過濾UDF需要繼承FilterFunc。實作其exec方法。該方法傳回的是boolean型。在對溫度統計的時候,就可以利用過濾UDF來過濾是否正确的氣溫。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

<code>package</code> <code>whut;</code>

<code>import</code> <code>java.io.IOException;</code>

<code>import</code> <code>java.util.ArrayList;</code>

<code>import</code> <code>java.util.List;</code>

<code>import</code> <code>org.apache.pig.FilterFunc;</code>

<code>import</code> <code>org.apache.pig.FuncSpec;</code>

<code>import</code> <code>org.apache.pig.backend.executionengine.ExecException;</code>

<code>import</code> <code>org.apache.pig.data.DataType;</code>

<code>import</code> <code>org.apache.pig.data.Tuple;</code>

<code>import</code> <code>org.apache.pig.impl.logicalLayer.FrontendException;</code>

<code>//删除記錄中不符合要求的記錄</code>

<code>//pig的自定義函數,過濾函數</code>

<code>public</code> <code>class</code> <code>IsGoodQuality </code><code>extends</code> <code>FilterFunc{</code>

<code>    </code><code>@Override</code>

<code>    </code><code>public</code> <code>Boolean exec(Tuple tuple) </code><code>throws</code> <code>IOException {</code>

<code>        </code><code>// TODO Auto-generated method stub</code>

<code>        </code><code>if</code><code>(tuple ==</code><code>null</code> <code>||tuple.size()==</code><code>0</code><code>)</code>

<code>        </code><code>return</code> <code>false</code><code>;</code>

<code>        </code><code>try</code><code>{</code>

<code>            </code><code>Object obj=tuple.get(</code><code>0</code><code>);</code>

<code>            </code><code>if</code><code>(obj==</code><code>null</code><code>)</code>

<code>                </code><code>return</code> <code>false</code><code>;</code>

<code>            </code><code>//這裡強制轉換為一個整形</code>

<code>            </code><code>int</code> <code>i=(Integer)obj;</code>

<code>            </code><code>return</code> <code>i==</code><code>0</code> <code>||i==</code><code>1</code> <code>|| i==</code><code>2</code> <code>|| i==</code><code>3</code><code>;</code>

<code>        </code><code>}</code><code>catch</code><code>(ExecException e)</code>

<code>        </code><code>{</code>

<code>            </code><code>throw</code> <code>new</code> <code>IOException(e);</code>

<code>        </code><code>}</code>

<code>    </code><code>}</code>

<code>}</code>

        這裡的參數是一個元組,可以包含多個輸入參數,在方法中直接利用get(索引位置)來直接擷取。

自定義加載函數UDF

      在Pig中經常會使用到加載外部檔案,一般使用Load進行加載,如Load 'input/tempdata' as (a:chararray,b:int) 。這裡預設使用了内部加載存儲函數,PigStorage。

即Load 'input/tempdata' using PigStorage()  as (a:chararray,b:int)。這裡PigStorage預設的每一行的字段分割符是制表符,當然也可以傳遞一個自己的字段分割符号。有時候每一行是一串字元串,想從中取出某一個字段,則就需要自己定義一個加載函數。以下面這個檔案為例子。

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

<code>aaaaa1990aaaaaa0039a</code>

<code>bbbbb1991bbbbbb0045a</code>

<code>ccccc1992cccccc0011c</code>

<code>ddddd1993dddddd0043d</code>

<code>eeeee1994eeeeee0047e</code>

<code>aaaaa1990aaaaaa0037a</code>

<code>bbbbb1991bbbbbb0027a</code>

<code>ccccc1992cccccc0032c</code>

<code>ddddd1993dddddd0090d</code>

<code>eeeee1994eeeeee0091e</code>

<code>aaaaa1980aaaaaa0041a</code>

<code>bbbbb1981bbbbbb0050a</code>

<code>ccccc1992cccccc0020c</code>

<code>ddddd1993dddddd0033d</code>

<code>eeeee1984eeeeee0061e</code>

<code>aaaaa1980aaaaaa0054a</code>

<code>bbbbb1991bbbbbb0075a</code>

<code>ccccc1982cccccc0011c</code>

<code>ddddd1993dddddd0003d</code>

<code>eeeee1974eeeeee0041e</code>

<code>bbbbb1961bbbbbb0041a</code>

<code>ccccc1972cccccc0070c</code>

<code>ddddd1993dddddd0042d</code>

<code>eeeee1974eeeeee0043e</code>

<code>aaaaa1990aaaaaa0034a</code>

<code>bbbbb1971bbbbbb0025a</code>

<code>ccccc1992cccccc0056c</code>

<code>ddddd1993dddddd0037d</code>

<code>eeeee1984eeeeee0038e</code>

<code>aaaaa1990aaaaaa0049a</code>

<code>bbbbb1991bbbbbb0011a</code>

<code>ccccc1962cccccc0012c</code>

<code>ddddd1993dddddd0023d</code>

<code>eeeee1984eeeeee0031e</code>

<code>aaaaa1980aaaaaa0094a</code>

<code>bbbbb1971bbbbbb0045a</code>

<code>ccccc1992cccccc0041c</code>

<code>eeeee1984eeeeee0081e</code>

<code>aaaaa1960aaaaaa0099a</code>

<code>bbbbb1971bbbbbb0050a</code>

<code>ccccc1952cccccc0055c</code>

<code>ddddd1963dddddd0043d</code>

<code>eeeee1994eeeeee0041e</code>

<code>aaaaa1990aaaaaa0031a</code>

<code>bbbbb1991bbbbbb0020a</code>

<code>ccccc1952cccccc0030c</code>

<code>ddddd1983dddddd0013d</code>

<code>eeeee1974eeeeee0061e</code>

<code>aaaaa1980aaaaaa0071a</code>

<code>bbbbb1961bbbbbb0060a</code>

<code>ccccc1992cccccc0080c</code>

<code>ddddd1953dddddd0033d</code>

<code>eeeee1964eeeeee0051e</code>

<code>aaaaa1960aaaaaa0024a</code>

<code>bbbbb1951bbbbbb0035a</code>

<code>ccccc1952cccccc0048c</code>

<code>ddddd1953dddddd0053d</code>

<code>eeeee1954eeeeee0048e</code>

       為了從中取出年份和溫度,則就需要自己定義加載函數,這裡每一列序号以0開始。自定義加載函數需要繼承LoadFunc。具體的代碼如下。

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

<code>import</code> <code>org.apache.commons.logging.Log;</code>

<code>import</code> <code>org.apache.commons.logging.LogFactory;</code>

<code>import</code> <code>org.apache.hadoop.io.Text;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.InputFormat;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Job;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.RecordReader;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.FileInputFormat;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.TextInputFormat;</code>

<code>import</code> <code>org.apache.pig.LoadFunc;</code>

<code>import</code> <code>org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;</code>

<code>import</code> <code>org.apache.pig.data.DataByteArray;</code>

<code>import</code> <code>org.apache.pig.data.TupleFactory;</code>

<code>class</code> <code>Range</code>

<code>{</code>

<code>    </code><code>//列的索引以0開始</code>

<code>    </code><code>//字段分割的列的位置</code>

<code>    </code><code>private</code> <code>int</code> <code>start;</code>

<code>    </code><code>private</code> <code>int</code> <code>end;</code>

<code>    </code><code>//根據輸入來解析</code>

<code>    </code><code>//字元串格式必須是(2~3,5~6)</code>

<code>    </code><code>public</code> <code>static</code> <code>List&lt;Range&gt; parse(String cutStr)</code><code>throws</code> <code>Exception</code>

<code>    </code><code>{</code>

<code>        </code><code>List&lt;Range&gt; rangeList=</code><code>new</code> <code>ArrayList&lt;Range&gt;();</code>

<code>        </code><code>//首先要判斷是否格式正确</code>

<code>        </code><code>boolean</code> <code>state=cutStr.matches(</code><code>"\\d+~\\d+(,\\d+~\\d+)*"</code><code>);</code>

<code>        </code><code>if</code><code>(!state)</code>

<code>          </code><code>throw</code> <code>new</code> <code>Exception(</code><code>"InputForat Error:\n"</code> <code>+</code>

<code>            </code><code>"Usage:number~number,number~number;Such 2~7,10~19"</code><code>);</code>

<code>        </code><code>//先截取幾個字段的列起止位置如2~8 </code>

<code>        </code><code>String[] splits=cutStr.split(</code><code>","</code><code>);</code>

<code>        </code><code>//周遊長度設定Range</code>

<code>        </code><code>for</code><code>(</code><code>int</code> <code>i=</code><code>0</code><code>;i&lt;splits.length;i++)</code>

<code>            </code><code>Range range=</code><code>new</code> <code>Range();</code>

<code>            </code><code>String sub=splits[i];</code>

<code>            </code><code>String[] subSplits=sub.split(</code><code>"~"</code><code>);</code>

<code>            </code><code>int</code> <code>subStart=Integer.parseInt(subSplits[</code><code>0</code><code>]);</code>

<code>            </code><code>int</code> <code>subEnd=Integer.parseInt(subSplits[</code><code>1</code><code>]);</code>

<code>            </code><code>if</code><code>(subStart&gt;subEnd)</code>

<code>            </code><code>throw</code> <code>new</code> <code>Exception(</code><code>"InputForat Error:\n"</code> <code>+</code>

<code>                    </code><code>"Detail:first number must less than second number"</code><code>);</code>

<code>            </code><code>range.setStart(subStart);</code>

<code>            </code><code>range.setEnd(subEnd);</code>

<code>            </code><code>rangeList.add(range);</code>

<code>        </code><code>return</code> <code>rangeList;</code>

<code>    </code><code>public</code> <code>int</code> <code>getStart() {</code>

<code>        </code><code>return</code> <code>start;</code>

<code>    </code><code>public</code> <code>void</code> <code>setStart(</code><code>int</code> <code>start) {</code>

<code>        </code><code>this</code><code>.start = start;</code>

<code>    </code><code>public</code> <code>int</code> <code>getEnd() {</code>

<code>        </code><code>return</code> <code>end;</code>

<code>    </code><code>public</code> <code>void</code> <code>setEnd(</code><code>int</code> <code>end) {</code>

<code>        </code><code>this</code><code>.end = end;</code>

<code>              </code> 

<code>    </code><code>public</code> <code>String getSubString(String inStr)</code>

<code>        </code><code>String res=inStr.substring(start, end);</code>

<code>        </code><code>return</code> <code>res;</code>

<code>//定義加載函數,從每一行字元串提出年份,溫度</code>

<code>public</code> <code>class</code> <code>LineLoadFunc </code><code>extends</code> <code>LoadFunc{</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>Log LOG=LogFactory.getLog(LineLoadFunc.</code><code>class</code><code>);</code>

<code>    </code><code>//負責産生元組的各個字段</code>

<code>    </code><code>private</code> <code>final</code> <code>TupleFactory tupleFactory=TupleFactory.getInstance();</code>

<code>    </code><code>//負責讀取輸入記錄</code>

<code>    </code><code>private</code> <code>RecordReader reader;</code>

<code>    </code><code>//存每個字段的集合</code>

<code>    </code><code>private</code> <code>List&lt;Range&gt; ranges;</code>

<code>    </code><code>//傳遞參數設定列的位置分割</code>

<code>    </code><code>public</code> <code>LineLoadFunc(String cutPattern)</code><code>throws</code> <code>Exception</code>

<code>        </code><code>ranges=Range.parse(cutPattern);</code>

<code>    </code><code>//設定檔案的加載位置</code>

<code>    </code><code>public</code> <code>void</code> <code>setLocation(String location, Job job) </code><code>throws</code> <code>IOException {</code>

<code>        </code><code>FileInputFormat.setInputPaths(job, location);</code>

<code>    </code><code>//設定加載檔案的輸入檔案格式</code>

<code>    </code><code>//為每一個分片建立一個RecordReader</code>

<code>    </code><code>public</code> <code>InputFormat getInputFormat() </code><code>throws</code> <code>IOException {</code>

<code>        </code><code>return</code> <code>new</code> <code>TextInputFormat();</code>

<code>    </code><code>public</code> <code>void</code> <code>prepareToRead(RecordReader reader, PigSplit split)</code>

<code>            </code><code>throws</code> <code>IOException {</code>

<code>        </code><code>this</code><code>.reader=reader;</code>

<code>    </code><code>public</code> <code>Tuple getNext() </code><code>throws</code> <code>IOException {</code>

<code>            </code><code>if</code><code>(!reader.nextKeyValue())</code>

<code>                </code><code>return</code> <code>null</code><code>;</code>

<code>            </code><code>//TextInputFormat</code>

<code>            </code><code>//key:LongWritable,value:Text</code>

<code>            </code><code>Text value=(Text)reader.getCurrentValue();</code>

<code>            </code><code>String line=value.toString();</code>

<code>            </code><code>//設定每一個元組有幾個字段</code>

<code>            </code><code>Tuple tuple=tupleFactory.newTuple(ranges.size());</code>

<code>            </code><code>for</code><code>(</code><code>int</code> <code>i=</code><code>0</code><code>;i&lt;ranges.size();i++)</code>

<code>            </code><code>{</code>

<code>                </code><code>Range range=ranges.get(i);</code>

<code>                </code><code>if</code><code>(range.getEnd()&gt;line.length())</code>

<code>                </code><code>{</code>

<code>                    </code><code>throw</code> <code>new</code> <code>ExecException(</code><code>"InputFormat:Error\n"</code> <code>+</code>

<code>                            </code><code>"field length more than total length"</code><code>);</code>

<code>                </code><code>}</code>

<code>                </code><code>//必須使用DataByteArray來構造字段的類型</code>

<code>                </code><code>tuple.set(i, </code><code>new</code> <code>DataByteArray(range.getSubString(line)));</code>

<code>            </code><code>}</code>

<code>            </code><code>return</code> <code>tuple;</code>

<code>        </code><code>}</code><code>catch</code><code>(InterruptedException e)</code>

<code>            </code><code>throw</code> <code>new</code> <code>ExecException();</code>

           具體使用的方法就是按照剛才所說的步驟進行的。

本文轉自 zhao_xiao_long 51CTO部落格,原文連結:http://blog.51cto.com/computerdragon/1288228