天天看點

UDF精簡使用大全一、UDF的分類二、UDF參數解析三、UDF的使用方式

一、UDF的分類

UDF類型 描述
UDF(User Defined Scalar Function) 使用者自定義标量值函數。其輸入與輸出是一對一的關系,即讀入一行資料,輸出一個值。
UDTF(User Defined Table Valued Function) 自定義表值函數。用于解決調用一次函數輸出多行資料的需求。UDTF是唯一能夠傳回多個字段的自定義函數。UDTF不等于UDT(User Defined Type)。
UDAF(User Defined Aggregation Function) 自定義聚合函數。其輸入與輸出是多對一的關系,即将多條輸入記錄聚合成一個輸出值。UDAF可以與SQL中的GROUP BY語句聯用。具體文法請參見 聚合函數

二、UDF參數解析

MaxCompute資料類型與Java資料類型的對應關系如下。

注意點:

  • 此處ARRAY類型對應的Java類型是List,而不是數組。
  • VARCHAR,BINART,STRUCT一些資料類型是ODPS獨有的
  • Java中對應的資料類型以及傳回值資料類型是對象,資料類型首字母需大寫。
MaxCompute Type Java Type
TINYINT java.lang.Byte
SMALLINT java.lang.Short
INT java.lang.Integer
BIGINT java.lang.Long
FLOAT java.lang.Float
DOUBLE java.lang.Double
DECIMAL java.math.BigDecimal
BOOLEAN java.lang.Boolean
STRING java.lang.String
VARCHAR com.aliyun.odps.data.Varchar
BINARY com.aliyun.odps.data.Binary
DATETIME java.util.Date
TIMESTAMP java.sql.Timestamp
ARRAY java.util.List
MAP java.util.Map
STRUCT com.aliyun.odps.data.Struct

MaxCompute 2.0版本支援定義Java UDF時,使用Writable類型作為參數和傳回值。MaxCompute資料類型和Java Writable類型的映射關系如下。

Java Writable Type
ByteWritable
ShortWritable
IntWritable
LongWritable
FloatWritable
DoubleWritable
BigDecimalWritable
BooleanWritable
Text
VarcharWritable
BytesWritable
DatetimeWritable
TimestampWritable
INTERVAL_YEAR_MONTH IntervalYearMonthWritable
INTERVAL_DAY_TIME IntervalDayTimeWritable
N/A
MaxCompute SQL Type Python 2 Type
STR
BOOL
CHAR
BYTEARRAY
DATE
DECIMAL.DECIMAL
LIST
DICT
COLLECTIONS.NAMEDTUPLE
Python 3 Type
UNICODE
DATETIME.DATETIME
BYTES
DATETIME.DATE

三、UDF的使用方式

UDF、UDTF、UDAT可進行參考文檔

https://help.aliyun.com/document_detail/27867.html?spm=a2c4g.11186623.6.762.463d7468xnFPHb

JAVA UDF

UDF的進階使用:

3.1UDF中的變長參數

java語言:

package com.mrtest.cn;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.ArrayList;
import java.util.List;
@Resolve({"*->array"})
public class TestUDF extends UDF {
    public List evaluate(String ... s) {
        List list = new ArrayList();
        for (String name : s) {
            list.add(name);
        }
        return list;
    }
}      

Python語言:

from odps.udf import annotate
@annotate("*->bigint")
class ParamFunc(object):
   def evaluate(self, *nums):
        sum = 0
        for num in nums:
            sum=num+sum
        return sum      

3.2UDF的重載

注意事項:對于List與List是不能解析對應的方法的,這種屬于類型擦除

package com.aliyun.odps.examples.udf;
import com.aliyun.odps.udf.UDF;
public class UDFExample extends UDF {
  public String evaluate(String a) {
    return "s2s:" + a;
  }
  public String evaluate(String a, String b) {
    return "ss2s:" + a + "," + b;
  }
  public String evaluate(String a, String b, String c) {
    return "sss2s:" + a + "," + b + "," + c;
  }
}      

3.3UDF通路對應檔案和表

