Java_ODPS-D2-離線數倉-4-自定義函數 UDTF(一進多出):解析複雜的事件 JSON 串
原始資料與結果此處從略,請參考筆記 p89、p92。
FunctionStudio
- 新建一個項目 gmall_udtf,運行環境選 udfjava
- 新建一個 UDTF 類 FlatEventUDTF
- 在 pom.xml 中加入 fastjson 依賴(用於解析 JSON):
<!-- fastjson provides JSON / JSONArray / JSONObject, used by FlatEventUDTF to parse the event array -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.28.odps</version>
</dependency>
- 編輯 UDTF 程式碼,並導入所需的包:
package com.alibaba.dataworks.udtf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
/**
 * UDTF that flattens a JSON array of events into one output row per event ("one in, many out").
 *
 * <p>Input: a single string holding a JSON array (in the accompanying SQL, the {@code $.et}
 * field of a raw log line). Each array element is read as an object with the keys
 * {@code ett} (event time), {@code en} (event name) and {@code kv} (event payload, forwarded
 * unchanged as a JSON string).
 *
 * <p>Output columns, as declared by {@link Resolve}: {@code bigint, string, string}
 * (event time, event name, event JSON).
 */
@Resolve({"string->bigint,string,string"})
public class FlatEventUDTF extends UDTF {

    /** No per-instance initialization is needed. */
    @Override
    public void setup(ExecutionContext ctx) throws UDFException {
    }

    /**
     * Parses the input event array and forwards one row per event object.
     *
     * <p>Rows that cannot be flattened are tolerated instead of failing the whole job:
     * a null or blank input (e.g. a log line without {@code $.et}) produces no rows,
     * null array elements are skipped, and a missing or non-numeric {@code ett} is
     * emitted as a null event time.
     *
     * @param args {@code args[0]} is the JSON array string; may be null
     * @throws UDFException as declared by the UDTF contract
     */
    @Override
    public void process(Object[] args) throws UDFException {
        String event = (String) args[0];
        if (event == null || event.trim().isEmpty()) {
            return;
        }
        JSONArray jsonArray = JSON.parseArray(event);
        if (jsonArray == null) {
            // fastjson returns null (rather than an empty array) for null-like input.
            return;
        }
        for (int i = 0; i < jsonArray.size(); i++) {
            JSONObject jsonObject = jsonArray.getJSONObject(i);
            if (jsonObject == null) {
                continue;
            }
            Long eventTime = parseEventTime(jsonObject.getString("ett"));
            String eventName = jsonObject.getString("en");
            String eventJson = jsonObject.getString("kv");
            // Value order and types must match @Resolve: bigint, string, string.
            forward(eventTime, eventName, eventJson);
        }
    }

    /** No resources to release. */
    @Override
    public void close() throws UDFException {
    }

    /**
     * Converts the {@code ett} field to a long.
     *
     * @param ett raw event-time text; may be null
     * @return the parsed value, or null when {@code ett} is missing or not a valid long
     */
    private static Long parseEventTime(String ett) {
        if (ett == null) {
            return null;
        }
        try {
            return Long.valueOf(ett.trim());
        } catch (NumberFormatException ignored) {
            // A single malformed timestamp should not abort the whole batch; emit NULL instead.
            return null;
        }
    }
}
DataStudio
- 提交到 DataStudio 後,先測試一下:
-- Smoke-test the FlatEventUDTF function: one input row expands into many output rows.
-- $.et is the event array inside each raw log line.
SELECT FLATEVENTUDTF(
           GET_JSON_OBJECT(log_string, '$.et')
       ) AS (event_time, event_name, event_json)
FROM   ods_base_log
WHERE  ds = '00000000';
- UDTF 在 SQL 中的實際應用
  列轉行:搭配 LATERAL VIEW 使用,把一行日誌中的事件陣列展開為多行
-- Expands each row's $.et event array into one row per event (placed after the source table in FROM).
LATERAL VIEW FLATEVENTUDTF(GET_JSON_OBJECT(log_string,'$.et')) event_view AS event_time,event_name,event_json
-- Manually load start-up events ('start') from the ODS layer into the DWD layer.
-- INSERT maps SELECT columns to dwd_start_log columns by POSITION, not by alias, so the list
-- below follows the table's column order:
--   mid user_id version_code version_name lang source os area model brand sdk_version
--   email height_width network lng lat entry open_ad_type action loading_time detail event_time
-- NOTE(review): network assumes the key 'nw' in the cm object; confirm against the raw log.
INSERT OVERWRITE TABLE dwd_start_log PARTITION(ds,hh,mm)
SELECT GET_JSON_OBJECT(log_string,'$.cm.mid') mid
,GET_JSON_OBJECT(log_string,'$.cm.uid') user_id
,GET_JSON_OBJECT(log_string,'$.cm.vc') version_code
,GET_JSON_OBJECT(log_string,'$.cm.vn') version_name
,GET_JSON_OBJECT(log_string,'$.cm.l') lang
,GET_JSON_OBJECT(log_string,'$.cm.sr') source
,GET_JSON_OBJECT(log_string,'$.cm.os') os
,GET_JSON_OBJECT(log_string,'$.cm.ar') area
,GET_JSON_OBJECT(log_string,'$.cm.md') model
,GET_JSON_OBJECT(log_string,'$.cm.ba') brand
,GET_JSON_OBJECT(log_string,'$.cm.sv') sdk_version
,GET_JSON_OBJECT(log_string,'$.cm.g') email
,GET_JSON_OBJECT(log_string,'$.cm.hw') height_width
,GET_JSON_OBJECT(log_string,'$.cm.nw') network
,GET_JSON_OBJECT(log_string,'$.cm.ln') lng
,GET_JSON_OBJECT(log_string,'$.cm.la') lat
,GET_JSON_OBJECT(event_view.event_json,'$.entry') entry
,GET_JSON_OBJECT(event_view.event_json,'$.open_ad_type') open_ad_type
,GET_JSON_OBJECT(event_view.event_json,'$.action') action
,GET_JSON_OBJECT(event_view.event_json,'$.loading_time') loading_time
,GET_JSON_OBJECT(event_view.event_json,'$.detail') detail
,event_view.event_time
,ds
,hh
,mm
FROM ods_base_log
LATERAL VIEW FLATEVENTUDTF(GET_JSON_OBJECT(log_string,'$.et')) event_view AS event_time,event_name,event_json
WHERE ds = '00000000'
AND event_view.event_name = 'start'
;
-- Inspect the loaded rows. The ds filter must match the partition actually written: the load
-- takes ds straight from ods_base_log rows filtered by ds = '00000000'.
SELECT * from dwd_start_log WHERE ds='00000000' LIMIT 3;
-- Sample output below was captured with a SELECT list that did not follow the table's column
-- order, so email/height_width/network and open_ad_type/loading_time hold misplaced values.
-- mid user_id version_code version_name lang source os area model brand sdk_version email height_width network lng lat entry open_ad_type action loading_time detail event_time ds hh mm
-- 0 0 9 1.0.7 es V 8.1.1 MX HTC-0 HTC V2.5.9 640*960 [email protected] 640*960 -112.5 29.2 3 13 1 2 1583531265269 00000000 01 15