天天看点

Hadoop离线_Hive的基本操作

文章目录

    • 1 创建数据库与创建数据库表
        • 1.1 创建数据库的相关操作
        • 1.2 创建数据库表的相关操作
        • 1.3 四种常见的表模型
        • 1.4 加载数据方式推荐
    • 2 Hive查询语法
        • 2.1 格式
        • 2.2 常用
        • 2.3 常用函数
        • 2.4 LIMIT
        • 2.5 LIKE和RLIKE
        • 2.6 GROUP BY
        • 2.7 HAVING
        • 2.8 JOIN多表连接
        • 2.9 ORDER BY(全局排序)
        • 2.10 SORT BY(局部排序)
        • 2.11 DISTRIBUTE BY(分区排序)
        • 2.12 CLUSTER BY
    • 3 Hive Shell参数
        • 3.1 Hive命令行
        • 3.2 Hive参数配置方式
    • 4 Hive函数
        • 4.1 内联函数
        • 4.2 自定义函数
        • 4.3 UDF开发实例
    • 5 开启Map输出阶段和Reduce输出阶段压缩
    • 6 Hive的数据存储格式
        • 6.1 支持的存储格式
        • 6.2 列式存储和行式存储

1 创建数据库与创建数据库表

1.1 创建数据库的相关操作

  1. 创建数据库:

    CREATE TABLE IF NOT EXISTS myhive;

  2. 创建数据库并指定hdfs存储位置:

    CREATE TABLE myhive LOCATION '/myhive';

  3. 修改数据库属性:

    ALTER DATABASE myhive SET DBPROPERITIES ('createtime'='20191201');

  4. 查看数据库基本信息:

    DESC DATABASE myhive2;

  5. 查看数据库更多详细信息:

    DESC DATABASE EXTENDED myhive2;

  6. 删除数据库:

    DROP DATABASE myhive2

    ;
  7. 强制删除数据库:

    DROP DATABASE myhive2 CASCADE

    ;

1.2 创建数据库表的相关操作

