
Hive tips: user-defined functions - UDTF

This post covers two examples:

1. Multi-column operation in Hive: converting rows to columns

2. Single-column operation in Hive: splitting JSON data with split

I. Introduction to UDTF

A UDTF (User-Defined Table-Generating Function) addresses the need to produce multiple output rows from a single input row (one-to-many mapping).

II. Using a UDTF

1. Rules: a UDTF must extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implement the three methods initialize, process, and close.

2. Program flow:

Hive first calls the initialize method, which returns the schema of the rows the UDTF will produce (the number of output columns and their types).

After initialization, the process method is called; the real work happens there. Inside process, every call to forward() emits one output row. To emit several columns, put the column values into an array and pass that array to forward().

Finally, close() is called so the UDTF can release anything that needs cleaning up.
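To make the lifecycle concrete, here is a minimal sketch of a GenericUDTF (the class name, output column, and method bodies are illustrative only; the two complete examples follow in sections III and IV):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MyUDTF extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // declare the output schema: a single string column named "col1"
        List<String> names = Arrays.asList("col1");
        List<ObjectInspector> types =
                Arrays.<ObjectInspector>asList(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(names, types);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // each forward() call emits one output row; call it as many times as needed per input row
        forward(new Object[]{String.valueOf(args[0])});
    }

    @Override
    public void close() throws HiveException {
        // release any resources acquired above
    }
}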

III. UDTF example 1

1. Rows to columns

Given the following Hive table data:

1   a,b,c   1,2
2   j,k     NULL
3   NULL    NULL

将行通過UDTF轉為列

如下

a   1
b   2
c   NULL
j   NULL
k   NULL
           

2. Implementation

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hello</groupId>
    <artifactId>hive</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hive</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>com.vaadin.external.google</groupId>
                    <artifactId>android-json</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Hive dependency -->
       <dependency>
           <groupId>org.apache.hive</groupId>
           <artifactId>hive-exec</artifactId>
           <version>3.1.1</version>
       </dependency>
        <!-- fastjson is needed to compile the JSON-splitting UDTF in section IV (the version here is an assumption) -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <!-- Spark dependency 2.4.3 (not used by the UDTF examples) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
           

The implementation code:

package com.hello.hive;
/**
 * @ProjectName: hive
 * @Package: com.hello.hive
 * @ClassName: ArrToMapUDTF
 * @Author: dongsong
 * @Description: UDTF - rows to columns
 * @Date: 2019/8/12 9:19
 * @Version: 1.0
 */

import java.util.List;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
 *@program: hive
 *@description: UDTF - rows to columns
 *@author: by song
 *@create: 2019-08-12 09:19
 */
public class ArrToMapUDTF extends GenericUDTF {
    private String[] obj = new String[2];
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {

        // column names of the output rows
        List<String> colName = Lists.newLinkedList();
        colName.add("key");
        colName.add("value");
        // object inspectors for the output rows, i.e. the type of each output column
        List<ObjectInspector> resType = Lists.newLinkedList();
        resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        // return the column names and column types; Hive uses the StructObjectInspector built by
        // ObjectInspectorFactory.getStandardStructObjectInspector to access the emitted row objects
        return ObjectInspectorFactory.getStandardStructObjectInspector(colName, resType);
    }
    // process() does the actual per-row work: it receives the column values resolved through the
    // object inspectors above and turns one input row into several output rows by splitting the strings.
    // The Object[] args parameter holds the values of the columns passed to the UDTF for the current row.
    @Override
    public void process(Object[] args) throws HiveException {
        if (args[0] == null) {
            return;
        }
        String arg1 = args[0].toString();
        String[] arr1 = arg1.split(",");
        String[] arr2 = null;

        if (args[1] != null) {
            arr2 = args[1].toString().split(",");
        }
        for (int i = 0; i < arr1.length; i++) {
            obj[0] = arr1[i];
            if (arr2 != null && arr2.length > i) {
                obj[1] = arr2[i];
            } else {
                obj[1] = null;
            }
            // calling forward() outside of Hive throws an exception (no collector is attached); catch it so a local run can print the row instead
            try{
                forward(obj);
            }catch (Exception e){
                System.out.println("***********本機調試***********");
                System.out.println(obj[0]+" "+obj[1]);
            }


        }
    }

    @Override
    public void close() throws HiveException { }


    public static void main(String[] args) throws HiveException {
        Object[] arg0 = new Object[2];
        arg0[0]="a,s,d";
        arg0[1]="1,2";
        ArrToMapUDTF arr=new ArrToMapUDTF();
        arr.process(arg0);
    }

}
           

Local debug output

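Running main() above (outside Hive) should print roughly the following to the console:

*********** local debug ***********
a 1
*********** local debug ***********
s 2
*********** local debug ***********
d null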

3. Running it in Hive

1. Package the function into a jar with Maven.

2. Copy the jar to a directory of your choice on the Hadoop/Hive machine.

3. Open the Hive CLI.

-- running it in the Hive CLI
hive> add jar /*/*.jar;
Added [/*/*.jar] to class path
Added resources: [/*/*.jar]
hive> create temporary function get_map as 'com.hello.hive.ArrToMapUDTF';
OK
Time taken: 0.005 seconds
hive> select get_map(col,col1) from test;
OK
a   1
b   2
c   NULL
j   NULL
k   NULL
Time taken: 1.008 seconds, Fetched: 5 row(s)
           

The above shows how to call the get_map function on its own. The limitation of this form is that no other columns can be selected alongside the UDTF. Combining it with the lateral view keyword gives the expected result:

select t.ind,t.col,t.col1,t1.key,t1.value
from test t 
lateral view get_map(col,col1) t1 as key,value;
           

This query involves two tables, t and t1. lateral view joins them, roughly as follows:

The UDTF is applied to columns col and col1 of the input table test; the resulting data set is named t1 and its columns are named key and value.

将t表和t1表進行join操作,得到結果資料集

So lateral view behaves much like a join. lateral view also supports predicate pushdown and an outer variant: when the UDTF returns no rows but the left-hand row exists, the outer keyword keeps that row, similar to a left outer join; see the sketch below.
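Using the same test table, the outer variant looks like this (a sketch of the syntax, not run here):

select t.ind, t.col, t.col1, t1.key, t1.value
from test t
lateral view outer get_map(col, col1) t1 as key, value;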

IV. UDTF example 2

1. Single-column operation: splitting JSON data

package com.hello.hive;

/**
 * @ProjectName: hive
 * @Package: com.hello.hive
 * @ClassName: SplitJsonUDTF
 * @Author: dongsong
 * @Description: splits JSON data
 * @Date: 2019/8/12 13:54
 * @Version: 1.0
 */
import java.util.ArrayList;
import java.util.Set;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

/**
 *@program: hive
 *@description: splits JSON data
 *@author: by song
 *@create: 2019-08-12 13:54
 */

public class SplitJsonUDTF extends GenericUDTF{


    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        if (args.length != 1) { // the UDTF takes exactly one argument
            throw new UDFArgumentLengthException(
                    "SplitJsonUDTF takes only one argument");
        } // the argument must be a primitive (string-like) type
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException(
                    "SplitJsonUDTF takes a string as its parameter");
        }

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("content_id");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("app");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("click");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("impression");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("ctr");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(
                fieldNames, fieldOIs);
    }

    @Override
    public void close() throws HiveException {
        // nothing to clean up
    }

    @Override
    public void process(Object[] arg0) throws HiveException {

        String input = arg0[0].toString();
        String[] split = input.split(",\\[");

        String content_id = split[0].replace("(", "");
        String json_str = split[1].replace(")", "");
        json_str = "["+json_str;
        //(98337176,[{"-1":{"impression":167,"click":0.9933209,"ctr":0.02424849}},{"ali":{"impression":163,"click":0.9933209,"ctr":0.025131164}}])
        // parse the JSON string into a JSON array
        JSONArray json_arr = JSONArray.parseArray(json_str);
      //[{"-1":{"ctr":0.006597938,"impression":150,"click":0.3438084}},{"coolpad":{"ctr":0.018344998,"impression":56,"click":0.3438084}},{"emui":{"ctr":0,"impression":64,"click":0}}]
        for(int i =0 ;i < json_arr.size();i++){
            String[] result = new String[5];
            result[0] = content_id;
            // get one element (a JSON object) from the array
            JSONObject ele = json_arr.getJSONObject(i);//{"-1":{"ctr":0.006597938,"impression":150,"click":0.3438084}}
            // the object has a single key: the dimension name
            Set<String> ks = ele.keySet();   //[-1]
            for(String k : ks){
                result[1] = k; // keep the key as a string, e.g. "-1"
            }
            result[2] = ele.getJSONObject(result[1]).getString("click");
            result[3] = ele.getJSONObject(result[1]).getString("impression");
            result[4] = ele.getJSONObject(result[1]).getString("ctr");
            // calling forward() outside of Hive throws an exception (no collector is attached); catch it so a local run can print the row instead
            try{
                forward(result);
            }catch (Exception e){
                System.out.println("***********本機調試***********");
                System.out.println(result[0] + " " + result[1] + " " + result[2] +" "+result[3]+" "+result[4]);
            }
        }
    }


    public static void main(String[] args) throws HiveException {
        SplitJsonUDTF udtf = new SplitJsonUDTF();
        String s1 = "(98337176,[{\"-1\":{\"impression\":167,\"click\":0.9933209,\"ctr\":0.02424849}},{\"ali\":{\"impression\":163,\"click\":0.9933209,\"ctr\":0.025131164}}])";
        String s2 = "(119962233,[{\"-1\":{\"impression\":150,\"click\":0.3438084,\"ctr\":0.006597938}},{\"coolpad\":{\"impression\":56,\"click\":0.3438084,\"ctr\":0.018344998}},{\"emui\":{\"impression\":64,\"click\":0,\"ctr\":0}}])";
        Object[] arg0 = new Object[]{s2};
        udtf.process(arg0);
    }
}
           

Local test output

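Running main() above with the s2 input should print roughly the following (exact number formatting depends on the fastjson version):

*********** local debug ***********
119962233 -1 0.3438084 150 0.006597938
*********** local debug ***********
119962233 coolpad 0.3438084 56 0.018344998
*********** local debug ***********
119962233 emui 0 64 0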

2. How to call a UDTF

A UDTF can be called in two ways: directly in the select list, or together with lateral view.

1. Directly in the select list

select explode_map(properties) as (col1,col2) from src;
           

Other columns cannot be selected alongside it; this is invalid:

select a, explode_map(properties) as (col1,col2) from src
           

Nested calls are not allowed; this is invalid:

select explode_map(explode_map(properties)) from src
           

It cannot be combined with group by / cluster by / distribute by / sort by; this is invalid:

select explode_map(properties) as (col1,col2) from src group by col1, col2
           

2. Together with lateral view

select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;
           
