Hive: Streaming

Hive provides several syntactic forms for using streaming: MAP(), REDUCE(), and TRANSFORM(). Note that MAP() does not actually force the streaming step to run in the map phase, just as REDUCE() does not force it to run in the reduce phase. For this reason, TRANSFORM() is generally recommended for the same functionality, so that readers of the query are not misled.
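
For example, assuming the two-column table a created below, the following two statements are interchangeable ways of writing the same identity streaming step; this sketch only illustrates that the MAP keyword is syntactic sugar and does not force a map phase:

hive> select transform(a, b) using '/bin/cat' as newA, newB from a;
hive> from a map a, b using '/bin/cat' as newA, newB;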

Here we create a table a to run our streaming examples against:

hive> create table a (a int, b int) row format delimited fields terminated by '\t';
hive> load data local inpath '/root/a.txt' into table a;
hive> select * from a;
OK
4       5
3       2

hive> desc a;
OK
a                       int                                         
b                       int      

Identity Transformation

The most basic streaming job is the identity operation. The shell command /bin/cat writes the data passed to it straight back out, so it performs the identity operation.

hive> select transform(a, b) using '/bin/cat' as newA, newB from a;
OK
4       5
3       2

hive> select transform(a,b) using '/bin/cat' as newA from a;
OK
4
3

hive> select transform(a) using '/bin/cat' as newA, newB from a;
4       NULL
3       NULL
Time taken: 12.955 seconds, Fetched: 2 row(s)      

Changing Types

The fields returned by transform are typed as strings by default. The following syntax casts them to other data types:

hive> select transform(a, b) using '/bin/cat' as (newA int, newB double) from a;
OK
4       5.0
3       2.0      
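
As an alternative sketch (not from the original example), the transform output can also be left as strings and cast in an enclosing query; the subquery alias t is arbitrary:

hive> select cast(newA as int), cast(newB as double) from (select transform(a, b) using '/bin/cat' as newA, newB from a) t;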

Projecting Transformation

With streaming, the cut command can be used to extract or project specific fields. In other words, it can achieve the same behavior as a select statement:

hive> select transform(a,b) using '/bin/cut -f1' as newA, newB from a;
OK
4       NULL
3       NULL
hive> select transform(a,b) using '/bin/cut -f2' as newA, newB from a;
OK
5       NULL
2       NULL      

As you can see, the query gets only one field back from the external process while two are expected, so newB is always NULL. The transform above declares two output fields, but it may declare any smaller number of fields instead:

hive> select transform(a,b) using '/bin/cut -f1' as newA from a;
OK
4
3

hive> select transform(a,b) using '/bin/cut -f2' as newA from a;
OK
5
2      
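
cut can of course also keep both fields, in which case both declared output columns are populated; a minimal sketch against the same table a:

hive> select transform(a, b) using '/bin/cut -f1,2' as newA, newB from a;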

Manipulative Transformations

The /bin/sed program (/usr/bin/sed on Mac OS X) is a stream editor. It takes an input stream, edits it according to the user's specification, and writes the edited result to the output stream. The following example replaces the string "4" with the string "10":

hive> select transform(a, b) using '/bin/sed s/4/10/' as newA, newB from a;
OK
10      5
3       2

Using the Distributed Cache

When the files a query needs are not already installed on every TaskTracker, the distributed cache can be used to ship data or program files to the cluster; they are cleaned up once the job completes. The cache keeps each job's files separate, so jobs do not interfere with one another.

The following example is a bash shell script, ctof.sh, that converts Celsius temperatures to Fahrenheit:

#!/bin/bash
while read LINE
do
  res=$(echo "scale=2;((9/5) * $LINE) + 32" | bc)
  echo $res
done      

Hive's ADD FILE feature adds a file to the distributed cache, and the added file is placed in the current working directory on each task node. This lets the transform task invoke the script directly without having to know where to find it:

hive> add file ${env:HOME}/prog_hive/ctof.sh;
Added resource: /home/binghe/prog_hive/ctof.sh

hive> select transform(a) using 'ctof.sh' as convert from a;
39.20
37.40      
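
The session's cached resources can also be listed and removed again with Hive's companion resource commands; a short sketch using the path added above:

hive> list files;
hive> delete file /home/binghe/prog_hive/ctof.sh;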

Producing Multiple Rows from a Single Row

Streaming can be used to produce multiple rows of output for each row of input.

Given an input file, saved as /usr/local/src/data.txt, with the following contents:

k1=v1,k2=v2,k3=v3
k4=v4,k5=v5,k6=v6
k7=v7,k8=v8,k9=v9      

Suppose we want to present this data in tabular form, so that ordinary HiveQL operators can work on the rows:

k1  v1
k2  v2
k3  v3
k4  v4
k5  v5
k6  v6
k7  v7
k8  v8
k9  v9      

Next we create a Perl script and save it as /usr/local/src/split_kv.pl:

#!/usr/bin/perl
while (<STDIN>){
  my $line = $_;
  chomp($line);
  my @kvs = split(/,/,$line);
  foreach my $p (@kvs){
    my @kv = split(/=/, $p);
    print $kv[0] . "\t" . $kv[1] . "\n"; 
  }
}      

Next we create the table kv_data and load the data into it:

hive> create table kv_data(line string);
hive> load data local inpath '/usr/local/src/data.txt' into table kv_data;
hive> select * from kv_data;
OK
k1=v1,k2=v2,k3=v3
k4=v4,k5=v5,k6=v6
k7=v7,k8=v8,k9=v9      

Next we add the split_kv.pl script to Hive and run the query:

hive> add file /usr/local/src/split_kv.pl;
Added resources: [/usr/local/src/split_kv.pl]

hive> select transform(line) using 'perl split_kv.pl' as (key, value) from kv_data;
OK
k1      v1
k2      v2
k3      v3
k4      v4
k5      v5
k6      v6
k7      v7
k8      v8
k9      v9      
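
Because the transform output now looks like an ordinary two-column result set, it can be persisted like any other query result. A sketch, assuming a hypothetical target table kv_parsed:

hive> create table kv_parsed(key string, value string) row format delimited fields terminated by '\t';
hive> insert overwrite table kv_parsed select transform(line) using 'perl split_kv.pl' as (key, value) from kv_data;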

Calculating Aggregates with Streaming

Streaming can also perform aggregation, much like Hive's built-in SUM function. This works because a streaming process may return zero or more rows of output for each row of input.

To perform the aggregation inside an external program, the script defines an accumulator outside its loop; inside the loop it keeps reading from the input stream and adding to the accumulator, and finally it prints the sum:

Script: aggregate.pl

#!/usr/bin/perl
my $sum=0;
while (<STDIN>){
  my $line = $_;
  chomp($line);
  $sum=${sum} + ${line};
}
print $sum;      

Next we create a table named sum and load integer data, one value per line, for testing:

Data file int.txt:

1
2
3
4
5
6
7
8
9      

hive> create table sum(number int);
hive> load data local inpath '/usr/local/src/int.txt' into table sum;
hive> select * from sum;
OK
1
2
3
4
5
6
7
8
9      

Add the streaming program to the distributed cache and use it in a transform query:

hive> add file /usr/local/src/aggregate.pl;
Added resources: [/usr/local/src/aggregate.pl]

hive> select transform(number) using 'perl aggregate.pl' as total from sum;
OK
45      

Note: unlike the UDAF SUM(), you cannot execute multiple transform processes in a single query. For example, the following works only because it uses SUM():

hive> select sum(number) as one, sum(number) as two from sum;
OK
45      45      
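
When more than one streaming pass over the data is needed, one workaround (a sketch, not from the text above) is to nest queries so that each SELECT carries exactly one transform:

hive> select transform(total) using '/bin/cat' as total_again from (select transform(number) using 'perl aggregate.pl' as total from sum) t;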

cluster by, distribute by, sort by

Rows that share the same key often need to be sent to the same processing node, or the data needs to be sorted by a specified column or by a specified function.

The cluster by clause can be used for this: it ensures that like data is routed to the same reduce task and that the data arrives sorted.
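
Before the full word-count example, the clause can be seen in isolation on the small table a from earlier; this sketch streams the rows through /bin/cat and clusters them on the first output column, forcing a reduce stage:

hive> select transform(a, b) using '/bin/cat' as newA, newB from a cluster by newA;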

As an example, here we implement the Word Count algorithm using transform.

We need two Python scripts: one splits each line of text it reads into words; the other receives the stream of words along with their intermediate counts (mostly the number "1") and sums the counts for each word.

The first script, mapper.py, splits each line into words on whitespace:

import sys

# Emit "<word>\t1" for every whitespace-separated word read from stdin.
for line in sys.stdin:
    words = line.strip().split()
    for word in words:
        print("%s\t1" % word.lower())

The second script, reducer.py, has to cache the word currently being processed and the number of times it has been seen. When the script moves on to the next word, it emits the previous word's count and resets the cache.

import sys

# Assumes equal keys arrive adjacent (guaranteed by CLUSTER BY, or DISTRIBUTE BY + SORT BY).
(last_key, last_count) = (None, 0)
for line in sys.stdin:
    (key, count) = line.strip().split("\t")
    if last_key and last_key != key:
        # Key changed: emit the finished word and start a new running count.
        print("%s\t%d" % (last_key, last_count))
        (last_key, last_count) = (key, int(count))
    else:
        last_key = key
        last_count += int(count)
if last_key:
    print("%s\t%d" % (last_key, last_count))

Create our test data file test.txt:

hello world word
hello test
test liuyazhuang
hello binghe
test binghe
word count      

Next, we do the processing in Hive:

hive> create table docs(line string);
hive> load data local inpath '/usr/local/src/test.txt' into table docs;
hive> create table word_count(word string, count int) row format delimited fields terminated by '\t';

hive> add file /usr/local/hive-2.3.4/mapper.py;
Added resources: [/usr/local/hive-2.3.4/mapper.py]

hive> add file /usr/local/hive-2.3.4/reducer.py;
Added resources: [/usr/local/hive-2.3.4/reducer.py]

The mapper step alone produces the intermediate word/count pairs:

select transform(line) using 'python mapper.py' as word, count from docs;

The full query clusters the mapper output by word and feeds it to the reducer, writing the results into word_count:

from (
from docs
select transform(line) using 'python mapper.py'
as word, count
cluster by word) wc
insert overwrite table word_count
select transform(wc.word, wc.count) using 'python reducer.py'
as word, count;

The most convenient alternative to cluster by is to combine distribute by with sort by. The typical scenario: you want to partition the data by one field but sort it by another.

In fact, cluster by word is equivalent to distribute by word sort by word asc.
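
For instance, this sketch (again against the earlier table a) distributes rows by one output column while sorting them by another:

hive> select transform(a, b) using '/bin/cat' as newA, newB from a distribute by newA sort by newB desc;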

The following transform query produces word-count output sorted in descending order by word:

from (
from docs
select transform(line) using 'python mapper.py'
as word, count
distribute by word sort by word desc) wc
insert overwrite table word_count
select transform(wc.word, wc.count) using 'python reducer.py'
as word, count;      

Using cluster by, or distribute by together with sort by, is important. Without these directives Hive may be unable to parallelize the job properly: all of the data may be sent to a single reducer, which lengthens the overall job run time.
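
Relatedly, the number of reducers for a session can be pinned before running such a query. A hedged sketch, where mapreduce.job.reduces is the Hadoop 2 property name (older releases use mapred.reduce.tasks):

hive> set mapreduce.job.reduces=4;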

GenericMR Tools for Streaming to Java

Streaming can also be written in Java, and Hive ships with a GenericMR API that tries to give streaming an interface similar to the Hadoop MapReduce API:

from(
from src 
map value, key
using 'java -cp hive-contrib-3.1.1.jar org.apache.hadoop.hive.contrib.mr.example.IdentityMapper' 
as k, v 
cluster by k) map_output
reduce k, v 
using 'java -cp hive-contrib-3.1.1.jar org.apache.hadoop.hive.contrib.mr.example.WordCountReduce' 
as k, v;      

Let's first look at the interfaces GenericMR provides. The Mapper interface is used for a typical mapper implementation; it exposes a map method whose input is the row's column values as a String[] array:

package org.apache.hadoop.hive.contrib.mr;

/**
 * Mapper.
 */
public interface Mapper {
  /**
   * Maps a single row into an intermediate rows.
   * 
   * @param record
   *          input record
   * @param output
   *          collect mapped rows.
   * @throws Exception
   *           on error
   */
  void map(String[] record, Output output) throws Exception;
}      

IdentityMapper passes its input to the collector without changing it in any way. Functionally it is the same as /bin/cat.

package org.apache.hadoop.hive.contrib.mr.example;

import org.apache.hadoop.hive.contrib.mr.GenericMR;
import org.apache.hadoop.hive.contrib.mr.Mapper;
import org.apache.hadoop.hive.contrib.mr.Output;

/**
 * Example Mapper (Identity).
 */
public final class IdentityMapper {

  public static void main(final String[] args) throws Exception {
    new GenericMR().map(System.in, System.out, new Mapper() {
      @Override
      public void map(final String[] record, final Output output) throws Exception {
        output.collect(record);
      }
    });
  }

  private IdentityMapper() {
    // prevent instantiation
  }
}      

The Reducer interface receives the key as the first column (a String); the remaining columns are obtained through the record iterator. Each iteration returns an array of strings in which element 0 repeats the key and the next element is the value. The output object collects the results:

package org.apache.hadoop.hive.contrib.mr;

import java.util.Iterator;

/**
 * Simple reducer interface.
 */
public interface Reducer {
  /**
   * Reduce.
   * 
   * Note that it is assumed that the key is the first column. Additionally, the
   * key will be repeated as the first column in the records[] array.
   * 
   * @param key
   *          key (first column) for this set of records.
   * @param records
   *          Iterator of records for this key. Note that the first column of
   *          record will also be the key.
   * @param output
   * @throws Exception
   */
  void reduce(String key, Iterator<String[]> records, Output output)
      throws Exception;
}      

The WordCountReduce class uses an accumulator to count every element returned by the record iterator. Once all records have been counted, it emits an array made up of the key and its count as the result:

package org.apache.hadoop.hive.contrib.mr.example;

import java.util.Iterator;

import org.apache.hadoop.hive.contrib.mr.GenericMR;
import org.apache.hadoop.hive.contrib.mr.Output;
import org.apache.hadoop.hive.contrib.mr.Reducer;

/**
 * Example Reducer (WordCount).
 */
public final class WordCountReduce {

  private WordCountReduce() {
    // prevent instantiation
  }

  public static void main(final String[] args) throws Exception {
    new GenericMR().reduce(System.in, System.out, new Reducer() {
      public void reduce(String key, Iterator<String[]> records, Output output)
          throws Exception {
        int count = 0;

        while (records.hasNext()) {
          // note we use col[1] -- the key is provided again as col[0]
          count += Integer.parseInt(records.next()[1]);
        }

        output.collect(new String[] {key, String.valueOf(count)});
      }
    });
  }
}      

Calculating Cogroups

In MapReduce programs it is common to join several data sets together and then process the combined data with TRANSFORM. Using UNION ALL and CLUSTER BY, we can achieve the general effect of such a cogroup:

from (
from (
from order_log ol
-- User Id, Order Id, and timestamp:
select ol.userid as uid, ol.orderid as id, ol.ts as ts
union all
from clicks_log cl
select cl.userid as uid, cl.id as id, cl.ts as ts
) union_msgs
select union_msgs.uid, union_msgs.id, union_msgs.ts
cluster by union_msgs.uid, union_msgs.ts
) map
insert overwrite table log_analysis
select transform(map.uid, map.id, map.ts) using 'reduce_script'
as (uid, id, ...);