基本格式:

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
[(col_name data_type [COMMENT col_comment], ...)] 
[COMMENT table_comment] 
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
[CLUSTERED BY (col_name, col_name, ...) 
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 
[ROW FORMAT row_format] 
[STORED AS file_format] 
[LOCATION hdfs_path]
           
字段参考:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types

1.3 四种常见的表模型

  • 管理表(内部表):

    又称为内部表,作出删除表的操作时,HDFS上对应的数据也会被同步删除

  1. 创建管理表: CREATE TABLE IF NOT EXISTS stu(id INT,name STRING);

创建完成后,表的数据会存放在HDFS的/user/hive/warehouse/你选择的数据库中

由hive-site.xml配置文件中的一个属性指定

<name>hive.metastore.warehouse.dir</name>

<value>/user/hive/warehouse</value>

如果在创建的表时指定分隔符和存放位置需要用到关键字。hive当中的默认分隔符:\001(键盘打不出来,因为是asc码值,非打印字符,这么做是为了避免分隔符的冲突)
  1. 创建指定分隔符的管理表:

    CREATE TABLE IF NOT EXISIS stu2( id INT,name STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/user/stu2';

  2. 复制表结构和表中的数据:

    CREATE TABLE stu3 AS SELECT * FROM stu2;

  3. 只复制表的结构,不复制表的数据:

    CREATE TABLE stu4 LIKE stu2;

  4. 查询表的类型:

    DESC FORMATTED stu2;

  • 外部表:

    外部表是指定其他的HDFS路径的数据加载到表当中来,所以hive表会认为自己不完全独占这份数据,所以删除hive表的时候,数据仍然存放在HDFS当中,不会删掉

  1. 创建外部表:添加关键字EXTERNAL
CREATE EXTERNAL TABLE student(
s_id STRING,
s_name STRING,
s_birth STRING,
s_sex STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
           
  1. 从本地文件向表中导入数据:
  1. 从本地文件向表中导入并覆盖原有数据:
  1. 从HDFS文件系统向表中加载数据:
如果删掉student外部表,HDFS的数据仍然存在,并且重新创建表之后,表中就直接存在数据了,
  • 分区表:

    一般没有一种独立的表模型,只有内部分区表,或者外部分区表。核心思想是分治。比如三个月的数据表,可以分区到每个月、每周甚至每天一个区,这样需要确切某一天的数据时,找起来会很方便

  1. 创建分区表:

    内部分区表:

CREATE TABLE score(
s_id STRING,c_id STRING,s_score INT)
PARTITIONED BY (MONTH STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
           

外部分区表:

CREATE EXTERNAL TABLE score4(
s_id STRING,c_id STRING,s_score INT)
PARTITIONED BY (month='201806')
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
           
  1. 创建一个表带多个分区:
CREATE TABLE score2(
s_id STRING,c_id STRING,s_score INT)
PARTITIONED BY (YEAR STRING,MONTH STRING,DAY STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

           
  1. 向分区表中导入数据:

    LOAD DATA LOCAL INPATH '/export/servers/hivedatas/score.csv' INTO TABLE score PARTITION (MONTH='201806');

  2. 向一个多分区表中导入数据:

    LOAD DATA LOCAL INPATH '/export/servers/hivedatas/score.csv' INTO TABLE score2 PARTITION (YEAR='2018',MONTH='06',DAY='01');

  3. 查看分区:

    SHOW PARTITIONS score;

  4. 添加一个分区或者多个分区:

    ALTER TABLE score ADD PARTITION(month='201805');

    ALTER TABLE score ADD PARTITION(month='201804') PARTITION(month='201803');

  5. 删除分区:

    ALTER TABLE score DROP PARTITION(month='201806');

  • 分桶表:

    一般也是与内部表或者外部表搭配使用,就是将数据按照字段进行划分,可以将数据按照字段划分到多个文件当中去

  1. 创建分桶表
CREATE TABLE course(
c_id STRING,c_name STRING,t_id STRING)
CLUSTERED BY (c_id) INTO 3 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
           
  1. 给分桶表添加数据:

    INSERT OVERWRITE TBALE course SELECT * FROM course_common CLUSTER BY (c_id);

  2. 修改表的名字:

    ALTER TABLE score4 RENAME score5;

  3. 增加或修改表内列信息:

    ALTER TABLE score5 ADD columns(mycol STRING,mysco STRING);

    ALTER TABLE score CHANGE column mysco mysconew INT;

1.4 加载数据方式推荐

  1. 直接向分区表中插入数据:

    INSERT INTO TABLE score3 PARTITION(month='201807') VALUES('001','002','003');

    (不推荐用该方法)
  2. 通过查询插入数据:

    (1)

    LOAD DATA LOCAL INPATH '/export/servers/hivedatas/score.csv' OVERWRITE INTO TABLE score PARTITION(month='201806');

    (2)

    INSERT OVERWRITE TABLE score PARTITION(month='201806') SELECT s_id,c_id,s_score FROM score;

    (关键字overwrite必须要有)
  3. 多插入模式:
FROM score
INSERT OVERWRITE TABLE score_first PARTITION(month='201806') 
SELECT s_id,c_id,s_score
INSERT OVERWRITE TABLE score_second PARTITION(month='201806') 
SELECT s_id,c_id,s_score
           

2 Hive查询语法

2.1 格式

SELECT [ALL | DISTINCT] select_expr, select_expr, ... 
FROM table_reference
[WHERE where_condition] 
[GROUP BY col_list [HAVING condition]] 
[CLUSTER BY col_list | [DISTRIBUTE BY col_list] [SORT BY| ORDER BY col_list]] 
[LIMIT number]
           

2.2 常用

  1. 全表查询:

    SELECT * FROM socre;

  2. 指定列查询:

    SELECT s_id,c_id FROM score;

  3. 指定别名查询:

    SELECT s_id AS studentID,c_id FROM score;

2.3 常用函数

  1. SELECT COUNT(1) FROM score;

    总行数
  2. SELECT MAX(s_score) FROM score;

    求指定字段最大值
  3. SELECT MIN(s_score) FROM score;

    求指定字段最小值
  4. SELECT SUM(s_score) FROM score;

    求指定字段总和
  5. SELECT AVG(s_score) FROM score;

    求指定字段平均值

2.4 LIMIT

  1. SELECT * FROM score LIMIT 3

    返回指定行数的数据

2.5 LIKE和RLIKE

  1. SELECT * FROM score WHERE s_score LIKE '8%';

    查找以8开头的指定字段
  2. SELECT * FROM score WHERE s_score LIKE '_9%;'

    查找第二个数值是9的指定字段
  3. SELECT * FROM score WHERE s_score RLIKE '[9]';

    查找成绩中含有9的指定字段

2.6 GROUP BY

  1. SELECT s_id,AVG(s_score) FROM score GROUP BY s_id;

    按照s_id进行分组,并求出每个学生的平均分数==(GROUP BY的字段必须是SELECT后面的字段,SELECT后面的字段不能比GROUP BY后面的字段多)==

2.7 HAVING

HAVING和WHERE的区别:

  1. WHERE是首次筛选,是在产生结果之前进行筛选
  2. HAVING是二次筛选,是在产生结果后对结果再次筛选
  3. WHERE后面不能写分组函数,HAVING可以
  4. HAVING只和GROUP BY一起用
SELECT s_id,AVG(s_score) AS avgScore FROM score
GROUP BY s_id
HAVING avgScore>85;
           

2.8 JOIN多表连接

(Hive只支持等值连接,不支持非等值连接)

==INNER JOIN(内连接):==只有进行连接的两个表中都存在与连接条件相匹配的数据才会被保留下来。

SELECT * FROM teacher AS t
INNER JOIN course AS c
ON t.t_id=c.t_id;
           

LEFT JOIN(左外连接): JOIN操作符左边表中符合WHERE子句的所有记录将会被返回。

SELECT * FROM teacher AS t
LEFT JOIN course AS c
ON t.t_id=c.t_id;
           

RIGHT JOIN(右外连接): JOIN操作符右边表中符合WHERE子句的所有记录将会被返回。

SELECT * FROM teacher AS t
RIGHT JOIN course AS c
ON t.t_id=c.t_id;
           

FULL JOIN(满外连接): 将会返回所有表中符合WHERE语句条件的所有记录。如果任一表的指定字段没有符合条件的值的话,那么就使用NULL值替代。

SELECT * FROM teacher AS t
FULL JOIN course AS c
ON t.t_id=c.t_id;

           

多表连接: 连接 n个表,至少需要n-1个连接条件。

SELECT * FROM teacher AS t
LEFT JOIN course AS c
ON t.t_id=c.t_id
LEFT JOIN score AS s
ON s.c_id=c.c_id
LEFT JOIN student AS stu
ON s.s_id=stu.s_id;

           

2.9 ORDER BY(全局排序)

SELECT * FROM student AS stu
LEFT JOIN score AS s
ON stu.s_id=s.s_id
ORDER BY s.s_score DESC|ASC;
           

多个列

SELECT s_id,AVG(s_score) AS avgScore
FROM score
GROUP BY s_id
ORDER BY s_id,avgScore;

           
只启动一个reduce

2.10 SORT BY(局部排序)

每个MapReduce内部进行排序,对全局结果集来说不是排序。

set mapreduce.job.reduces=3 设置reduce个数

  1. 按照降序查看成绩 :

    SELECT * FROM score SORT BY s_score;

  2. 按照降序的成绩结果导入到文件中:

    INSERT OVERWRITE LOCAL DIRECTORY '/export/servers/hivedatas/sort' SELECT * FROM score SORT BY s_score;

2.11 DISTRIBUTE BY(分区排序)

类似MR中partition,进行分区,结合sort by使用,

并且要将DISTRIBUTE BY的语句写在SORT BY之前。

(对于distribute by进行测试,一定要分配多reduce进行处理,否则无法看到distribute by的效果)

先按照学生id进行分区,再按照学生成绩进行排序

set mapreduce.job.reduces=7

先设置reduce的个数为7个

通过DISTRIBUTE BY 进行分区,在通过SORT BY排序

INSERT OVERWRITE LOCAL DIRECTORY '/export/servers/hivedatas/sort'
SELECT * FROM score
DISTRIBUTE BY s_id
SORT BY s_score;
           

2.12 CLUSTER BY

当DISTRIBUTE BY和SORT BY字段相同时,可以使用CLUSTER BY方式。

CLUSTER BY除了具有DISTRIBUTE BY的功能外还兼具SORT BY的功能。但是排序只能是倒序排序,不能指定排序规则为ASC或者DESC。

SELECT * 
FROM score 
CLUSTER BY s_id; 
相当于 
SELECT * 
FROM score 
DISTRIBUTE BY s_id 
SORT BY s_id;
           

3 Hive Shell参数

3.1 Hive命令行

格式:

hive [-hiveconf x=y] * [<-i filename>]* [<-f filename>|<-e query-string>] [-S]

参数 功能
-i 从文件初始化HQL
-e 从命令行执行指定的HQL
-f 执行HQL脚本
-v 输出执行的HQL语句到控制台
-hiveconf 设置hive运行时候的参数配置

3.2 Hive参数配置方式

  1. Hive参数参考:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties

    对于一般参数,有三种设定方式:

  2. 配置文件:

    用户自定义配置会覆盖默认配置(用户自定义配置文件:

    $HIVE_CONF_DIR/hive-site.xml/

    默认配置文件:

    $HIVE_CONF_DIR/hive-default.xml

    )。另外,Hive也会读入Hadoop的配置,因为Hive是作为Hadoop的客户端启动的,Hive的配置会覆盖Hadoop的配置。配置文件的设定对本机启动的所有Hive进程都有效。
  3. 命令行参数:

    启动Hive(客户端或Server方式)时,可以在命令行添加

    -hiveconf param=value

    来设定参数。例如

    bin/hive -hiveconf hive.root.logger=INFO,console

    。命令行参数只对本次启动的Session(对于Server方式启动,则是所有请求的Sessions)有效。
  4. 参数声明:

    可以在HQL中使用SET关键字设定参数,例如:

    set mapred.reduce.tasks=100;

    。这一设定的作用域也是session级的。
优先级:参数声明 > 命令行参数 > 配置文件参数(hive)

4 Hive函数

4.1 内联函数

参考文档:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

SHOW FUNCTIONS;

查看系统自带的函数

DESC FUNCTION UPPER

显示自带函数的用法

DESC FUNCTION EXTENDED UPPER

显示更为详细的自带函数的用法

4.2 自定义函数

官方文档: https://cwiki.apache.org/confluence/display/Hive/HivePlugins

当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数.

自定义函数可以分为三种:

UDF(User Defined Function)

:一进一出类型的,比如大小写转换

UDAF(User Defined Aggregation Function)

:聚集函数,多进一出,比如求最值和总数等;

UDTF(User Defined Table-Generating Function)

:一进多出,比如lateral view explore()

4.3 UDF开发实例

  1. 创建maven java工程,导入jar包
<repositories>
    <repository>
        <id>cloudera</id>
 <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.1.0-cdh5.14.0</version>
    </dependency>
</dependencies>
<build>
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.0</version>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <encoding>UTF-8</encoding>
        </configuration>
    </plugin>
     <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <version>2.2</version>
         <executions>
             <execution>
                 <phase>package</phase>
                 <goals>
                     <goal>shade</goal>
                 </goals>
                 <configuration>
                     <filters>
                         <filter>
                             <artifact>*:*</artifact>
                             <excludes>
                                 <exclude>META-INF/*.SF</exclude>
                                 <exclude>META-INF/*.DSA</exclude>
                                 <exclude>META-INF/*/RSA</exclude>
                             </excludes>
                         </filter>
                     </filters>
                 </configuration>
             </execution>
         </executions>
     </plugin>
</plugins>
</build>

           
  1. 开发java类继承UDF,并重载evaluate方法
package cn.itcast.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class ItcastUDF extends UDF {
    public Text evaluate(final Text s) {
        if (null == s) {
            return null;
        }
        return new Text(s.toString().toUpperCase());
    }
}
           
  1. 将项目打包,并上传到hive的lib目录下
  2. 添加jar包

    重命名jar包:

    mv original-day06_hive-1.0-SNAPSHOT.jar udf.jar

    把jar包添加到hive客户端:

    add jar /export/servers/hive-1.1.0-cdh5.14.0/lib/udf.jar;

  3. 关联自定义函数

    create temporary function touppercase AS 'cn.itcast.udf.ItcastUDF';

  4. 使用自定义函数

    SELECT touppercase('abc');

5 开启Map输出阶段和Reduce输出阶段压缩

开启map输出阶段压缩可以减少job中MapTask和ReduceTask间数据传输量。

  1. 开启hive中间传输数据压缩功能:

    set hive.exec.compress.intermediate=true;

  2. 开启mapreduce中map输出压缩功能:

    set mapreduce.map.output.compress=true;

  3. 设置mapreduce中map输出数据的压缩方式:

    set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

  4. 开启hive最终输出数据压缩功能:

    set hive.exec.compress.output=true;

  5. 开启mapreduce最终输出数据压缩:

    set mapreduce.output.fileoutputformat.compress=true;

  6. 设置mapreduce最终数据输出压缩方式:

    set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;

  7. 设置mapreduce最终数据输出压缩为块压缩:

    set mapreduce.output.fileoutputformat.compress.type=BLOCK;

6 Hive的数据存储格式

6.1 支持的存储格式

Hive支持的存储数的格式主要有:TEXTFILE(行式存储) 、SEQUENCEFILE(行式存储)、ORC(列式存储)、PARQUET(列式存储)。

6.2 列式存储和行式存储

  1. 列式存储的特点: 因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。
  2. 行式存储的特点: 查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。
一般拿到原始数据都是TextFile格式,需要经过分析后,通过

INSERT OVERWRITE TABLE tbl_name SELECT * FROM tbl_name2;

将分析出的结果导入到另一张临时表中就可以使用Parquet或者orc这些列式存储格式了。

注意: 存储格式和压缩方式没有关系,但是实际工作当中存储格式一般都会和压缩方式一起使用

orc存储格式本身自带一个叫zlib的压缩方式,就算orc去除掉zlib压缩,它因为列式存储的优势,还是可以将原有的数据变小。

虽然用zlib压缩的文件大小更小,但是压缩和解压都计较耗时,实际工作中一般都是将orc的存储格式和snappy的压缩方式一起用。