package com.aliyun.odps.examples.udf;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
public class UDFResource extends UDF {
  ExecutionContext ctx;
  long fileResourceLineCount;
  long tableResource1RecordCount;
  long tableResource2RecordCount;
  @Override
  public void setup(ExecutionContext ctx) throws UDFException {
    this.ctx = ctx;
    try {
      InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
      BufferedReader br = new BufferedReader(new InputStreamReader(in));
      String line;
      fileResourceLineCount = 0;
      while ((line = br.readLine()) != null) {
        fileResourceLineCount++;
      }
      br.close();
      Iterator iterator = ctx.readResourceTable("table_resource1").iterator();
      tableResource1RecordCount = 0;
      while (iterator.hasNext()) {
        tableResource1RecordCount++;
        iterator.next();
      }
      iterator = ctx.readResourceTable("table_resource2").iterator();
      tableResource2RecordCount = 0;
      while (iterator.hasNext()) {
        tableResource2RecordCount++;
        iterator.next();
      }
    } catch (IOException e) {
      throw new UDFException(e);
    }
  }
  /**
   * project: example_project table: wc_in2 partitions: p2=1,p1=2 columns: colc,colb
   */
  public String evaluate(String a, String b) {
    return "ss2s:" + a + "," + b + "|fileResourceLineCount=" + fileResourceLineCount
        + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount="
        + tableResource2RecordCount;
  }
}      

python語言:

#coding: utf-8
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('double -> double')
class Compute(object):
    def __init__(self):
            import json
            #擷取對應文本檔案
            cache_file = get_cache_file('file.txt')
            dataMat = []
            for line in cache_file :
                curLine = line.strip().split(',')
                #處理邏輯
            cache_file.close()
            #擷取對應的表檔案
            records = list(get_cache_table('table_resource1'))
         for record in records:
             self.my_dict[record[0]] = [record[1]]
                #處理邏輯
    def evaluate(self, input):
        #處理邏輯      

3.4UDF通路外部網絡(VPC、外部網絡、專有網絡)

https://help.aliyun.com/document_detail/187866.html

3.5UDF使用第三方包

https://help.aliyun.com/document_detail/189752.html
#coding: utf-8
# explode.py
from odps.udf import annotate
from odps.distcache import get_cache_archive
import datetime
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted ([os.path.dirname(os.path.normpath(f.name)) for f in archive_files 
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    sys.path.append(os.path.dirname(dir_names[0]))
@annotate("string->boolean")
class is_workday_udf(object):
    def __init__(self):
        include_package_path('chinese-calendar-master.zip')
    def evaluate(self, date_str):
        # try:
        import chinese_calendar
        date_strs = date_str.split("-")
        year_num = int(date_strs[0])
        month_num = int(date_strs[1])
        day_num = int(date_strs[2])
        date_num = datetime.date(year=year_num, month=month_num, day=day_num)
        result = chinese_calendar.is_workday(date_num)
        return result
        # except:
        #     return True      

函數的注冊

UDF精簡使用大全一、UDF的分類二、UDF參數解析三、UDF的使用方式

執行的select的的操作

set odsp.pypy.enabled=false;
set odps.isolation.session.enable=true;
select my_json('{"info":"11","desc":"a|b","filename":"4b-2a-3c-4d-5b"}') as a;      

3.6使用嵌入式開發UDF

CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING
#CODE ('lang'='JAVA')
package com.mypackage;
import com.aliyun.odps.udf.UDF;
public class Reverse extends UDF {
  public String evaluate(String input) {
    if (input == null) return null;
    StringBuilder ret = new StringBuilder();
    for (int i = input.toCharArray().length - 1; i >= 0; i--) {
      ret.append(input.toCharArray()[i]);
    }
    return ret.toString();
  }
}
#END CODE;      

SELECT foo('abdc');

  • 嵌入式代碼塊可以置于USING後或腳本末尾,置于USING後的代碼塊作用域僅為CREATE TEMPORARY FUNCTION語句。
  • CREATE TEMPORARY FUNCTION建立的函數為臨時函數,僅在本次執行生效,不會存入MaxCompute的Meta系統。
CREATE TEMPORARY FUNCTION foo AS 'embedded.UDFTest' USING
#CODE ('lang'='PYTHON', 'filename'='embedded')
from odps.udf import annotate
@annotate("bigint->bigint")
class UDFTest(object):
  def evaluate(self, a):
    return a * a
#END CODE;      

SELECT foo(4);

  • Python代碼的縮進需要符合Python語言規範。
  • 由于注冊Python UDF時AS後的類名需要包含Python源碼的檔案名,您可以通過’filename’=’embedded’指定一個虛拟檔案名。

3.7使用SQL語言定義函數

create sql function my_sum(@a BIGINT, @b BIGINT, @c BIGINT) returns @my_sum BIGINT
as begin 
    @temp := @a + @b;
    @my_sum := @temp + @c;
end;      
create sql function my_func(@s STRING)
AS if(@s rlike '"git_(m|a)"', 1, 0);      

歡迎加入“MaxCompute開發者社群2群”,點選連結申請加入或掃描二維碼

https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
UDF精簡使用大全一、UDF的分類二、UDF參數解析三、UDF的使用方式