MaxCompute(原ODPS)是阿裡雲自主研發的具有業界領先水準的分布式大資料處理平台, 尤其在集團内部得到廣泛應用,支撐了多個BU的核心業務。 MaxCompute除了持續優化性能外,也緻力于提升SQL語言的使用者體驗和表達能力,提高廣大ODPS開發者的生産力。
MaxCompute基于ODPS2.0新一代的SQL引擎,顯著提升了SQL語言編譯過程的易用性與語言的表達能力。我們在此推出MaxCompute(ODPS2.0)重裝上陣系列文章
- 第一彈 - 善用MaxCompute編譯器的錯誤和警告
- 第二彈 - 新的基本資料類型與内建函數
- 第三彈 - 複雜類型
- 第四彈 - CTE,VALUES,SEMIJOIN
- 第五彈 - SELECT TRANSFORM
- 第六彈 - User Defined Type
- 第七彈 - Grouping Set, Cube and Rollup
第五彈向您介紹了MaxCompute如何嵌入其他語言的腳本。SELECT TRANSFORM的優勢在于可以不建立function甚至不上傳資源的情況下執行其他語言的腳本,而即使需要編寫資源也沒有任何由MaxCompute規定的格式要求和依賴。
本文将介紹另一種将這一優勢提升到更高層次的新功能:User Defined Type,簡稱UDT。
-
場景1
某個功能通過其他語言既可非常簡單的實作,如用java預計隻需要一次内置類的方法調用就可以實作,但MaxCompute沒有合适的内置函數實作這一功能。為如此簡單的功能寫一個UDF非常繁瑣,體驗很差。
-
場景2
SELECT TRANSFORM能夠讓我直接把腳本寫到sql語句中,大大提升了代碼的可讀性(UDF為黑盒,而直接寫在sql裡面的腳本,功能一目了然)和維護性(不需要commit多個代碼檔案,特别是sql和其他的腳本檔案存放的repository還不一樣)。但是某些語言無法這麼用,比如java源代碼必須經過編譯才能執行,那麼有沒有辦法能夠讓這些語言享受相同的優勢?
-
場景3
sql中需要調用第三方庫來實作相關功能。希望能夠在SQL裡面直接調用,而不需要再wrap一層UDF。
上述場景的問題,通過UDT能夠非常好地解決,接下來将具體介紹UDT使用。
本文中很多例子采用 MaxCompute Studio 作展示,沒有安裝MaxCompute Studio的使用者,可以參照文檔 安裝MaxCompute Studio ), 導入測試MaxCompute項目,建立工程 。
功能簡介
MaxCompute中的UDT(User Defined Type)功能支援在SQL中直接引用第三方語言的類或者對象,擷取其資料内容或者調用其方法 。
在其他的SQL引擎中也有UDT的概念,但是和MaxCompute的概念有許多差異。很多SQL引擎中的概念比較像MaxCompute的struct複雜類型。而某些語言提供了調用第三方庫的功能,如Oracle 的 CREATE TYPE。相比之下,MaxCompute的UDT更像這種CREATE TYPE的概念,Type中不僅僅包含資料域,還包含方法。而且MaxCompute做的更徹底:開發者不需要用特殊的DDL文法來定義類型的映射,而是在SQL中直接使用。
一個簡單的例子如下:
set odps.sql.type.system.odps2=true; -- 打開新類型,因為下面的操作會用到 Integer,即 int類型
SELECT java.lang.Integer.MAX_VALUE;
上面的例子輸出:
+-----------+
| max_value |
+-----------+
| 2147483647 |
+-----------+
和java語言一樣,java.lang這個package是可以省略的。是以上面例子更可以簡寫為:
set odps.sql.type.system.odps2=true;
SELECT Integer.MAX_VALUE;
可以看到,上面的例子在select清單中直接寫上了類似于java表達式的表達式,而這個表達式的确就按照java的語義來執行了。這個例子表現出來的能力就是MaxCompute的UDT。
UDT所提供的所有擴充能力,實際上用UDF都可以實作。譬如上面的例子,如果使用UDF實作,需要做下列操作。
首先,定義一個UDF的類:
package com.aliyun.odps.test;
public class IntegerMaxValue extends com.aliyun.odps.udf.UDF {
public Integer evaluate() {
return Integer.MAX_VALUE;
}
}
然後,将上面的UDF編譯,并打成jar包。然後再上傳jar包,并建立function
add jar odps-test.jar;
create function integer_max_value as 'com.aliyun.odps.test.IntegerMaxValue' using 'odps-test.jar';
最後才可以在sql中使用
select integer_max_value();
UDT相當于簡化了上述一系列的過程,讓開發者能夠輕松簡單地用其他語言擴充SQL的功能。
上述例子表現的是java靜态域通路的能力,而UDT的能力遠不限于此。譬如下面的例子:
-- 示例資料
@table1 := select * from values ('100000000000000000000') as t(x);
@table2 := select * from values (100L) as t(y);
-- 代碼邏輯
@a := select new java.math.BigInteger(x) x from @table1; -- new建立對象
@b := select java.math.BigInteger.valueOf(y) y from @table2; -- 靜态方法調用
select /*+mapjoin(b)*/ x.add(y).toString() from @a a join @b b; -- 執行個體方法調用
上述例子輸出結果 100000000000000000100。
這個例子還表現了一種用UDF比較不好實作的功能:子查詢的結果允許UDT類型的列。例如上面變量a的x列是java.math.BigInteger類型,而不是内置類型。UDT類型的資料可以被帶到下一個operator中再調用其他方法,甚至能參與資料shuffle。比如上面的例子,在MaxCompute studio中的執行圖如下:

