天天看点

pig的安装和使用

1、  Pig简介

Pig是这样一个平台,它能够对由高级语言编写的数据 分析程序 组成的大型数据集 进行分析、评估。Pig程序最突出的优势是它的结构能够经受得住高度并行化的检验,这个特性让它能够处理大型的数据集。

    目前,Pig的底层由一个编译 器组成,它在运行 的时候会产生一些Map-Reduce程序序列,并且这里大规模的并行执行依然存在(例如,Hadoop 子工程)。当前,Pig的语言层是由一叫做Pig Latin的正文型语言组成,它有如下的特点:

Ø  易于编程:对于那些简单的并且不易并行的数据分析 任务达并不需要实现并行执行的目标。多重关联的数据转换任务需要明确地被编码成数据流序列,让他们易于被书写、理解和掌握。

Ø  最优化机会:任务被编码的方式允许系统执行的时候进行自动地优化,这就是说,用户更需要注意的是程序语义的设计而不是效率。

Ø  扩展性:用户能自定义函数来实现特殊的目的

2、  安装指导

http://pig.apache.org/docs/r0.8.1/setup.html

Hadoop 0.20.2 - http://hadoop.apache.org/common/releases.html

3、  下载路径

http://labs.renren.com/apache-mirror/pig/pig-0.8.1/

4、  解压

tar -zxvf pig-0.8.1.tar.gz

5、  修改~/.bash_profile,并source该文件:

PIG_INSTALL=/home/hadoop/pig_home/pig-0.8.1

export PIG_INSTALL

PATH=$PIG_INSTALL/bin:$PATH

export PATH

export PIG_HADOOP_VERSION=18

export PIG_CLASSPATH=$HADOOP_HOME/conf/

6、  尝试运行pig(两种模式:pig –x local和pig):

[hadoop@  pig-0.8.1]$ pig

2011-12-29 19:20:06,600 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/hadoop/pig_home/pig-0.8.1/pig_1325157606596.log

2011-12-29 19:20:06,942 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000

2011-12-29 19:20:07,068 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001

grunt> quit

7、  简单测试

[hadoop@  pig_test]$ hadoop fs -put pig_data.txt /user/hadoop/yangkai/

[hadoop@  pig_test]$ hadoop fs -cat /user/hadoop/yangkai/pig_data.txt

a       1

b       2

c       3

d       4

[hadoop@  pig_test]$ pig

2012-01-04 10:25:54,029 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/hadoop/pig_test/pig_1325643954025.log

2012-01-04 10:25:54,280 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000

2012-01-04 10:25:54,452 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001

grunt>

--加载数据

grunt> records = LOAD '/user/hadoop/yangkai/pig_data.txt'

>> AS (ch:chararray, in:int);

--输出数据                        

grunt> DUMP records;                                    

2012-01-04 10:30:09,384 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN

2012-01-04 10:30:09,384 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - pig.usenewlogicalplan is set to true. New logical plan will be used.

