
Hive UDF and UDTF Functions

Contents

    • UDF (User-Defined Function)
    • UDAF (User-Defined Aggregation Function)
    • UDTF (User-Defined Table-Generating Functions)

UDF (User-Defined Function)

One row in, one row out.

Development example:

1. Create a Maven project and add the hive-exec dependency:

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.3.6</version>
    </dependency>
</dependencies>
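Since a Hive installation already ships hive-exec, the dependency can usually be marked <scope>provided</scope> so it is not bundled into your jar.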

2. Write a custom class that extends GenericUDF and overrides the initialize and evaluate methods.

package hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * Custom UDF.
 *
 * Extend GenericUDF and override its abstract methods.
 *
 * Invocation: select my_len('abcd');
 */
public class StringLengthUDF extends GenericUDF {
    /**
     * Initialization: validate the number and types of the arguments,
     * and declare the function's return type.
     * @param objectInspectors inspectors describing the argument types
     * @return the ObjectInspector for the return value
     * @throws UDFArgumentException if the arguments are invalid
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // Check the number of arguments
        if (objectInspectors.length != 1) {
            throw new UDFArgumentLengthException("Input Args Length Error!!!");
        }
        // Check the argument type
        if (!objectInspectors[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)) {
            throw new UDFArgumentTypeException(0, "Input Args Type Error!!!");
        }
        // Declare the return type: int
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    /**
     * The function's processing logic.
     * @param deferredObjects the argument values passed to the function
     * @return the length of the argument's string form
     * @throws HiveException on evaluation failure
     */
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // Fetch the argument value
        Object o = deferredObjects[0].get();
        // Return NULL for NULL input rather than throwing a NullPointerException
        if (o == null) {
            return null;
        }
        return o.toString().length();
    }

    @Override
    public String getDisplayString(String[] strings) {
        // String shown for this call in EXPLAIN output
        return "my_len(" + String.join(", ", strings) + ")";
    }
}
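Before packaging, the class can be exercised locally without a Hive session. A minimal sketch, assuming hive-exec is on the test classpath; the test class name here is made up:

package hive.udf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class StringLengthUDFLocalTest {
    public static void main(String[] args) throws Exception {
        GenericUDF udf = new StringLengthUDF();
        // Declare a single string argument, as Hive would during planning
        ObjectInspector[] ois = {
                PrimitiveObjectInspectorFactory.javaStringObjectInspector
        };
        udf.initialize(ois);
        // Wrap the argument value the way Hive delivers it at runtime
        Object result = udf.evaluate(new DeferredObject[]{
                new DeferredJavaObject("abcd")
        });
        System.out.println(result); // expected: 4
    }
}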


3. Build the jar with Maven and upload it to the /opt/module/hive/datas directory on the server.


Rename the jar (here, myudf.jar) to make it easier to type later.


4. In the Hive CLI, add the UDF jar to Hive's class path:

hive (default)> add jar /opt/module/hive/datas/myudf.jar ;
Added [/opt/module/hive/datas/myudf.jar] to class path
Added resources: [/opt/module/hive/datas/myudf.jar]


5. Register a function name bound to the custom UDF class (hive.udf.StringLengthUDF):
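hive (default)> create temporary function my_len as "hive.udf.StringLengthUDF";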

The temporary keyword is optional; a temporary function exists only for the current session and is gone after you disconnect.
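Omitting temporary registers the function permanently in the metastore; in that case the jar is typically referenced from HDFS. A sketch (the HDFS path is hypothetical):

hive (default)> create function my_len as "hive.udf.StringLengthUDF" using jar "hdfs:///user/hive/jars/myudf.jar";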

6. Use the function:

hive (default)> select my_len('adsdfsdf');
OK
_c0
8
Time taken: 0.022 seconds, Fetched: 1 row(s)
hive (default)> select my_len(3123123);
OK
_c0
7
Time taken: 0.033 seconds, Fetched: 1 row(s)

-- Error cases:
hive (default)> select my_len(split('a,b,v,d',','));
FAILED: SemanticException [Error 10016]: Line 1:14 Argument type mismatch '','': Input Args Type Error!!!
hive (default)> select my_len(3123123,3123);
FAILED: SemanticException [Error 10015]: Line 1:7 Arguments length mismatch '3123': Input Args Length Error!!!


UDAF (User-Defined Aggregation Function)

Aggregate function: many rows in, one row out.

Similar to the built-ins count / max / min.
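Writing one follows the same package-add-register workflow as the UDF above. A minimal sketch of a sum aggregate, using the older (simpler, now-deprecated) UDAF API rather than the full GenericUDAFResolver2 interface; the class and package names are made up:

package hive.udaf;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class SumIntUDAF extends UDAF {
    public static class SumIntEvaluator implements UDAFEvaluator {
        private int sum;
        private boolean empty;

        public SumIntEvaluator() {
            init();
        }

        // Reset state before a new aggregation
        @Override
        public void init() {
            sum = 0;
            empty = true;
        }

        // Called once per input row (map side)
        public boolean iterate(Integer value) {
            if (value != null) {
                sum += value;
                empty = false;
            }
            return true;
        }

        // Partial result handed from map tasks to reducers
        public Integer terminatePartial() {
            return empty ? null : sum;
        }

        // Merge a partial result produced by another task
        public boolean merge(Integer other) {
            if (other != null) {
                sum += other;
                empty = false;
            }
            return true;
        }

        // Final result
        public Integer terminate() {
            return empty ? null : sum;
        }
    }
}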

UDTF (User-Defined Table-Generating Functions)

One row in, many rows out.

For example, lateral view with explode().
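For reference, this is the pattern the built-in explode() follows; a hedged sketch against a hypothetical table person(name string, hobbies array<string>):

select name, hobby
from person
lateral view explode(hobbies) t as hobby;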

Requirement:

hive(default)> select myudtf("hello,world,hadoop,hive", ",");
hello
world
hadoop
hive
           

1. Write a custom class that extends GenericUDTF and overrides the initialize and process methods:

package hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Custom UDTF.
 *
 * Extend GenericUDTF and override its abstract methods.
 *
 * Usage: select myudtf('hive,hadoop,flume,kafka', ',');
 *
 * Result:  word
 *          hive
 *          hadoop
 *          flume
 *          kafka
 *
 * Extension (see StringSplitUDTF2 below):
 * select myudtf2('hadoop-niupi,java-lihai,songsong-kuai,dahai-lang', ',', '-');
 * Result:  word1      word2
 *          hadoop     niupi
 *          java       lihai
 *          songsong   kuai
 *          dahai      lang
 */
public class StringSplitUDTF extends GenericUDTF {
    private ArrayList<String> outList = new ArrayList<>();

    /**
     * Initialization: declare the names and types of the returned columns.
     * @param argOIs struct describing the arguments passed to the function
     * @return the struct ObjectInspector for the output rows
     * @throws UDFArgumentException if the arguments are invalid
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // Basic validation: exactly two arguments
        List<? extends StructField> fieldRefs = argOIs.getAllStructFieldRefs();
        if (fieldRefs.size() != 2) {
            throw new UDFArgumentException("Input Args Length Error!!!");
        }
        // Declare the names of the returned columns
        ArrayList<String> fieldNames = new ArrayList<>();
        fieldNames.add("word");
        // Declare the types of the returned columns
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * The function's processing logic.
     * @param objects the argument values for the current row
     * @throws HiveException on evaluation failure
     */
    @Override
    public void process(Object[] objects) throws HiveException {
        // Fetch the arguments
        String argsData = objects[0].toString();
        String argsSplit = objects[1].toString();
        // Split the data
        String[] words = argsData.split(argsSplit);
        // Emit one row per word
        for (String word : words) {
            // The list is reused, so clear it first
            outList.clear();
            // Put the current word into the list
            outList.add(word);
            // Forward the current word as one output row
            forward(outList);
        }
    }

    @Override
    public void close() throws HiveException {
        // Nothing to clean up
    }
}


2. Package the project with Maven.

3. Upload the jar to the Linux server.

4. Add the uploaded jar to the Hive environment and create the custom function:

hive (default)> add jar /opt/module/hive/datas/myudtf.jar;
Added [/opt/module/hive/datas/myudtf.jar] to class path
Added resources: [/opt/module/hive/datas/myudtf.jar]

hive (default)> create temporary function myudtf as "hive.udtf.StringSplitUDTF";
OK
Time taken: 0.104 seconds

hive (default)> select myudtf("hive,hadoop,flume,kafka",",");
OK
word
hive
hadoop
flume
kafka
Time taken: 0.278 seconds, Fetched: 4 row(s)


Testing

If the custom jar turns out to have a bug, re-running add jar in the same session will not pick up the rebuilt jar; the old classes stay loaded. Disconnect and open a new session for the updated jar to take effect.
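You can inspect and drop session resources with list jars and delete jar, though classes already loaded may still require a reconnect; a sketch:

hive (default)> list jars;
hive (default)> delete jar /opt/module/hive/datas/myudtf.jar;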

擴充測試:傳回兩列

package hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Custom UDTF, extended to return two columns.
 *
 * Usage:
 * select myudtf2('hadoop-niupi,java-lihai,songsong-kuai,dahai-lang', ',', '-');
 *
 * Result:  word1      word2
 *          hadoop     niupi
 *          java       lihai
 *          songsong   kuai
 *          dahai      lang
 */
public class StringSplitUDTF2 extends GenericUDTF {
    private ArrayList<String> outList = new ArrayList<>();

    /**
     * Initialization: declare the names and types of the returned columns.
     * @param argOIs struct describing the arguments passed to the function
     * @return the struct ObjectInspector for the output rows
     * @throws UDFArgumentException if the arguments are invalid
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // Basic validation: exactly three arguments
        List<? extends StructField> fieldRefs = argOIs.getAllStructFieldRefs();
        if (fieldRefs.size() != 3) {
            throw new UDFArgumentException("Input Args Length Error!!!");
        }
        // Declare the names of the returned columns
        ArrayList<String> fieldNames = new ArrayList<>();
        fieldNames.add("word1");
        fieldNames.add("word2");
        // Declare the types of the returned columns
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * The function's processing logic.
     * @param objects the argument values for the current row
     * @throws HiveException on evaluation failure
     */
    @Override
    public void process(Object[] objects) throws HiveException {
        // Fetch the arguments
        String argsData = objects[0].toString();  // e.g. 'hadoop-niupi,java-lihai,songsong-kuai,dahai-lang'
        String rowsSplit = objects[1].toString(); // e.g. ','
        String colsSplit = objects[2].toString(); // e.g. '-'
        String[] rows = argsData.split(rowsSplit);
        for (String row : rows) {
            // The list is reused, so clear it before use
            outList.clear();
            // row: e.g. hadoop-niupi
            String[] cols = row.split(colsSplit);
            for (String word : cols) {
                outList.add(word); // the two column values for this row
            }
            // Forward one output row
            forward(outList);
        }
    }

    @Override
    public void close() throws HiveException {
        // Nothing to clean up
    }
}

Then package the new jar, add it to the session, register myudtf2, and test:
hive (default)> add jar /opt/module/hive/datas/myudtf2.jar;
Added [/opt/module/hive/datas/myudtf2.jar] to class path
Added resources: [/opt/module/hive/datas/myudtf2.jar]
hive (default)> create temporary function myudtf2 as 'hive.udtf.StringSplitUDTF2';
OK
Time taken: 0.449 seconds
hive (default)> select myudtf2('hadoop-niup','songsong-lihai','dahai-kuai',',','-');
FAILED: UDFArgumentException Input Args Length Error!!!
hive (default)> select myudtf2('hadoop-niup,songsong-lihai,dahai-kuai',',','-');
OK
word1   word2
hadoop  niup
songsong        lihai
dahai   kuai
Time taken: 0.194 seconds, Fetched: 3 row(s)

