天天看點

利用 Spark DataSource API 實作Rest資料源

先說下這個需求的來源。通常在一個流式計算的主流程裡,會用到很多映射資料,譬如某某對照關系,而這些映射資料通常是通過http接口暴露出來的,尤其是外部系統,你基本沒有辦法直接通過jdbc去讀庫啥的。

上面是一個點,其次是從http讀到的json資料,我其實需要做扁平化處理的。現在如果sql作用于json資料可以解決簡單的嵌套問題,但是更複雜的方式是沒有太大辦法的。

比如下面格式的:

最好能展開成這種格式才能夠被主流程直接join使用:

是以為了實作同僚的需求,我需要第一将rest接口的擷取方式用标準的datasource api 來實作,其次提供一個能夠做如上合并規則的子產品,并且允許配置。

先看看datasource api 的樣子:

如果做成配置化則是:

defaultsource的實作

定義

這是比較典型的命名規範。rest 代表支援的是rest作為接口,json則代表rest接口的資料是json格式的,包的命名讓人一目了然。

先看看defaultsource繼承的兩個接口

datasourceregister

該接口隻有一個shortname 方法。我們看到上面的包名是很長的,你可以給一個更簡短的名字:

是以具體實作就變成了:

這個接口也隻有一個方法:

其傳回值baserelation對象描述了資料源和spark sql互動。createrelation方法允許你根據使用者定義的參數parameters 建立一個合适的baserelation的實作類。

其實除了這個,還有一些攜帶更多資訊的繼承自relationprovider的類,譬如:

在我們的實作裡,隻要實作基礎的relationprovider就好。

我們來看下defaultsource.createrelation的具體代碼:

源碼中已經做了說明。這裡restjsonrelation是整個核心,它實作了spark sql 和資料源的互動。restjsonrelation繼承自baserelation,tablescan等基類

這些參數是你随便定義的。當然,url,xpath,smaplingratio具體的含義在上一章節都提到了。

和資料源進行互動有兩個必要的資訊需要擷取:

1.schema 資訊。隻有兩種方法:使用者告知你,或者程式自己根據資料推導。關于schema資訊這塊,baserelation還提供了幾個基礎的約定:

needconversion,是否需類型轉換,因為spark sql内部的表示是row,裡面的資料需要特定的類型,比如string會被轉化成utf8string。預設為true,官方也是說不要管他就好。

unhandledfilters, 傳回一些資料源沒有辦法pushdown的filter。這樣解析器就知道可以在spark内部做filter了。否則spark 會傻傻的以為你做了過濾,然後資料計算結果就錯了。

2.資料掃描的方法。 目前spark sql 提供了四種

tablescan 全表掃描

prunedscan  可以指定列,其他的列資料源可以不用傳回

prunedfilteredscan 指定列,并且還可以加一些過濾條件,隻傳回滿足條件的資料。這個也就是我們常說的資料源下沉(pushdown)操作。

catalystscan 和prunedfilteredscan類似,支援列過濾,資料過濾,但是接受的過濾條件是spark 裡的expression。 理論上會更靈活些。話說在spark源碼)裡(1.6.1版本),我沒有看到這個類的具體實作案例。這裡我們隻要實作一個簡單的tablescan就可以了,因為拿的是字典資料,并不需要做過濾。

baserelation是需要你給出schema的。這裡我們會先定義一個dataschema的lazy屬性,這樣防止schema方法被反複調用而反複推導。

因為我們是根據資料推導schema,是以首先要擷取資料。我們定義一個方法:

inputpaths 我沿用了檔案系統的概念,其實在我們這裡就是一個url。我們知道,最終spark sql 的直接資料源都是rdd的。是以這裡我們傳回的也是rdd[string]類型。具體實作很簡單,就是通過httpclient根據inputpaths拿到資料之後makerdd一下就可以了。

有了這個類就能擷取到資料,就可以做schema推導了:

inferschema的實作邏輯比較複雜,但最終就是為了傳回structtype(fields: array[structfield]) 這麼個東西。我是直接拷貝的spark json datasource的實作。有興趣的可以自己參看。structtype其實也很簡單了,無非就是一個描述schema的結構,類似你定義一張表,你需要告訴系統字段名稱,類型,是否為null等一些列資訊。

現在我們終于搞定了資料表結構了。

剛才我們說了資料擷取的四種類型,我們這裡使用的是tablescan,繼承自該接口隻要實作一個buildscan方法就好:

其本質工作就是把json格式的string根據我們前面已經拿到的schema轉化為row格式。

具體做法如下:

這裡的代碼還是比較清晰易懂的。但是 convertfield(factory, parser, schema) 直接match 到  internalrow 還是比較讓人困惑的,一個字段轉換咋就變成了internalrow了呢?這裡确實也有乾坤的。我們進去看看convertfield方法:

如果你的json是個map,經過n次比對case後會進入最後一個case 情況。這裡的st:structtype 就是我們之前自己推導出來的dataschema. convertobject 方法如下:

到這裡就真相大白了。為了能夠拿到一條完整的資料,他會while循環直到遇到end_object 。所謂end_object 其實就是一個map 結束了。 在每一次循環裡,拿到一個字段,然後通過名字去schema裡擷取類型資訊,然後再回調convertfield方法将這個字段轉化為row需要的類型,比如字元串類型的就通過utf8string進行轉換。

case (value_string, stringtype) =>  utf8string.fromstring(parser.gettext)

得到的值通過row的函數進行更新,這裡是 row.update 方法。到end_object後,就完成了将一個json map 轉化為一條row的功能了。

到目前為止,我們已經完成了具體的工作了。現在你已經可以按如下的方式使用:

spark datasource api的提出,給spark 建構生态帶來了巨大的好處。各個存儲系統可以實作統一标準的接口去對接spark。學會使用自己實作一個datasoure是的你的存儲可以更好的和生态結合,也能得到更好的性能優化。