天天看點

StreamingPro添加Scala script 子產品支援

sql 在解析字元串方面,能力還是有限,因為支援的算子譬如substring,split等有限,且不具備複雜的流程表達能力。我們内部有個通過json描述的dsl引擎友善配置化解析,然而也有一定的學習時間成本。

我們當然可以通過sql的 udf函數等來完成字元串解析,在streamingpro中也很簡單,隻要注冊下你的udf函數庫即可:

這樣你就可以在sql中使用mlfunctions裡面所有的udf函數了。然而為此專門提供一個jar包也是略顯麻煩。

這個時候如果能直接寫腳本解析就好了,最好是能支援各種腳本,比如groovy,javascript,python,scala,java等。任何一個會程式設計的人都可以實作一個比較複雜的解析邏輯。

核心是scriptcompositor子產品:

如果我想在代碼裡直接處理所有的列,則如下:

通過添加usedocmap為true,則你在代碼裡可以通過doc(doc是個map[string,any]) 來擷取你想要的任何字段,然後形成一個新的map。

如果你隻要新生成map裡的字段,忽略掉舊的,則設定ignoreoldcolumns=true 即可。

你可以把代碼放到一個檔案裡,如下:

通過inputtablename指定輸入的表,outputtablename作為輸出結果表。 raw代表inputtablename中你需要解析的字段,然後通過你的scala腳本進行解析。在腳本中 rawline 是固定的,對應raw字段(其他字段也是一樣)的值。腳本隻有一個要求,最後的傳回結果暫時需要是個map[string,any]。

這裡,你隻是提供了一個map作為傳回值,作為一行,然後以outputtablename指定的名字輸出,作為下一條sql的輸入,是以streamingpro需要推測出你的schema。 資料量大到一定程度,推測schema的效率就得不到保證,這個時候,你可以通過配置schema來提升性能:

schema.scala的内容大緻如下:

後續roadmap是:

支援外部腳本,比如放在hdfs或者http伺服器上。

支援java 腳本

支援javascript腳本

支援 python 腳本

支援 ruby腳本

支援 groovy 腳本

舉個案例,從hdfs讀取一個檔案,并且映射為隻有一個raw字段的表,接着通過scriptcompositor配置的scala代碼解析raw字段,展開成a,b兩個字段,然後繼續用sql繼續處理,最後輸出。