[Hive] Hive Custom Functions: UDF

Copyright notice: This is an original article by the author and may not be reproduced without permission. https://blog.csdn.net/SunnyYoona/article/details/53244868

When Hive's built-in functions cannot meet your business needs, you can consider writing a user-defined function.

A user-defined function (UDF) operates on a single record.

To write a UDF, extend the UDF class and implement an evaluate() method. During query execution, the class is instantiated for each place in the query where the function is applied, and evaluate() is called once for every input row. The value returned by evaluate() is handed back to Hive. You may also overload evaluate(): Hive selects the matching method automatically, just like Java method overloading.
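The overload selection Hive performs can be illustrated with plain Java, independent of the Hive API (the class and method names below are hypothetical, used only for illustration):

```java
// A minimal sketch of evaluate()-style overloading:
// Java picks the overload whose parameter types match the arguments.
public class OverloadSketch {

    // Chosen for integer arguments.
    public static String evaluate(int a, int b) {
        return "int: " + (a + b);
    }

    // Chosen for floating-point arguments.
    public static String evaluate(double a, double b) {
        return "double: " + (a + b);
    }

    public static void main(String[] args) {
        System.out.println(evaluate(1, 2));      // int overload
        System.out.println(evaluate(1.5, 2.5));  // double overload
    }
}
```

Hive does the analogous resolution at query time, matching the column types of the call site against the available evaluate() signatures.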

1. Simple UDF

1.1 Custom Java Class

The following Java class, OperationAddUDF, implements addition for the int, double, float, and string types.

```java
package com.sjf.open.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * Created by xiaosi on 16-11-19.
 */
public class OperationAddUDF extends UDF {

    private final ByteStream.Output out = new ByteStream.Output();

    /**
     * IntWritable
     */
    public IntWritable evaluate(IntWritable num1, IntWritable num2) {
        if (num1 == null || num2 == null) {
            return null;
        }
        return new IntWritable(num1.get() + num2.get());
    }

    /**
     * DoubleWritable
     */
    public DoubleWritable evaluate(DoubleWritable num1, DoubleWritable num2) {
        if (num1 == null || num2 == null) {
            return null;
        }
        return new DoubleWritable(num1.get() + num2.get());
    }

    /**
     * FloatWritable
     */
    public FloatWritable evaluate(FloatWritable num1, FloatWritable num2) {
        if (num1 == null || num2 == null) {
            return null;
        }
        return new FloatWritable(num1.get() + num2.get());
    }

    /**
     * Text
     */
    public Text evaluate(Text num1, Text num2) {
        if (num1 == null || num2 == null) {
            return null;
        }
        try {
            Integer n1 = Integer.valueOf(num1.toString());
            Integer n2 = Integer.valueOf(num2.toString());
            Integer result = n1 + n2;
            out.reset();
            LazyInteger.writeUTF8NoException(out, result);
            Text text = new Text();
            text.set(out.getData(), 0, out.getLength());
            return text;
        } catch (Exception e) {
            return null;
        }
    }
}
```

The argument and return types of evaluate() must be types that Hive can serialize. For example, if you work with numbers, the types can be the Java primitive int, the wrapper Integer, or an IntWritable, Hadoop's wrapper for integers. You do not need to worry much about which of these will actually be passed in: when the types do not match, Hive automatically converts the data to the matching type. In Hive, NULL is valid for any data type, but a Java primitive cannot represent it; a method that may receive or return NULL must use an object type, not a primitive.
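The primitive-versus-object distinction can be shown in plain Java (the class and method names below are hypothetical, not part of the Hive API):

```java
// Sketch: why a UDF that may return Hive NULL must use an object type.
public class NullReturnSketch {

    // OK: Integer (like IntWritable) can represent Hive NULL as Java null.
    public static Integer addNullable(Integer a, Integer b) {
        if (a == null || b == null) {
            return null;
        }
        return a + b;
    }

    // A primitive version can never yield NULL:
    // "return null;" would not even compile in a method declared to return int.
    public static int addPrimitive(int a, int b) {
        return a + b;
    }

    public static void main(String[] args) {
        System.out.println(addNullable(1, 2));    // 3
        System.out.println(addNullable(null, 2)); // null (Hive NULL)
    }
}
```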

1.2 Using the UDF in Hive

To use the UDF in Hive, compile the Java code and package the compiled class into a JAR file. Then, in a Hive session, add the JAR to the classpath and define a function backed by the Java class with a CREATE FUNCTION statement:

1.2.1 Add the JAR File to the Classpath

```
hive (test)> add jar /home/xiaosi/open-hive-1.0-SNAPSHOT.jar;
Added [/home/xiaosi/open-hive-1.0-SNAPSHOT.jar] to class path
Added resources: [/home/xiaosi/open-hive-1.0-SNAPSHOT.jar]
```

Note that the JAR file path does not need to be quoted and, for now, must be a full path on the local filesystem. Hive not only adds the JAR to the classpath but also places it in the distributed cache, so every machine in the cluster can access it.

1.2.2 Create the Function add

```
hive (test)> create temporary function add as 'com.sjf.open.hive.udf.OperationAddUDF';
OK
Time taken: 0.004 seconds
```

Note the temporary keyword in the create temporary function statement: a function declared this way is valid only in the current session, so you must add the JAR and create the function again in every session. If you use the same JAR and function frequently, you can add the relevant statements to your $HOME/.hiverc file.
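For example, a $HOME/.hiverc could contain the same two statements used above, so they run automatically at session start (the JAR path here follows the earlier example):

```
add jar /home/xiaosi/open-hive-1.0-SNAPSHOT.jar;
create temporary function add as 'com.sjf.open.hive.udf.OperationAddUDF';
```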

1.2.3 Usage

The addition function can now be used like any other function.

```
hive (test)> select add(12, 34) from employee_part;
OK
46
Time taken: 0.078 seconds, Fetched: 1 row(s)
hive (test)> select add(12.3, 20.1) from employee_part;
OK
32.400000000000006
Time taken: 0.098 seconds, Fetched: 1 row(s)
hive (test)> select add("12", "45") from employee_part;
OK
57
Time taken: 0.077 seconds, Fetched: 1 row(s)
```
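The 32.400000000000006 in the second query is not a bug in the UDF: 12.3 and 20.1 have no exact IEEE 754 double representation, so their sum differs slightly from 32.4. A quick plain-Java check (hypothetical class name, not part of the article's code):

```java
// Sketch: binary floating point cannot represent 12.3 and 20.1 exactly,
// so their sum is not exactly equal to the double nearest 32.4.
public class DoubleAdditionSketch {
    public static void main(String[] args) {
        double sum = 12.3 + 20.1;
        System.out.println(sum);          // the slightly-off value seen in the Hive output
        System.out.println(sum == 32.4);  // false
    }
}
```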

1.2.4 Drop the UDF

When we are done with the custom UDF, we can drop it with the following command:

```
hive (test)> drop temporary function if exists add;
```

2. Complex UDFs

2.1 GenericUDF

Compared with UDF, GenericUDF (org.apache.hadoop.hive.ql.udf.generic.GenericUDF) supports complex input and output types such as list, struct, and map. GenericUDF lets us manage the function's arguments through ObjectInspectors, checking both the types and the number of arguments received.

GenericUDF requires implementing the following three methods:

```java
// This is like the evaluate method of the simple API. It takes the actual arguments and returns the result.
abstract Object evaluate(GenericUDF.DeferredObject[] arguments);

// Doesn't really matter what we return, but it should be a string representation of the function.
abstract String getDisplayString(String[] children);

// Called once, before any evaluate() calls. You receive an array of object inspectors that represent the arguments of the function.
// This is where you validate that the function is receiving the correct argument types and the correct number of arguments.
abstract ObjectInspector initialize(ObjectInspector[] arguments);
```

2.2 Example

We want to implement a strContain function in Hive that takes two arguments: a list of strings (list<string>) and a string to search for. It returns true if the list contains the given string, and false otherwise:

```java
strContain(List("a", "b", "c"), "b"); // true
strContain(List("a", "b", "c"), "d"); // false
```

2.3 Code

```java
package com.sjf.open.hive.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Created by xiaosi on 16-11-21.
 */
@Description(name = "contain", value = "_FUNC_(List<T>, T) ")
public class GenericUDFStrContain extends GenericUDF {

    private static final Logger logger = LoggerFactory.getLogger(GenericUDFStrContain.class);

    private ListObjectInspector listObjectInspector;
    private StringObjectInspector stringObjectInspector;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        logger.info("--------- GenericUDFStrContain --- initialize");
        // Validate the number of arguments
        if (arguments.length != 2) {
            throw new UDFArgumentLengthException(
                    "The function 'Contain' only accepts 2 argument : List<T> and T , but got " + arguments.length);
        }
        ObjectInspector argumentOne = arguments[0];
        ObjectInspector argumentTwo = arguments[1];
        // Validate the argument types
        if (!(argumentOne instanceof ListObjectInspector)) {
            throw new UDFArgumentException("The first argument of function must be a list / array");
        }
        if (!(argumentTwo instanceof StringObjectInspector)) {
            throw new UDFArgumentException("The second argument of function must be a string");
        }
        this.listObjectInspector = (ListObjectInspector) argumentOne;
        this.stringObjectInspector = (StringObjectInspector) argumentTwo;
        // Check the element type of the list
        if (!(listObjectInspector.getListElementObjectInspector() instanceof StringObjectInspector)) {
            throw new UDFArgumentException("The first argument must be a list of strings");
        }
        // Return type
        return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        logger.info("--------- GenericUDFStrContain --- evaluate");
        // Use the ObjectInspectors to extract the values from DeferredObject[].
        // The elements may be LazyString (from a Hive table) or plain String
        // (from the Java test below), so compare on their string form.
        List<?> list = this.listObjectInspector.getList(arguments[0].get());
        String str = this.stringObjectInspector.getPrimitiveJavaObject(arguments[1].get());
        if (list == null || str == null) {
            return null;
        }
        // Check whether the list contains the element we are looking for
        for (Object element : list) {
            if (Objects.equal(str, element.toString())) {
                return true;
            }
        }
        return false;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "strContain(List<T>, T)";
    }
}
```

2.4 Test

Java test:

```java
package com.sjf.open.hive.udf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by xiaosi on 16-11-22.
 */
public class GenericUDFStrContainTest {

    public static void test() throws HiveException {
        GenericUDFStrContain genericUDFStrContain = new GenericUDFStrContain();
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(stringOI);
        BooleanObjectInspector resultInspector =
                (BooleanObjectInspector) genericUDFStrContain.initialize(new ObjectInspector[]{listOI, stringOI});

        // Create the actual UDF arguments
        List<String> list = new ArrayList<String>();
        list.add("a");
        list.add("b");
        list.add("c");

        // Test our results

        // The value exists
        Object result = genericUDFStrContain.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject(list), new GenericUDF.DeferredJavaObject("a")});
        System.out.println("-----------" + result);

        // The value doesn't exist
        Object result2 = genericUDFStrContain.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject(list), new GenericUDF.DeferredJavaObject("d")});
        System.out.println("-----------" + result2);

        // Arguments are null
        Object result3 = genericUDFStrContain.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject(null), new GenericUDF.DeferredJavaObject(null)});
        System.out.println("-----------" + result3);
    }

    public static void main(String[] args) throws HiveException {
        test();
    }
}
```

Hive test:

Using a GenericUDF in Hive is the same as using a simple UDF: compile the Java code, package the compiled classes into a JAR file, add the JAR to the classpath in a Hive session, and define the function with a CREATE FUNCTION statement:

```
hive (test)> add jar /home/xiaosi/code/openDiary/HiveCode/target/open-hive-1.0-SNAPSHOT.jar;
Added [/home/xiaosi/code/openDiary/HiveCode/target/open-hive-1.0-SNAPSHOT.jar] to class path
Added resources: [/home/xiaosi/code/openDiary/HiveCode/target/open-hive-1.0-SNAPSHOT.jar]
hive (test)> create temporary function strContain as 'com.sjf.open.hive.udf.GenericUDFStrContain';
OK
Time taken: 0.021 seconds
```

Usage:

```
hive (test)> select subordinates, strContain(subordinates, "tom") from employee2;
OK
["lily","lucy","tom"]   true
["lucy2","tom2"]        false
["lily","lucy","tom"]   true
["lily","yoona","lucy"] false
Time taken: 1.147 seconds, Fetched: 4 row(s)
hive (test)> select subordinates, strContain(subordinates, 1) from employee2;
FAILED: SemanticException [Error 10014]: Line 1:21 Wrong arguments '1': The second argument of function must be a string
hive (test)> select subordinates, strContain("yoona", 1) from employee2;
FAILED: SemanticException [Error 10014]: Line 1:21 Wrong arguments '1': The first argument of function must be a list / array
hive (test)> select subordinates, strContain("yoona", 1, 3) from employee2;
FAILED: SemanticException [Error 10015]: Line 1:21 Arguments length mismatch '3': The function 'Contain' only accepts 2 argument : List<T> and T , but got 3
```

Note:

subordinates is a column of type array<string>.
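For reference, a table like employee2 could be created with DDL along these lines (a sketch: all column names other than subordinates, and the delimiters, are hypothetical):

```
CREATE TABLE employee2 (
  name STRING,
  subordinates ARRAY<STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ',';
```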

Resources:

http://blog.matthewrathbone.com/2013/08/10/guide-to-writing-hive-udfs.html#the-complex-api