2012-01-04 10:30:09,405 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: records: Store(hdfs://localhost:9000/tmp/temp-752614598/tmp-129419475:org.apache.pig.impl.io.InterStorage) - scope-18 Operator Key: scope-18)

2012-01-04 10:30:09,405 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false

2012-01-04 10:30:09,408 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1

2012-01-04 10:30:09,408 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1

2012-01-04 10:30:09,427 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job

2012-01-04 10:30:09,428 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3

2012-01-04 10:30:11,196 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job

2012-01-04 10:30:11,200 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.

2012-01-04 10:30:11,433 [Thread-22] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

2012-01-04 10:30:11,434 [Thread-22] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1

2012-01-04 10:30:11,443 [Thread-22] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1

2012-01-04 10:30:11,702 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete

2012-01-04 10:30:12,458 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201112291909_0003

2012-01-04 10:30:12,458 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://localhost:50030/jobdetails.jsp?jobid=job_201112291909_0003

2012-01-04 10:30:22,606 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete

2012-01-04 10:30:27,152 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete

2012-01-04 10:30:27,153 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Script Statistics:

HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features

0.20.2  0.8.1   hadoop  2012-01-04 10:30:09     2012-01-04 10:30:27     UNKNOWN

Success!

Job Stats (time in seconds):

JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs

job_201112291909_0003   1       0       6       6       6       0       0       0       records MAP_ONLY        hdfs://localhost:9000/tmp/temp-752614598/tmp-129419475,

Input(s):

Successfully read 4 records (16 bytes) from: "/user/hadoop/yangkai/pig_data.txt"

Output(s):

Successfully stored 4 records (43 bytes) in: "hdfs://localhost:9000/tmp/temp-752614598/tmp-129419475"

Counters:

Total records written : 4

Total bytes written : 43

Spillable Memory Manager spill count : 0

Total bags proactively spilled: 0

Total records proactively spilled: 0

Job DAG:

job_201112291909_0003

2012-01-04 10:30:27,157 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

2012-01-04 10:30:27,166 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

2012-01-04 10:30:27,166 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1

(a,1)

(b,2)

(c,3)

(d,4)

grunt> DESCRIBE records;

records: {ch: chararray,in: int}

8、  进一步使用

[hadoop@  pig_test]$ vim pig_data.txt

a       1

b       2

c       3

d       4

a       5

a       6

c       7

 [hadoop@  pig_test]$ hadoop fs -rm /user/hadoop/yangkai/pig_data.txt

Deleted hdfs://localhost:9000/user/hadoop/yangkai/pig_data.txt

[hadoop@  pig_test]$ hadoop fs -put pig_data.txt /user/hadoop/yangkai/             

[hadoop@  pig_test]$ pig

。。。。。。

grunt> records = LOAD '/user/hadoop/yangkai/pig_data.txt'

>> AS (ch:chararray, in:int);                       

grunt> DESCRIBE records;                                

records: {ch: chararray,in: int}

--过滤

grunt> filtered_records = FILTER records BY ch != 'd';

grunt> DUMP filtered_records;

。。。。。。

(a,1)

(b,2)

(c,3)

(a,5)

(a,6)

(c,7)

--group by

grunt> grouped_records = GROUP filtered_records BY ch;

grunt> DUMP grouped_records;

(a,{(a,1),(a,5),(a,6)})

(b,{(b,2)})

(c,{(c,3),(c,7)})

grunt> DESCRIBE grouped_records;

grouped_records: {group: chararray,filtered_records: {ch: chararray,in: int}}

--取最大值

grunt> max_in = FOREACH grouped_records GENERATE group, MAX(filtered_records.in);

grunt> DUMP max_in;

。。。。。。

(a,6)

(b,2)

(c,7)

--生成用例

grunt> ILLUSTRATE max_in;

2012-01-04 10:55:31,959 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000

2012-01-04 10:55:31,959 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001

2012-01-04 10:55:31,992 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

2012-01-04 10:55:31,992 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1

-----------------------------------------------

| records     | ch: bytearray | in: bytearray |

-----------------------------------------------

|             | c             | 3             |

|             | d             | 4             |

|             | c             | 7             |

-----------------------------------------------

-----------------------------------------

| records     | ch: chararray | in: int |

-----------------------------------------

|             | c             | 3       |

|             | d             | 4       |

|             | c             | 7       |

-----------------------------------------

--------------------------------------------------

| filtered_records     | ch: chararray | in: int |

--------------------------------------------------

|                      | c             | 3       |

|                      | c             | 7       |

--------------------------------------------------

-------------------------------------------------------------------------------------------

| grouped_records     | group: chararray | filtered_records: bag({ch: chararray,in: int}) |

-------------------------------------------------------------------------------------------

|                     | c                | {(c, 3), (c, 7)}                               |

-------------------------------------------------------------------------------------------

-----------------------------------------

| max_in     | group: chararray | int   |

-----------------------------------------

|            | c                | 7     |

-----------------------------------------

--连接

[hadoop@  pig_test]$ cat A B

a       1

b       2

c       3

d       4

a       12

b       22

c       32

d       42

grunt> A = LOAD '/user/hadoop/yangkai/A';

grunt> B = LOAD '/user/hadoop/yangkai/B';

grunt> C = JOIN A BY $0, B BY $0;

(a,1,a,12)

(b,2,b,22)

(c,3,c,32)

(d,4,d,42)

--存储

grunt> STORE C INTO 'output/C'

。。。。。。

[hadoop@  pig_test]$ hadoop fs -cat /user/hadoop/output/C/part-r-00000

a       1       a       12

b       2       b       22

c       3       c       32

d       4       d       42

9、  编写说明

A、 语句结束必须用分号;

B、 双减号表示单行注释,注释多行

10、              Pig与sql比较

A、 pig是数据流编程语言,sql是描述型编程语言;

B、 sql的查询规划器把数据存储定义在了具有严格模式的表内,而pig对所处理数据的要求比较宽松,可以在运行时定义模式;

C、 pig不支持小部分数据的随机读写,sql支持;

D、 pig所有的写都是批量的、流式的写,sql不是;

11、              Pig Latin关系操作

类型 操作 描述
加载与存储 LOAD 将数据从外部文件或其它存储中加载数据,存入关系
STORE 将一个关系存放到文件系统或其它存储中
DUMP 将关系打印到控制台
过滤 FILTER 从关系中删除不需要的行
DISTINCT 从关系中删除重复的行

FOREACH…

GENERATE

对于集合的每个元素,生成或删除字段
STREAM 使用外部程序对关系进行变换
SAMPLE 从关系中随机取样
分组与连接 JOIN 连接两个或多个关系
COGROUP 在两个或多个关系中分组
GROUP 在一个关系中对数据分组
CROSS 获取两个或更多关系的乘积(叉乘)
排序 ORDER 根据一个或多个字段对某个关系进行排序
LIMIT 限制关系的元组个数
合并与分割 UNION 合并两个或多个关系
SPLIT 把某个关系切分成两个或多个关系

12、              Pig Latin的诊断操作

操作 描述
DESCRIBE 打印关系的模式
EXPLAIN 打印逻辑和物理计划
ILLUSTRATE 使用生成的输入子集显示逻辑计划的试运行结果

13、              Pig Latin UDF语句

REGISTER   在Pig运行时环境中注册一个JAR文件
DEFINE      为UDF、流式脚本或命令规范新建别名

14、              Hadoopmapreduce工具

kill    中止某个MapReduce任务
exec 在一个新的Grunt shell程序中以批处理模式运行一个脚本
run   在当前Grunt外壳程序中运行程序
quit  退出解释器
set   设置Pig选项

15、              Pig Latin表达式

类型 表达式 描述 示例
字段 $n 第n个字段 $0
字段 f 字段名f year
投影 c.$n, c.f 在关系、包或元组中的字段 records.$0, records.year
Map查找 m#k 在映射m中键k对应的值 items’Coat’
类型转换 (t)f 将字段t转换成f类型 (int)year
函数型平面化 fn(f1, f2, …) 在字段上应用函数fn isGood(quality)
FLATTEN(f) 从包和元组中去除嵌套 flatten(group)

其它的表达式,如算术、条件、比较和布尔型类似其它语言,不详述。

16、              Pig Latin类型

数据类型包括int (32位有符号整数), long(64位有符号整数), float(32位浮点数), double(64位浮点数), chararray(UTF16格式的字符数组), Bytearray(字节数组)。

三种嵌套类型:tuple(元组), bag(包), map(键值对):

Ø  tuple: (1, ‘hello’)         // 任何类型的字段序列

Ø  bag: {(1, ‘hello’), (2)}  //  元组的无序多重集合(允许重复元组)

Ø  map: [‘a’ ‘hello’]        // 一组键值对,键必须是字符数组

17、              模式(Schema)

Pig的一个关系可以有一个关联的模式,模式为关系的字段指定名称和类型。Pig的这种模式声明方式和SQL数据库要求数据加载前必须先声明模式截然不同,Pig设计的目的是用于分析不包含数据类型信息的纯文本输入文件的。但是尽量定义模式,会让程序运行地更高效。

SQL数据库在加载数据时,会强制检查表模式中的约束。在pig中,如果一个值无法被强制转换为模式中申明的类型,pig会用空值null代替,显示一个空位。大数据集普遍都有被损坏的值、无效值或意料之外的值,简单的方法是过滤掉无效值:

grunt>good_records = filter records by temperature is not null;

另一种技巧是使用SPLIT操作把数据划分成好和坏两个关系,然后在分别进行分析:

grunt> split records into good_records if temperature is not null,

                                     bad_records if temperature is null;

grunt> dump good_records;

在Pig中,不用为数据流中的每个新产生的关系声明模式。大多数情况下,Pig能够根据关系操作的输入关系的模式来确定输出结果的模式。有些操作不改变模式,如Limit。而Union会自动生成新的模式。

如果要重新定义一个关系的模式,可以使用带as子句的FOREACH…GENERATE操作来定义输入关系的一部分或全部字段的模式。

18、              函数

Pig的函数分为计算函数,过滤函数,加载函数和存储函数。

计算函数: AVG, COUNT, CONCAT, COUNTSTAR, DIFF, MAX, MIN, SIZE, SUM, TOKENIZE

过滤函数:IsEmpty

加载/存储函数:PigStorage, BinStorage, BinaryStorage, TextLoader, PigDump

19、              用户自定义函数(UDF)

public abstract class EvalFunc<T> {

  public abstract T exec(Tuple input) throws IOException;

}

输入元组的字段包含传递给函数的表达式,输出是泛型;对于过滤函数输入是元组类型、输出就是Boolean类型。实例如下:

public class IsEmpty extends FilterFunc {

    public boolean exec(Tuple input) throws IOException {

        return (input.getBagField(0) == 0);

    }

}

编写完自定义函数后,用ant pig打包到JAR文件,然后通过RIGISTER操作指定文件的本地路径,告诉pig这个JAR文件的信息;

Grunt>REGISTER pig-examples.jar;

 pi

继续阅读