This post covers two cases:
1. Multi-column Hive input: converting rows to columns
2. Single-column Hive input: using split to parse JSON data
I. Introduction to UDTFs
A UDTF (User-Defined Table-Generating Function) handles requirements where one input row produces multiple output rows (one-to-many mapping).
II. Using a UDTF
1. Rules: the class must extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implement the initialize, process, and close methods.
2. Execution flow:
Hive first calls initialize, which returns the schema of the UDTF's output rows (number of columns and their types).
After initialization, Hive calls process for each input row; the actual work happens there. Each call to forward() emits one output row; to emit multiple columns, put the column values into an array and pass that array to forward().
Finally, close() is called to clean up any resources that need it.
III. UDTF Case One
1. Rows to columns
Given the following data in Hive table test (columns ind, col, col1):
1 a,b,c 1,2
2 j,k NULL
3 NULL NULL
the UDTF turns these rows into the following columns:
a 1
b 2
c NULL
j NULL
k NULL
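Before diving into the full UDTF, the core pairing logic can be sketched in plain Java with no Hive dependency (RowToColumnDemo and toRows are illustrative names, not part of any API): split the two comma-separated strings and pad the shorter value list with null.

```java
import java.util.ArrayList;
import java.util.List;

public class RowToColumnDemo {
    // Pair up two comma-separated strings, padding the shorter value list with null,
    // mirroring what the UDTF's process() method does for each input row.
    static List<String[]> toRows(String keys, String values) {
        List<String[]> rows = new ArrayList<>();
        if (keys == null) {
            return rows; // a NULL key column produces no output rows
        }
        String[] keyArr = keys.split(",");
        String[] valArr = (values == null) ? new String[0] : values.split(",");
        for (int i = 0; i < keyArr.length; i++) {
            String value = (i < valArr.length) ? valArr[i] : null;
            rows.add(new String[]{keyArr[i], value});
        }
        return rows;
    }

    public static void main(String[] args) {
        for (String[] row : toRows("a,b,c", "1,2")) {
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}
```

Running main prints the a/b/c rows shown above; note that Java prints the missing third value as null, while Hive renders it as NULL.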
2. Implementation
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.hello</groupId>
<artifactId>hive</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hive</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.vaadin.external.google</groupId>
<artifactId>android-json</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Hive dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.1</version>
</dependency>
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<!-- fastjson is required by the JSON-splitting UDTF in case two -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Implementation code:
package com.hello.hive;
/**
* @ProjectName: hive
* @Package: com.hello.hive
* @ClassName: ArrToMapUDTF
* @Author: dongsong
* @Description: UDTF - rows to columns
* @Date: 2019/8/12 9:19
* @Version: 1.0
*/
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
public class ArrToMapUDTF extends GenericUDTF {
private String[] obj = new String[2];
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
//column names of the generated table
List<String> colName = Lists.newLinkedList();
colName.add("key");
colName.add("value");
//object inspectors for the generated table, i.e. the type of each field in an output row
List<ObjectInspector> resType = Lists.newLinkedList();
resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
//return the column names and column types; the StandardStructObjectInspector from
//ObjectInspectorFactory tells Hive how to access the emitted row objects
return ObjectInspectorFactory.getStandardStructObjectInspector(colName, resType);
}
//process does the UDTF's actual work on the values handed over by Hive.
//The Object[] args parameter holds one element per argument passed to the UDTF call
//for the current input row; here the two comma-separated strings are split and
//paired up to turn the row into columns.
@Override
public void process(Object[] args) throws HiveException {
if (args[0] == null) {
return;
}
String arg1 = args[0].toString();
String[] arr1 = arg1.split(",");
String[] arr2 = null;
if (args[1] != null) {
arr2 = args[1].toString().split(",");
}
for (int i = 0; i < arr1.length; i++) {
obj[0] = arr1[i];
if (arr2 != null && arr2.length > i) {
obj[1] = arr2[i];
} else {
obj[1] = null;
}
//calling forward outside of a Hive session throws an exception; catching it lets us print the results when running locally
try{
forward(obj);
}catch (Exception e){
System.out.println("*********** local debug ***********");
System.out.println(obj[0]+" "+obj[1]);
}
}
}
@Override
public void close() throws HiveException { }
public static void main(String[] args) throws HiveException {
Object[] arg0 = new Object[2];
arg0[0]="a,s,d";
arg0[1]="1,2";
ArrToMapUDTF arr=new ArrToMapUDTF();
arr.process(arg0);
}
}
Local debug output (produced by running main):
3. Deployment steps
1) Package the function jar with Maven.
2) Copy the jar into a directory of your choosing on the Hadoop cluster.
3) In the Hive CLI:
-- execution result
hive> add jar /*/*.jar;
Added [/*/*.jar] to class path
Added resources: [/*/*.jar]
hive> create temporary function get_map as 'com.hello.hive.ArrToMapUDTF';
OK
Time taken: 0.005 seconds
hive> select get_map(col,col1) from test;
OK
a 1
b 2
c NULL
j NULL
k NULL
Time taken: 1.008 seconds, Fetched: 5 row(s)
The above shows how to use the get_map function. Its limitation is that, called this way, it cannot reference other columns of the table in the same select. Combining it with the lateral view keyword gives the desired result:
select t.ind,t.col,t.col1,t1.key,t1.value
from test t
lateral view get_map(col,col1) t1 as key,value;
This query involves two tables, t and t1. lateral view effectively joins them:
first, the UDTF is applied to the col and col1 columns of the input table test; the resulting data set is named t1 and its columns are named key and value;
then table t is joined with t1 to produce the final result set.
So lateral view behaves much like a join. It also supports predicate pushdown and an outer variant: when the UDTF returns no rows but the left-hand table does have a row, the outer keyword keeps that row (similar to a left outer join):
select t.ind, t.col, t.col1, t1.key, t1.value
from test t
lateral view outer get_map(col,col1) t1 as key,value;
IV. UDTF Case Two
1. Single-column input: using split to parse JSON data
package com.hello.hive;
/**
* @ProjectName: hive
* @Package: com.hello.hive
* @ClassName: SplitJsonUDTF
* @Author: dongsong
* @Description: UDTF that splits JSON data
* @Date: 2019/8/12 13:54
* @Version: 1.0
*/
import java.util.ArrayList;
import java.util.Set;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
public class SplitJsonUDTF extends GenericUDTF{
@Override
public StructObjectInspector initialize(ObjectInspector[] args)
throws UDFArgumentException {
if (args.length != 1) {//check that exactly one argument was passed
throw new UDFArgumentLengthException(
"Redis_Gmp takes only one argument");
}//check that the argument is of PRIMITIVE category (not LIST, MAP, STRUCT or UNION)
if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException(
"Redis_Gmp takes string as a parameter");
}
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("content_id");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("app");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("click");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("impression");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("ctr");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(
fieldNames, fieldOIs);
}
@Override
public void close() throws HiveException {
}
@Override
public void process(Object[] arg0) throws HiveException {
String input = arg0[0].toString();
String[] split = input.split(",\\[");
String content_id = split[0].replace("(", "");
String json_str = split[1].replace(")", "");
json_str = "["+json_str;
//(98337176,[{"-1":{"impression":167,"click":0.9933209,"ctr":0.02424849}},{"ali":{"impression":163,"click":0.9933209,"ctr":0.025131164}}])
//parse the JSON string into an array of JSON objects
JSONArray json_arr = JSONArray.parseArray(json_str);
//[{"-1":{"ctr":0.006597938,"impression":150,"click":0.3438084}},{"coolpad":{"ctr":0.018344998,"impression":56,"click":0.3438084}},{"emui":{"ctr":0,"impression":64,"click":0}}]
for(int i =0 ;i < json_arr.size();i++){
String[] result = new String[5];
result[0] = content_id;
//get one JSON object from the array
JSONObject ele = json_arr.getJSONObject(i);//{"-1":{"ctr":0.006597938,"impression":150,"click":0.3438084}}
//get the object's keys via its key set
Set<String> ks = ele.keySet(); //[-1]
for(String k : ks){
result[1] = k;//the single key as a string, e.g. "-1"
}
result[2] = ele.getJSONObject(result[1]).getString("click");
result[3] = ele.getJSONObject(result[1]).getString("impression");
result[4] = ele.getJSONObject(result[1]).getString("ctr");
//calling forward outside of a Hive session throws an exception; catching it lets us print the results when running locally
try{
forward(result);
}catch (Exception e){
System.out.println("*********** local debug ***********");
System.out.println(result[0] + " " + result[1] + " " + result[2] +" "+result[3]+" "+result[4]);
}
}
}
public static void main(String[] args) throws HiveException {
SplitJsonUDTF redis_gmp = new SplitJsonUDTF();
String s1 = "(98337176,[{\"-1\":{\"impression\":167,\"click\":0.9933209,\"ctr\":0.02424849}},{\"ali\":{\"impression\":163,\"click\":0.9933209,\"ctr\":0.025131164}}])";
String s2 = "(119962233,[{\"-1\":{\"impression\":150,\"click\":0.3438084,\"ctr\":0.006597938}},{\"coolpad\":{\"impression\":56,\"click\":0.3438084,\"ctr\":0.018344998}},{\"emui\":{\"impression\":64,\"click\":0,\"ctr\":0}}])";
Object[] arg0 = new Object[]{s2};
redis_gmp.process(arg0);
}
}
Local test output (produced by running main):
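The string handling at the top of process() can also be exercised on its own, without the Hive and fastjson jars; a minimal sketch of just the record-splitting step (RecordSplitDemo and splitRecord are illustrative names, not part of any API):

```java
public class RecordSplitDemo {
    // Split a record of the form "(content_id,[ ...json array... ])" into the id
    // and the JSON-array string, mirroring the string handling in SplitJsonUDTF.process().
    static String[] splitRecord(String input) {
        String[] split = input.split(",\\[");
        String contentId = split[0].replace("(", "");
        // the split consumed the leading "[", so put it back; replace(")", "")
        // removes every ")", which is only safe because the sample JSON contains none
        String jsonStr = "[" + split[1].replace(")", "");
        return new String[]{contentId, jsonStr};
    }

    public static void main(String[] args) {
        String s = "(98337176,[{\"-1\":{\"impression\":167,\"click\":0.9933209,\"ctr\":0.02424849}}])";
        String[] parts = splitRecord(s);
        System.out.println(parts[0]); // the content_id
        System.out.println(parts[1]); // the JSON array string, ready for JSONArray.parseArray
    }
}
```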
2. Usage
There are two ways to use a UDTF: directly in the select clause, or together with lateral view.
1) Directly in select:
select explode_map(properties) as (col1,col2) from src;
Used this way, no other columns may appear in the select list:
select a, explode_map(properties) as (col1,col2) from src; -- not allowed
UDTF calls cannot be nested:
select explode_map(explode_map(properties)) from src; -- not allowed
and the UDTF cannot be combined with group by / cluster by / distribute by / sort by:
select explode_map(properties) as (col1,col2) from src group by col1, col2; -- not allowed
2) Together with lateral view:
select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;