本文是pig系統分析系列中的最後一篇了,主要讨論如何擴充pig功能,不僅介紹pig本身提供的udfs擴充機制,還從架構上探讨pig擴充可能性。
補充說明:前些天同僚發現twitter推動的pig on spark項目:,準備研究下。
通過udfs(使用者自定義函數),可以自定義資料處理方法,擴充pig功能。實際上,udfs除了使用之前需要register/define外,和内置函數沒什麼不同。
以内置的abs函數為例:
函數都繼承evalfunc接口,泛型參數double代表傳回類型。
exec方法:輸入參數類型為元組,代表一行記錄。
outputschema方法:用于處理輸入和輸出schema
getargtofuncmapping:用于支援各種資料類型重載。
evalfuc方法也能實作聚合函數,這是因為group操作對每個分組都傳回一條記錄,每組中包含一個bag,是以exec方法中疊代處理bag中記錄即可。
以count函數為例:
如前所述,具備algebraic性質的聚合函數在map-reduce過程中能被combiner優化。直覺來了解,具備algebraic性質的函數處理過程能被分為三部分:initial(初始化,處理部分輸入資料)、intermediate(中間過程,處理初始化過程的結果)和final(收尾,進行中間過程的結果)。比如count函數,初始化過程為count計數操作,中間過程和收尾為sum求和操作。更進一步,如果函數在這三個階段中都能進行相同的操作,那麼函數具備distributive性質,比如sum函數。
pig提供了algebraic 接口:
其中每個方法都傳回evalfunc實作類的名稱。繼續以count函數為例,count實作了algebraic接口,針對以下語句:
pig會重寫mr執行計劃:
algebraic 接口通過combiner優化減少資料傳輸量,而accumulator接口則關注的是記憶體使用量。udf實作accumulator接口後,pig保證所有key相同的資料(通過shuffle)以增量的形式傳遞給udf(預設pig.accumulative.batchsize=20000)。同樣,count也實作了accumulator接口。
通過udfs構造函數傳遞資料是最簡單的方法,然後通過define語句定義udf執行個體時指定構造方法參數。但有些情況下,比如資料在運作期才産生,或者資料不能用string格式表達,這時候就得使用udfcontext了。udf通過getudfcontext方法擷取儲存在threadloacl中的udfcontext執行個體。udfcontext包含以下資訊:
jconf:hadoop configuration。
clientsysprops:系統屬性。
hashmap<udfcontextkey,properties> udfconfs:使用者自己儲存的屬性,其中udfcontextkey由udf類名生成。
pig哲學之三——pigs live anywhere。理論上,pig并不被限定運作在hadoop架構上,有幾個可以參考的實作和提議。
pigen,pig on tez。,架構圖如下:
pig的後端抽象層:。目前已經實作了piglatin運作在galago上。
pig官網:
pig paper at sigmod 2008:building a high level dataflowsystem on top of mapreduce:the pig experience
programming.pig:dataflow.scripting.with.hadoop(2011.9).alan.gates