Contents
- UDF (User-Defined Function)
- UDAF (User-Defined Aggregation Function)
- UDTF (User-Defined Table-Generating Functions)
UDF (User-Defined Function)
One row in, one row out.
Development example:
1. Create a Maven project and add the dependency:
<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.3.6</version>
    </dependency>
</dependencies>
2. Write a custom class that extends GenericUDF and overrides the initialize and evaluate methods.
package hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * Custom UDF.
 *
 * Extend GenericUDF and override its abstract methods.
 *
 * Invocation: select my_len('abcd');
 */
public class StringLengthUDF extends GenericUDF {

    /**
     * Initialization: check the number and types of the arguments,
     * and declare the function's return type.
     * @param objectInspectors inspectors for the arguments passed to the function
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // Simple validation
        // Check the number of arguments
        if (objectInspectors.length != 1) {
            throw new UDFArgumentLengthException("Input Args Length Error!!!");
        }
        // Check the argument type
        if (!objectInspectors[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)) {
            throw new UDFArgumentTypeException(0, "Input Args Type Error!!!");
        }
        // Declare the return type (int)
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    /**
     * The function's logic.
     * @param deferredObjects the argument values passed to the function
     */
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // Fetch the argument; return null on NULL input instead of throwing a NullPointerException
        Object o = deferredObjects[0].get();
        if (o == null) {
            return null;
        }
        return o.toString().length();
    }

    @Override
    public String getDisplayString(String[] strings) {
        // Shown in EXPLAIN output
        return "my_len(" + String.join(", ", strings) + ")";
    }
}
3. Package with Maven and upload the jar to the /opt/module/hive/datas directory on the server.
Rename the jar so the later commands are shorter to type.
4. In the Hive CLI, add the custom UDF jar to Hive's classpath:
hive (default)> add jar /opt/module/hive/datas/myudf.jar ;
Added [/opt/module/hive/datas/myudf.jar] to class path
Added resources: [/opt/module/hive/datas/myudf.jar]
5. Register a function name and bind it to the custom UDF class (hive.udf.StringLengthUDF):
hive (default)> create temporary function my_len as 'hive.udf.StringLengthUDF';
The temporary keyword is optional; a temporary function only lives for the current connection.
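If the function should outlive the session, Hive also supports permanent registration with the jar kept on HDFS; a sketch, where the HDFS path is a hypothetical placeholder:
hive (default)> create function my_len as 'hive.udf.StringLengthUDF' using jar 'hdfs:///user/hive/jars/myudf.jar';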
6. Use the function:
hive (default)> select my_len('adsdfsdf');
OK
_c0
8
Time taken: 0.022 seconds, Fetched: 1 row(s)
hive (default)> select my_len(3123123);
OK
_c0
7
Time taken: 0.033 seconds, Fetched: 1 row(s)
-- Error cases:
FAILED: ParseException line 1:34 mismatched input '<EOF>' expecting ) near ')' in function specification
hive (default)> select my_len(split('a,b,v,d',','));
FAILED: SemanticException [Error 10016]: Line 1:14 Argument type mismatch '','': Input Args Type Error!!!
hive (default)> select my_len(3123123,3123);
FAILED: SemanticException [Error 10015]: Line 1:7 Arguments length mismatch '3123': Input Args Length Error!!!
UDAF (User-Defined Aggregation Function)
Aggregate function: many rows in, one row out.
Similar to the built-ins count/max/min.
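Since this section gives no code, here is a minimal sketch of the many-in, one-out lifecycle (init / iterate / terminatePartial / merge / terminate), written against the legacy UDAF/UDAFEvaluator API for brevity. The package and class names (hive.udaf.LongSumUDAF) are illustrative only, and new production code would normally implement GenericUDAFEvaluator instead:

package hive.udaf;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.LongWritable;

/**
 * A minimal sum UDAF sketch using the legacy UDAF API.
 */
public class LongSumUDAF extends UDAF {

    public static class LongSumEvaluator implements UDAFEvaluator {
        private long sum;
        private boolean empty;

        public LongSumEvaluator() {
            init();
        }

        // Reset the aggregation state (called once per aggregation)
        @Override
        public void init() {
            sum = 0L;
            empty = true;
        }

        // Called once for every input row
        public boolean iterate(LongWritable value) {
            if (value != null) {
                sum += value.get();
                empty = false;
            }
            return true;
        }

        // Partial aggregate handed from the map side to the reduce side
        public LongWritable terminatePartial() {
            return empty ? null : new LongWritable(sum);
        }

        // Merge a partial aggregate produced by another task
        public boolean merge(LongWritable partial) {
            if (partial != null) {
                sum += partial.get();
                empty = false;
            }
            return true;
        }

        // Produce the final result
        public LongWritable terminate() {
            return empty ? null : new LongWritable(sum);
        }
    }
}

Registration and use would then mirror the UDF steps above, e.g. create temporary function my_sum as 'hive.udaf.LongSumUDAF'; followed by select my_sum(bigint_col) from some_table; (the function, column, and table names here are hypothetical).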
UDTF (User-Defined Table-Generating Functions)
One row in, many rows out.
For example, lateral view explode(), as sketched below.
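As a quick point of reference before the custom version, the built-in explode() already demonstrates one-in, many-out; output sketched below (explode over an array yields a single column named col):
hive (default)> select explode(split('a,b,c', ','));
col
a
b
c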
Requirement:
hive (default)> select myudtf("hello,world,hadoop,hive", ",");
hello
world
hadoop
hive
1. Write a custom class that extends GenericUDTF and overrides the initialize and process methods.
package hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Custom UDTF.
 *
 * Extend GenericUDTF and override its abstract methods.
 *
 * Usage: select myudtf('hive,hadoop,flume,kafka', ',');
 *
 * Result: word
 *         hive
 *         hadoop
 *         flume
 *         kafka
 *
 * Extension (see StringSplitUDTF2 below):
 * select myudtf2('hadoop-niupi,java-lihai,songsong-kuai,dahai-lang', ',', '-');
 * Result: word1    word2
 *         hadoop   niupi
 *         java     lihai
 *         songsong kuai
 *         dahai    lang
 */
public class StringSplitUDTF extends GenericUDTF {

    private final ArrayList<String> outList = new ArrayList<>();

    /**
     * Initialization: declare the names and types of the output columns.
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // Basic validation: exactly two arguments (data, separator)
        List<? extends StructField> fieldRefs = argOIs.getAllStructFieldRefs();
        if (fieldRefs.size() != 2) {
            throw new UDFArgumentException("Input Args Length Error!!!");
        }
        // Declare the output column name
        ArrayList<String> fieldNames = new ArrayList<>();
        fieldNames.add("word");
        // Declare the output column type
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * The function's logic: split the input string and emit one row per word.
     */
    @Override
    public void process(Object[] objects) throws HiveException {
        // Fetch the arguments
        String argsData = objects[0].toString();
        String argsSplit = objects[1].toString();
        // Split the data
        String[] words = argsData.split(argsSplit);
        // Emit one row per word
        for (String word : words) {
            // The list is reused across rows, so clear it first
            outList.clear();
            // Put the current word into the row
            outList.add(word);
            // Emit the current row
            forward(outList);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
2. Package the project.
3. Upload the jar to the Linux server.
4. Add the uploaded jar to the Hive session and create the custom function:
hive (default)> add jar /opt/module/hive/datas/myudtf.jar;
Added [/opt/module/hive/datas/myudtf.jar] to class path
Added resources: [/opt/module/hive/datas/myudtf.jar]
hive (default)> create temporary function myudtf as "hive.udtf.StringSplitUDTF";
OK
Time taken: 0.104 seconds
hive (default)> select myudtf("hive,hadoop,flume,kafka",",");
OK
word
hive
hadoop
flume
kafka
Time taken: 0.278 seconds, Fetched: 4 row(s)
Testing note:
If the custom jar is broken, repeating add jar in the same session will not pick up the rebuilt jar. Close the connection and start a new session for the update to take effect.
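To check which jars are currently on the session classpath, the CLI's list jars command can be used:
hive (default)> list jars;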
Extension test: return two columns.
package hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Custom UDTF returning two columns per row.
 *
 * Usage: select myudtf2('hadoop-niupi,java-lihai,songsong-kuai,dahai-lang', ',', '-');
 *
 * Result: word1    word2
 *         hadoop   niupi
 *         java     lihai
 *         songsong kuai
 *         dahai    lang
 */
public class StringSplitUDTF2 extends GenericUDTF {

    private final ArrayList<String> outList = new ArrayList<>();

    /**
     * Initialization: declare the names and types of the two output columns.
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // Basic validation: exactly three arguments (data, row separator, column separator)
        List<? extends StructField> fieldRefs = argOIs.getAllStructFieldRefs();
        if (fieldRefs.size() != 3) {
            throw new UDFArgumentException("Input Args Length Error!!!");
        }
        // Declare the output column names
        ArrayList<String> fieldNames = new ArrayList<>();
        fieldNames.add("word1");
        fieldNames.add("word2");
        // Declare the output column types
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * The function's logic: split into rows, then split each row into two columns.
     */
    @Override
    public void process(Object[] objects) throws HiveException {
        // Fetch the arguments
        String argsData = objects[0].toString();  // e.g. 'hadoop-niupi,java-lihai,songsong-kuai,dahai-lang'
        String rowsSplit = objects[1].toString(); // e.g. ','
        String colsSplit = objects[2].toString(); // e.g. '-'
        String[] rows = argsData.split(rowsSplit);
        for (String row : rows) {
            // The list is reused across rows, so clear it first
            outList.clear();
            // row: e.g. hadoop-niupi
            String[] cols = row.split(colsSplit);
            for (String word : cols) {
                outList.add(word); // the two column values of the current row
            }
            // Emit the current row
            forward(outList);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
hive (default)> add jar /opt/module/hive/datas/myudtf2.jar;
Added [/opt/module/hive/datas/myudtf2.jar] to class path
Added resources: [/opt/module/hive/datas/myudtf2.jar]
hive (default)> create temporary function myudtf2 as 'hive.udtf.StringSplitUDTF2';
OK
Time taken: 0.449 seconds
hive (default)> select myudtf2('hadoop-niup','songsong-lihai','dahai-kuai',',','-');
FAILED: UDFArgumentException Input Args Length Error!!!
hive (default)> select myudtf2('hadoop-niup,songsong-lihai,dahai-kuai',',','-');
OK
word1 word2
hadoop niup
songsong lihai
dahai kuai
Time taken: 0.194 seconds, Fetched: 3 row(s)