可以看出圖中共有三個STAGE: M1, R2 和 J3。熟悉MapReduce原理的使用者會知道,由于join的存在需要做資料reshuffle,是以會出現多個stage。一般情況下,不同stage不僅是在不同程序,甚至是在不同實體機器上運作的。輕按兩下代表M1的方塊,顯示如下:
可以看到,M1僅僅執行了 new java.math.BigInteger(x) 這個操作。而同樣點開代表J3的方塊,可以看到 J3 在不同的階段執行了 java.math.BigInteger.valueOf(y) 的操作,和 x.add(y).toString() 的操作:
這幾個操作不僅僅是分階段執行的,甚至是在不同程序,不同實體機器上執行的。但是UDT把這個過程封裝起來,讓使用者看起來和在同一個JVM中執行的效果幾乎一樣。
UDT同樣允許使用者上傳自己的jar包,并且直接引用。如上面UDF的jar包。用UDT來使用:
set odps.sql.type.system.odps2=true;
set odps.sql.session.resources=odps-test.jar; --指定要引用的jar,這些jar一定要事先上傳到project,并且需要是jar類型的資源
select new com.aliyun.odps.test.IntegerMaxValue().evaluate();
如果覺得寫 package全路徑麻煩,還可以像java的import一樣,用flag來指定預設的package。
set odps.sql.type.system.odps2=true;
set odps.sql.session.resources=odps-test.jar;
set odps.sql.session.java.imports=com.aliyun.odps.test.*; -- 指定預設的package
select new IntegerMaxValue().evaluate();
詳細說明
- 目前UDT 隻支援java語言。
- 提供一些提升使用效率的flag:
- odps.sql.session.resources :指定引用的資源,可以指定多個,用英文逗号隔開:
注意這個flag和SELECT TRANSFORM中指定資源的flag相同,是以這個flag會同時影響SELECT TRANSFORM和UDT兩個功能。set odps.sql.session.resources=foo.sh,bar.txt;
- odps.sql.session.java.imports :指定預設的package,可以指定多個,用逗号隔開。和java的import語句類似,可以提供完整類路徑,如 java.math.BigInteger,也可以使用
。暫不支援static import。*
- odps.sql.session.resources :指定引用的資源,可以指定多個,用英文逗号隔開:
- UDT支援的操作包括:
- 執行個體化對象的new操作。
- 執行個體化數組的new操作,包括使用初始化清單建立數組,如
new Integer[] { 1, 2, 3 }
- 方法調用,包括靜态方法調用(是以能用工廠方法建構對象).
- 域通路,包括靜态域。
- 注意:
- 隻支援公有方法和共有域的通路。
- UDT中的辨別符是大小寫敏感的,包括package,類名,方法名,域(field)名。
- UDT支援類型轉換,但限于SQL形式,如 cast(1 as java.lang.Object)。不支援java形式的類型轉換,如(Object)1。
- 暫不支援匿名類和lambda表達式(後續版本可能會支援)。
- 暫不支援無傳回值的函數調用(這個是因為UDT都是出現在expression中,沒有傳回值的函數調用無法嵌入到expression中,這個問題在後續的版本中可能會有解決方案)。
- Java SDK 的類都是預設可用的。但是需要注意目前runtime使用的JDK版本是JDK1.8,比該版本更新的JDK功能可能不支援。
- 需要特别注意的是, 所有的運算符都是MaxCompute SQL的語義,不是UDT的語義 。如
String.valueOf(1) + String.valueOf(2)
的結果是 3 (string隐式轉換為double,并且double相加),而不是'12' (java中string相加是concatenate的語義)。
除了string的相加操作比較容易混淆外,另一個比較容易混淆的是
操作。SQL中的=
不是指派 而是判斷相等。而對于java對象來說,判斷相等應該用equals方法,通過等号判斷的相等無法保證其行為(在UDT場景下,同一對象的概念是不能保證的,具體原因參考下述第8點)。=
- 内置類型與特定java類型有一一映射關系,見 UDF類型映射 。這個映射在UDT也有效:
- 内置類型的資料能夠直接調用其映射到的Java類型的方法,如
'123'.length() , 1L.hashCode()
- UDT類型能夠直接參與内置函數或者UDF的運算, 如
,其中 Long.valueOf 傳回的是 java.lang.Long 類型的資料,而内置函數chr接受的資料類型是内置類型BIGINT。chr(Long.valueOf('100'))
- Java的primitive類型可以自動轉化為其boxing類型,并應用上面兩條規則
- 注意:某些内置類型是需要
才能使用的。否則會報錯。set odps.sql.type.system.odps2=true;
- 内置類型的資料能夠直接調用其映射到的Java類型的方法,如
- UDT對泛型有比較完整的支援,如
,編譯器能夠根據參數類型知道該方法的傳回值是java.util.Arrays.asList(new java.math.BigInteger('1'))
java.util.List<java.math.BigInteger>
類型
注意構造函數需要指定類型參數,否則使用
,這一點和java保持一緻:java.lang.Object
new java.util.ArrayList(java.util.Arrays.asList('1', '2'))
的結果是
java.util.ArrayList<Object>
類型;
而
new java.util.ArrayList<String>(java.util.Arrays.asList('1', '2'))
java.util.ArrayList<String>
類型。
- UDT對 "同一對象" 的概念是模糊的。這是由資料的reshuffle導緻的。從上面第一部分的join的示例可以看出,對象有可能會在不同程序,不同實體機器之間傳輸,在傳輸過程中同一個對象的兩個引用後面可能分别引用了不同的對象(比如對象先被shuffle到兩台機器,然後下次又shuffle回一起)。
- 在使用UDT的時候,應該避免使用
來判斷相等,而是使用= operator
方法。equals
- 某行某列的對象,其内部包含的各個資料對象的相關性是可以保證的。不能保證的是不同行或者不同列的對象的資料相關性。
- 在使用UDT的時候,應該避免使用
-
目前UDT不能用作shuffle key:包括join,group by,distribute by,sort by, order by, cluster by 等結構的key
并不是說UDT不能用在這些結構裡面,UDT可以在expression中間的任意階段使用,隻是不能作為最終輸出。比如雖然不能
,但是可以group by new java.math.BigInteger('123')
。因為hashCode的傳回值是int.class類型可以當做内置類型int來使用(應上述“内置類型與特定java類型映射”的規則)。group by new java.math.BigInteger('123').hashCode()
注意:這個限制未來的版本會計劃去掉。
- UDT擴充了類型轉換規則:
- UDT對象能夠被隐式類型轉換為其基類對象。
- UDT對象能夠被強制類型轉換為其基類或子類對象。
- 沒有繼承關系的兩個對象之間遵守原來的類型轉換規則,注意這時候可能會導緻内容變化,比如java.lang.Long類型的資料是可以強制轉換為java.lang.Integer的,應用的是内置類型的bigint強制轉換為int的過程,而這個過程會真的導緻資料内容的變化,甚至可能會有精度損失。
- 目前UDT對象不能落盤。這意味着不能将UDT對象insert到表中(實際上DDL不支援UDT,建立不出來這樣的表),當然,隐式類型轉換變成了内置類型的除外。同時,屏顯的最終結果也不能是UDT類型,對于屏顯的場景,由于所有的java類都有toString()方法,而
類型是合法的。是以debug的時候,可以用這種方法來觀察UDT的内容。java.lang.String
- 可以設定
這樣MaxCompute會自動把所有的以UDT為最終輸出的列wrap上set odps.sql.udt.display.tostring=true;
,進而友善調試。這個flag隻對屏顯語句生效,對insert語句不生效,是以專門用在調試中。java.util.Objects.toString(...)
- 内置類型支援binary或者string類型,是以可自定義實作serialize的過程,将byte[]的資料落盤。下次讀出來的時候再還原回來。見後面的例子
- 某些類可能自帶序列化和反序列化的方法,如protobuffer。目前UDT依舊支援落盤,還是需要自行調用序列化反序列化方法,變成binary資料類型來落盤。
- 可以設定
- UDT不僅能夠實作scalar函數的功能,配合着内置函數collect_list和explode(doc),完全能夠實作 aggregator和table function的功能。
更多示例
使用Java數組
set odps.sql.type.system.odps2=true;
set odps.sql.udt.display.tostring=true;
select
new Integer[10], -- 建立一個10個元素的數組
new Integer[] {c1, c2, c3}, -- 通過初始化清單建立一個長度為3的數組
new Integer[][] { new Integer[] {c1, c2}, new Integer[] {c3, c4} }, -- 建立多元數組
new Integer[] {c1, c2, c3} [2], -- 通過下标操作通路數組元素
java.util.Arrays.asList(c1, c2, c3); -- 這個建立了一個 List<Integer>,這個也能當做array<int>來用,是以這是另一個建立内置array資料的方法
from values (1,2,3,4) as t(c1, c2, c3, c4);
JSON使用者的福音
UDT的runtime自帶一個gson的依賴(2.2.4)。是以使用者可以直接使用gson
set odps.sql.type.system.odps2=true;
set odps.sql.session.java.imports=java.util.*,java,com.google.gson.*; -- 同時import多個package,用逗号隔開
@a := select new Gson() gson; -- 建構gson對象
select
gson.toJson(new ArrayList<Integer>(Arrays.asList(1, 2, 3))), -- 将任意對象轉成 json 字元串
cast(gson.fromJson('["a","b","c"]', List.class) as array<string>) -- 反序列化json字元串, 注意gson的接口,直接反序列化出來是List<Object>類型,是以這裡強轉成了 List<String>,友善後續使用
from @a;
相比于get_json_object,上述用法不僅僅是使用友善了,在需要對json字元串多個部分做内容提取時,先将gson字元串反序列成格式化資料,其效率要高得多。
除了GSON, MaxCompute runtime自帶的依賴還包括: commons-logging(1.1.1), commons-lang(2.5), commons-io(2.4),protobuf-java(2.4.1)。
複雜類型操作
内置類型array和map 與 java.util.List 和 java.util.Map 存在映射關系。結果就是:
- Java中實作了java.util.List 或者 java.util.Map 接口的類的對象,都可以參與MaxComputeSQL的複雜類型操作。
- MaxCompute 中 array, map的資料,能夠直接調用 List 或者 Map 的接口。
set odps.sql.type.system.odps2=true;
set odps.sql.session.java.imports=java.util.*;
select
size(new ArrayList<Integer>()), -- 對 ArrayList資料調用内置函數size
array(1,2,3).size(), -- 對内置類型array調用 List的方法
sort_array(new ArrayList<Integer>()), -- 對 ArrayList 的資料進行排序
al[1], -- 雖然java的List不支援下标操作,但是别忘了array是支援的
Objects.toString(a)), -- 過去不支援将array類型cast成string,現在有繞過方法了
array(1,2,3).subList(1, 2) -- 求subList
from (select new ArrayList<Integer>(array(1,2,3)) as al, array(1,2,3) as a) t;
還可以實作一些特殊的功能,比如 array的distinct
select cast (new java.util.ArrayList<Long>(new java.util.HashSet<Long>(array(1L, 2L, 2L))) as array<bigint>); -- 輸出 [1, 2]
聚合操作的實作
UDT實作聚合的原理是,先用COLLECT_SET 或 COLLECT_LIST 函數将資料轉變成 List, 之後對該List應用UDT的标量方法求得這一組資料的聚合值。
如用下面的示例實作對BigInteger求中位數(由于資料是 java.math.BigInteger類型的,是以不能直接用内置的median函數)
set odps.sql.session.java.imports=java.math.*;
@test_data := select * from values (1),(2),(3),(5) as t(value);
@a := select collect_list(new BigInteger(value)) values from @test_data; -- 先把資料聚合成list
@b := select sort_array(values) as values, values.size() cnt from @a; -- 求中位數的邏輯,先将資料排序
@c := select if(cnt % 2 == 1, new BigDecimal(values[cnt div 2]), new BigDecimal(values[cnt div 2 - 1].add(values[cnt div 2])).divide(new BigDecimal(2))) med from @b;
-- 最終結果
select med.toString() from @c;
由于collect_list會先把所有資料都收集到一塊,是沒有辦法實作partial aggregate的,是以這個做法的效率會比内置的aggregator或者udaf低,是以 在内置aggregator能實作的情況下,應盡量使用内置的aggregator 。同時把一個group的所有資料都收集到一起的做法,會增加資料傾斜的風險。
但是另一方面,如果UDAF本身的邏輯就是要将所有資料收集到一塊(比如類似wm_concat的功能),此時使用上述方法,反而可能比UDAF(注意不是内置aggregator)高。
表值函數的實作
表值函數允許輸入多行多列資料,輸出多行多列資料。可以按照下述原理實作:
- 對于輸入多行多列資料,可以參考聚合函數實作的示例。
- 要實作多行的輸出,可以讓UDT方法輸出一個Collection類型的資料(List 或者 Map),然後調用explode函數,将Collections展開成多行。
- UDT本身就可以包含多個資料域,通過調用不同的getter方法來擷取各個域的内容即可展開成多列。
下述示例實作将一個json字元串的内容展開出來的功能
@a := select '[{"a":"1","b":"2"},{"a":"1","b":"2"}]' str; -- 示例資料
@b := select new com.google.gson.Gson().fromJson(str, java.util.List.class) l from @a; -- 反序列化json
@c := select cast(e as java.util.Map<Object,Object>) m from @b lateral view explode(l) t as e; -- 用explode打成多行
@d := select m.get('a') as a, m.get('b') as b from @c; -- 展開成多列
select a.toString() a, b.toString() b from @d; -- 最終結果輸出(注意變量d的輸出中a, b兩列是Object類型)
讀取資源檔案
我們知道在UDF中可以通過ExecutionContext對象來讀取資源檔案。現在UDT也可以通過
com.aliyun.odps.udt.UDTExecutionContext.get()
方法來或者這樣的一個 ExecutionContext 對象。
下述示例将資源檔案 1.txt 讀取到一個string對象中,并輸出:
set odps.sql.session.resources=1.txt;
select new String(com.aliyun.odps.udt.UDTExecutionContext.get().readResourceFile('1.txt')) text;
UDT對象持久化
UDT對象預設是不支援落盤的。但是有方法能夠把UDT的對象持久化。基本的思想是将資料序列化成為binary或者string來做持久化,或者将udt對象展開,持久化裡面的能轉成内置類型的關鍵資料。
如下UDT定義:
public class Shape
{
public List<Point> points;
public Shape(List<Point> points)
{
this.points = points;
}
}
public class Point
{
public int x;
public int y;
public Point(int x, int y)
{
this.x = x;
this.y = y;
}
}
将對象展開成内置類型:
@data := select key, shape from ...;
@exploded := select key, point from @data lateral view explode(shape.points) t as point;
@expanded := select key, point.x, point.y from @exploded;
insert into table points select * from @expanded;
需要用時再重新構造:
select key, new Shape(collect_list(new Point(x, y))) as shape from points group by key;
或者将對象serialize成binary。
平展開的最大問題是,序列化和反序列化的麻煩。當然可以直接轉成binary。如改造Shape類:
-- 改造 Shape 類
public com.aliyun.odps.data.Binary serialize() {
ByteBuffer buffer = ByteBuffer.allocate(points.size() * 8 + 4);
buffer.putInt(points.size());
for (Point point : points) {
buffer.putInt(point.x);
buffer.putInt(point.y);
}
return new com.aliyun.odps.data.Binary(buffer.array());
}
public static Shape deserialize(com.aliyun.odps.data.Binary bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes.data());
int size = buffer.getInt();
List<Point> points = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
points.add(new Point(buffer.getInt(), buffer.getInt()));
}
return new Shape(points);
}
-- 需要持久化的時候,調用serialize()
select key, shape.serialize() data from ...
-- 需要讀取的時候,調用deserialize方法
select key, Shape.deserialize(data) as Shape from ...
如果直接利用已有的架構,也許會更友善。如 Shape 是用 ProtoBuffer 定義的
-- shape 的定義
message Point
{
required int32 x = 1;
required int32 y = 2;
}
message Shape
{
repeated Point points = 1;
}
SQL中直接調用pb的方法
select key, new com.aliyun.odps.data.Binary(shape.toByteArray()) from ...
MaxCompute Studio的支援
本功能和 MaxCompute Studio 搭配着使用,才能發揮其最大的價值。
- MaxCompute Studio 智能提示能夠大大提升編碼效率。
- MaxCompute Studio 的類型推導過程,能讓您知道某個表達式是什麼類型。
MaxCompute - ODPS重裝上陣 第六彈 - User Defined Type功能簡介詳細說明更多示例MaxCompute Studio的支援功能,性能與安全性總結 - MaxCompute Studio 的實時文法檢查,能快速定位問題文法問題。
MaxCompute - ODPS重裝上陣 第六彈 - User Defined Type功能簡介詳細說明更多示例MaxCompute Studio的支援功能,性能與安全性總結
功能,性能與安全性
功能方面,UDT的優勢是顯而易見的:
- 使用簡單,不需要定義任何function。
- 支援JDK的所有功能,進而擴充了SQL的能力。
- 代碼直接和SQL放在一塊,便于管理。
- 其它類庫拿來即用,代碼重用率高。
- 可以使用面向對象的思想設計某些功能。
在性能方面,UDT執行過程和UDF非常接近,其性能與UDF幾乎是一緻的,而且産品針對UDT做了很多優化,在某些場景下UDT的性能甚至略高一籌:
- 對象在一個程序内實際上是不需要做列化反序列化的,隻有跨程序的時候才需要。簡單地說,就是在沒有join或者aggregator等需要做資料reshuffle的情況下,UDT并沒有序列化反序列化的開銷。
- UDT的Runtime實作是基于codegen,而不是反射,是以不會存在反射帶來的性能損失
- 連續的多個UDT的操作,實際上會合并在一起,在一個FunctionCall裡一起執行,如上述例子中
這個看似存在多次UDT方法調用的操作,實際上隻有一次調用。是以雖然UDT操作的單元都比較小,但是并不會是以造成多次函數調用的接口上的額外開銷。values[x].add(values[y]).divide(java.math.BigInteger.valueOf(2))
在安全控制方面,UDT和UDF完全一樣。即都會受到沙箱policy的限制。是以如果要使用受限的操作,需要打開沙箱隔離,或者申請沙箱白名單。
總結
本文從使用的角度介紹了UDT的功能。UDT能夠在SQL中直接寫java的表達式,并可以引用jdk中的類。這一功能極大地友善擴充SQL的功能。
當然,UDT的功能還有許多功能還有待完善。文中也提到了幾點有待完善的功能:
- 支援無傳回值的函數調用(或者有傳回值,但是忽略傳回值,直接取操作數本身,如調用List的add方法,結束後傳回執行完add操作的List)。
- 支援匿名類和lambda表達式。
- 支援用作shuffle key。
- 支援JAVA外的其他語言,如python。