本文是pig系统分析系列中的最后一篇了,主要讨论如何扩展pig功能,不仅介绍pig本身提供的udfs扩展机制,还从架构上探讨pig扩展可能性。
补充说明:前些天同事发现twitter推动的pig on spark项目:,准备研究下。
通过udfs(用户自定义函数),可以自定义数据处理方法,扩展pig功能。实际上,udfs除了使用之前需要register/define外,和内置函数没什么不同。
以内置的abs函数为例:
函数都继承evalfunc接口,泛型参数double代表返回类型。
exec方法:输入参数类型为元组,代表一行记录。
outputschema方法:用于处理输入和输出schema
getargtofuncmapping:用于支持各种数据类型重载。
evalfuc方法也能实现聚合函数,这是因为group操作对每个分组都返回一条记录,每组中包含一个bag,所以exec方法中迭代处理bag中记录即可。
以count函数为例:
如前所述,具备algebraic性质的聚合函数在map-reduce过程中能被combiner优化。直观来理解,具备algebraic性质的函数处理过程能被分为三部分:initial(初始化,处理部分输入数据)、intermediate(中间过程,处理初始化过程的结果)和final(收尾,处理中间过程的结果)。比如count函数,初始化过程为count计数操作,中间过程和收尾为sum求和操作。更进一步,如果函数在这三个阶段中都能进行相同的操作,那么函数具备distributive性质,比如sum函数。
pig提供了algebraic 接口:
其中每个方法都返回evalfunc实现类的名称。继续以count函数为例,count实现了algebraic接口,针对以下语句:
pig会重写mr执行计划:
algebraic 接口通过combiner优化减少数据传输量,而accumulator接口则关注的是内存使用量。udf实现accumulator接口后,pig保证所有key相同的数据(通过shuffle)以增量的形式传递给udf(默认pig.accumulative.batchsize=20000)。同样,count也实现了accumulator接口。
通过udfs构造函数传递数据是最简单的方法,然后通过define语句定义udf实例时指定构造方法参数。但有些情况下,比如数据在运行期才产生,或者数据不能用string格式表达,这时候就得使用udfcontext了。udf通过getudfcontext方法获取保存在threadloacl中的udfcontext实例。udfcontext包含以下信息:
jconf:hadoop configuration。
clientsysprops:系统属性。
hashmap<udfcontextkey,properties> udfconfs:用户自己保存的属性,其中udfcontextkey由udf类名生成。
pig哲学之三——pigs live anywhere。理论上,pig并不被限定运行在hadoop框架上,有几个可以参考的实现和提议。
pigen,pig on tez。,架构图如下:
pig的后端抽象层:。目前已经实现了piglatin运行在galago上。
pig官网:
pig paper at sigmod 2008:building a high level dataflowsystem on top of mapreduce:the pig experience
programming.pig:dataflow.scripting.with.hadoop(2011.9).alan.gates