場景
日志服務内置了20+類
SQL函數。面對使用者複雜的業務場景,例如使用json來沉澱業務資料,普通的SQL函數可能就無法滿足需求,需要一些使用者自定義處理邏輯。為了處理json類的業務資料,我們可以采用
把json展開成多行的形式進行統計分析,今天我們介紹使用UDF(
lambda)的方式來編寫自定義邏輯,處理json、array、map類型的資料。
資料樣例:
__source__: 11.164.232.105
__tag__:__hostname__: vm-req-170103232316569850-tianchi111932.tc
__topic__: TestTopic_4
array_column: [1,2,3]
double_column: 1.23
map_column: {"a":1,"b":2}
text_column: 商品
lambda函數對array類型的資料進行求均值
為了周遊每一個array元素,并且把計算所有元素的均值,我們通過reduce函數進行計算。
* | select array_column, reduce( cast( json_parse(array_column) as array(bigint)) , CAST(ROW(0.0, 0) AS ROW(sum DOUBLE, count INTEGER)) , (s,x) -> cast(row( x+ s.sum, s.count+1) as ROW(sum double, count INTEGER)), s -> IF(s.count = 0, NULL, s.sum / s.count))

reduce 函數的具體語義參考
文法文檔。參數分為四部分
-
表示輸入的數組資料cast( json_parse(array_column) as array(bigint))
-
定義起始狀态為一個複雜的row類型,分别記錄sum和countCAST(ROW(0.0, 0) AS ROW(sum DOUBLE, count INTEGER))
- 對每一個元素,計算累加值,
s代表已經有的狀态,x代表新輸入的元素,計算結果通過cast強制定義為row類型(s,x) -> cast(row( x+ s.sum, s.count+1) as ROW(sum double, count INTEGER))
- 最後對最終狀态,計算avg值,
。s代表最終狀态。s -> IF(s.count = 0, NULL, s.sum / s.count)
對所有行的array元素求avg:
* | select sum(rows.sum ) / sum(rows.count) from(
select array_column, reduce( cast( json_parse(array_column) as array(bigint)) , CAST(ROW(0.0, 0) AS ROW(sum DOUBLE, count INTEGER)) , (s,x) -> cast(row( x+ s.sum, s.count+1) as ROW(sum double, count INTEGER)), s -> s) as rows from log
)
通過子查詢的方式,先reduce每一行的array的sum 和count。之後在嵌套查詢中,求所有行的sum和count,最後相除求avg: