天天看点

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

接上篇第5章的5.4:HBase第一天:HBase组件及架构、安装HBase部署集群、HBase的shell操作、HBase数据结构、命名空间、原理、读写流程、flush与合并、hbase-default.xml配置详解

第6章 HBase API操作

6.1 环境准备

6.2 HBaseAPI

6.2.1 获取Configuration对象

6.2.2 判断表是否存在

6.2.3 创建表

6.2.4 删除表

6.2.5 向表中插入数据

6.2.6 删除多行数据

6.2.7 获取所有数据

6.2.8 获取某一行数据

6.2.9 获取某一行指定“列族:列”的数据

6.3 MapReduce

6.3.1 官方HBase-MapReduce

6.3.2 自定义HBase-MapReduce1

6.3.3 自定义HBase-MapReduce2

6.4 与Hive的集成

6.4.1 HBase与Hive的对比

           6.4.2 HBase与Hive集成使用

第6章 HBase API操作

6.1 环境准备

新建项目后在pom.xml中添加依赖:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
</dependency>

<dependency>
	<groupId>jdk.tools</groupId>
	<artifactId>jdk.tools</artifactId>
	<version>1.8</version>
	<scope>system</scope>
	<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> (这里换成自己电脑的java路径)
</dependency>
           
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

添加log4j

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender

log4j.appender.logfile.File=target/spring.log

log4j.appender.logfile.layout=org.apache.log4j.PatternLayout

log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

6.2 HBaseAPI

(用静态方法实现具体增删该查的功能,然后在main方法中调用静态方法,共有的内容放在该类的静态代码块中)

先启动HBase集群

6.2.1 获取Configuration对象

public static Configuration conf;
static{
	//使用HBaseConfiguration的单例方法实例化
	conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.9.102");
conf.set("hbase.zookeeper.property.clientPort", "2181");
}
           

6.2.2 判断表是否存在

过时的方法

//首先判断表是否存在要有返回结果的,返回结果打印到控制台,于是选用boolean类型返回值,传入的参数就是//String类型的单个表明就可。
Public static boolean tableExist (String tableName){	
	//先new 一个配置文件  用过时的HBaseConfiguration 
	HBaseConfiguration configuration = new HBaseConfiguration();
	 //在lib目录下jar包中hbase的默认配置文件中搜索localhost可找到【hbase.zookeeper.quorum】    //配置头,指定要连接的节点
	configuration.set("hbase.zookeeper.quorum","192.168.1.102");
	//new一个 客户端  用过时的HBaseAdmin
HBaseAdmin admin = new HBaseAdmin(configuration);
	用客户端打点调用tableExists方法
return admin.tableExists(tableName);
}
public static boolean isTableExist(String tableName) throws MasterNotRunningException,
 ZooKeeperConnectionException, IOException{
	//在HBase中管理、访问表需要先创建HBaseAdmin对象
//Connection connection = ConnectionFactory.createConnection(conf);
//HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
	HBaseAdmin admin = new HBaseAdmin(conf);
	return admin.tableExists(tableName);
}
           

新API完整公共代码(笔记)

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-03-11:32
 */
public class TestHBase {
    static Connection connection = null;
    static Admin admin = null;

    static {
        //获取HBase配置信息
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102");
        //获取admin
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //关闭资源
    public static void close(Connection connection, Admin admin) throws IOException {
        if (connection != null) {
            connection.close();
        }
        if (admin != null) {
            admin.close();
        }
    }

    //判断表是否存在
    public static boolean tableExist(String tableName) throws IOException {
        return admin.tableExists(TableName.valueOf(tableName));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(tableExist("student"));
    }
}
           
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

6.2.3 创建表

public static void createTable(String tableName, String... columnFamily) throws
 MasterNotRunningException, ZooKeeperConnectionException, IOException{
	HBaseAdmin admin = new HBaseAdmin(conf);
	//判断表是否存在
	if(isTableExist(tableName)){
		System.out.println("表" + tableName + "已存在");
		//System.exit(0);
	}else{
		//创建表属性对象,表名需要转字节
		HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
		//创建多个列族
		for(String cf : columnFamily){
			descriptor.addFamily(new HColumnDescriptor(cf));
		}
		//根据对表的配置,创建表
		admin.createTable(descriptor);
		System.out.println("表" + tableName + "创建成功!");
	}
}
           

建表代码笔记

//创建表
private static void createTable(String tableName, List<String> columnFamilys) throws IOException {
    //先判断表是否存在
    if (!tableExist(tableName)) {
        System.out.println("表 " + tableName + "不存在!可以创建。");
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        for (String columnFamily : columnFamilys) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
            hTableDescriptor.addFamily(hColumnDescriptor);
        }
        admin.createTable(hTableDescriptor);
    }else System.out.println("表 " + tableName + "存在!不能被创建。");
}

public static void main(String[] args) throws IOException {
    //判断表是否存在,以下判断表是否创建成功
    System.out.println(tableExist("staff"));
    createTable("staff", Collections.singletonList("f1"));
    System.out.println(tableExist("staff"));
}
           
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

通过HBase shell查看

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

创建多个列族的表(main方法中测试)

ArrayList<String> cfs = new ArrayList<>();
cfs.add("f1");
cfs.add("f2");
cfs.add("f3");
createTable("staff1",cfs);
close(connection,admin);
           

HBase shell查看

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

6.2.4 删除表

public static void dropTable(String tableName) throws MasterNotRunningException,
 ZooKeeperConnectionException, IOException{
	HBaseAdmin admin = new HBaseAdmin(conf);
	if(isTableExist(tableName)){
		admin.disableTable(tableName);
		admin.deleteTable(tableName);
		System.out.println("表" + tableName + "删除成功!");
	}else{
		System.out.println("表" + tableName + "不存在!");
	}
}
删除表代码笔记
private static void deleteTable(String tableName) throws IOException {
    //判断表存在
    if (tableExist(tableName)) {
        System.out.println("表存在,现在删除");
        //先disable,让表不可用
        admin.disableTable(TableName.valueOf(tableName));
        //删除表
        admin.deleteTable(TableName.valueOf(tableName));
    } else System.out.println("表不存在");
}

public static void main(String[] args) throws IOException {
    deleteTable("staff1");
}
           
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

HBase shell查看staff1已被删除

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

6.2.5 向表中插入数据

public static void addRowData(String tableName, String rowKey, String columnFamily, String
 column, String value) throws IOException{
	//创建HTable对象
	HTable hTable = new HTable(conf, tableName);
	//向表中插入数据
	Put put = new Put(Bytes.toBytes(rowKey));
	//向Put对象中组装数据
	put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
	hTable.put(put);
	hTable.close();
	System.out.println("插入数据成功");
}
           

插入数据代码笔记

private static void putData(String tableName, String rowKey, String cf, String cn, String value) throws IOException {
    //获取表对象
    Table table = connection.getTable(TableName.valueOf(tableName));
    //利用HBase的Bytes工具类转化rowKey
    Put put = new Put(Bytes.toBytes(rowKey));
    //添加列族、列名、值
    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
    //执行put操作
    table.put(put);
    table.close();
}

public static void main(String[] args) throws IOException {
    //插入一条数据
    putData("staff","1001","f1","name","cr");
}
           
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

6.2.6 删除多行数据

public static void deleteMultiRow(String tableName, String... rows) throws IOException{
	HTable hTable = new HTable(conf, tableName);
	List<Delete> deleteList = new ArrayList<Delete>();
	for(String row : rows){
		Delete delete = new Delete(Bytes.toBytes(row));
		deleteList.add(delete);
	}
	hTable.delete(deleteList);
	hTable.close();
}
           

删除一行数据堂笔记:

private static void deleteData(String tableName, String rowkey, String cf, String cn) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));
    //创建要被删除的对象
    Delete delete = new Delete(Bytes.toBytes(rowkey));
    //给delete对象添加具体的列族、列
    delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
    //删除
    table.delete(delete);
    table.close();
}

public static void main(String[] args) throws IOException {
    //删除一条数据
    deleteData("student","1001","info","age");
}
           

可以看到age=20变成了age=18,是因为age=20是之前演示HBase shell时新添加的version3版本,Delete只删除最后一个版本

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

再次执行上述代码才发现age被彻底删除

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

而在HBase shell中一次会删掉所有版本的数据,api说明:

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

删除多条数据代码:

private static void deleteDatas(String tableName, String... rowKeys) throws IOException {
    //获取表对象
    Table table = connection.getTable(TableName.valueOf(tableName));
    ArrayList<Delete> deletes = new ArrayList<>();
    for (String rowKey : rowKeys) {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        //添加多个要被删除的对象
        deletes.add(delete);
    }
    table.delete(deletes);
    table.close();
}

public static void main(String[] args) throws IOException {
    //删除多行数据
    deleteDatas("student","1001","1002");
}
           

HBase shell查看1001、1002行键中所有数据已被删除

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

6.2.7 获取所有数据

public static void getAllRows(String tableName) throws IOException{
	HTable hTable = new HTable(conf, tableName);
	//得到用于扫描region的对象
	Scan scan = new Scan();
	//使用HTable得到resultcanner实现类的对象
	ResultScanner resultScanner = hTable.getScanner(scan);
	for(Result result : resultScanner){
		Cell[] cells = result.rawCells();
		for(Cell cell : cells){
			//得到rowkey
			System.out.println("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
			//得到列族
			System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
			System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
			System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
		}
	}
}
           

全表扫描代码笔记:

private static void getScanData(String tableName) throws IOException {
    //获取table对象
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();//空参对象进行全表扫描
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
        Cell[] cells = result.rawCells();
        //打印获得到的数据
        for (Cell cell : cells) {
            System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                    ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                    + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
    table.close();
}

public static void main(String[] args) throws IOException {
    //扫描整张表
    getScanData("student");
}
           
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

6.2.8 获取某一行数据

public static void getRow(String tableName, String rowKey) throws IOException{
	HTable table = new HTable(conf, tableName);
	Get get = new Get(Bytes.toBytes(rowKey));
	//get.setMaxVersions();显示所有版本
    //get.setTimeStamp();显示指定时间戳的版本
	Result result = table.get(get);
	for(Cell cell : result.rawCells()){
		System.out.println("行键:" + Bytes.toString(result.getRow()));
		System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
		System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
		System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
		System.out.println("时间戳:" + cell.getTimestamp());
	}
}
           

获取一行数据代码笔记

public static void getData(String tableName, String rowkey) throws IOException {
    //获取表对象
    Table table = connection.getTable(TableName.valueOf(tableName));
    Get get = new Get(Bytes.toBytes(String.valueOf(rowkey)));
    Result result = table.get(get);
    Cell[] cells = result.rawCells();
    //打印获得到的数据
    for (Cell cell : cells) {
        System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
        ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
        + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));

    }
    table.close();
}

public static void main(String[] args) throws IOException {
    getData("staff","1001");
}
           
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

6.2.9 获取某一行指定“列族:列”的数据

public static void getRowQualifier(String tableName, String rowKey, String family, String
 qualifier) throws IOException{
	HTable table = new HTable(conf, tableName);
	Get get = new Get(Bytes.toBytes(rowKey));
	get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
	Result result = table.get(get);
	for(Cell cell : result.rawCells()){
		System.out.println("行键:" + Bytes.toString(result.getRow()));
		System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
		System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
		System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
	}
}
           

获取指定列族的一行数据代码笔记:

private static void getDataByCN(String tableName, String rowkey, String cf, String cn) throws IOException {
    //获取表对象
    Table table = connection.getTable(TableName.valueOf(tableName));
    Get get = new Get(Bytes.toBytes(rowkey));
    get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
//获取多个版本的数据
get.setMaxVersions(3);
    Result result = table.get(get);
    Cell[] cells = result.rawCells();
    //打印获得到的数据
    for (Cell cell : cells) {
        System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
    }
    table.close();
}

public static void main(String[] args) throws IOException {
    //获取一样数据(指定列族:列)
    getDataByCN("student","1002","info","name");
}
           

测试student表的截图

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

以上测试代码合集

package hbase.API;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * HBase的API操作,可作为工具类使用
 * @author cherry
 * @create 2019-09-03-11:32
 */

@SuppressWarnings("all")
public class TestHBase {
    static Connection connection = null;
    static Admin admin = null;

    static {
        //获取HBase配置信息
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102");
        //获取admin
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭资源
     *
     * @param connection
     * @param admin
     * @throws IOException
     */
    public static void close(Connection connection, Admin admin) throws IOException {
        if (connection != null) {
            connection.close();
        }
        if (admin != null) {
            admin.close();
        }
    }

    /**
     * 判断表是否存在
     *
     * @param tableName
     * @return
     * @throws IOException
     */
    public static boolean tableExist(String tableName) throws IOException {
        return admin.tableExists(TableName.valueOf(tableName));
    }

    /**
     * 创建表
     *
     * @param tableName
     * @param columnFamilys
     * @throws IOException
     */
    private static void createTable(String tableName, List<String> columnFamilys) throws IOException {
        //先判断表是否存在
        if (!tableExist(tableName)) {
            System.out.println("表 " + tableName + "不存在!可以创建。");
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            for (String columnFamily : columnFamilys) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            admin.createTable(hTableDescriptor);
        } else System.out.println("表 " + tableName + "存在!不能被创建。");
    }

    /**
     * 删除表
     *
     * @param tableName
     * @throws IOException
     */
    private static void deleteTable(String tableName) throws IOException {
        //判断表存在
        if (tableExist(tableName)) {
            System.out.println("表存在,现在删除");
            //先disable,让表不可用
            admin.disableTable(TableName.valueOf(tableName));
            //删除表
            admin.deleteTable(TableName.valueOf(tableName));
        } else System.out.println("表不存在");
    }

    /**
     * 插入一条数据
     *
     * @param tableName
     * @param rowKey
     * @param cf
     * @param cn
     * @param value
     * @throws IOException
     */
    private static void putData(String tableName, String rowKey, String cf, String cn, String value) throws IOException {
        //获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        //利用HBase的Bytes工具类转化rowKey
        Put put = new Put(Bytes.toBytes(rowKey));
        //添加列族、列名、值
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
        //执行put操作
        table.put(put);
        table.close();
    }

    /**
     * 查询一条数据
     *
     * @param tableName
     * @param rowkey
     * @throws IOException
     */
    public static void getData(String tableName, String rowkey) throws IOException {
        //获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(String.valueOf(rowkey)));
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        //打印获得到的数据
        for (Cell cell : cells) {
            System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                    ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                    + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
        table.close();
    }

    /**
     * 获取一行数据值(指定列族:列)
     *
     * @param tableName
     * @param rowkey
     * @param cf
     * @param cn
     * @throws IOException
     */
    private static void getDataByCN(String tableName, String rowkey, String cf, String cn) throws IOException {
        //获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowkey));
        //get.addFamily();//获取一个列族下的所有数据
        get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //获取多个版本的数据
        get.setMaxVersions(3);
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        //打印获得到的数据
        for (Cell cell : cells) {
            System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                    ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                    + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
        table.close();
    }

    /**
     * 全表扫描(scan)
     *
     * @param tableName
     * @throws IOException
     */
    private static void getScanData(String tableName) throws IOException {
        //获取table对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();//空参对象进行全表扫描
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            Cell[] cells = result.rawCells();
            //打印获得到的数据
            for (Cell cell : cells) {
                System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                        ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                        + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        table.close();
    }

    /**
     * 删除一条数据(delete)
     *
     * @param tableName
     * @param rowkey
     * @param cf
     * @param cn
     * @throws IOException
     */
    private static void deleteData(String tableName, String rowkey, String cf, String cn) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        //创建要被删除的对象
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        //给delete对象添加具体的列族、列
        delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //删除
        table.delete(delete);
        table.close();
    }

    /**
     * 删除多条数据
     *
     * @param tableName
     * @param rowKeys
     */
    private static void deleteDatas(String tableName, String... rowKeys) throws IOException {
        //获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        ArrayList<Delete> deletes = new ArrayList<>();
        for (String rowKey : rowKeys) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            //添加多个要被删除的对象
            deletes.add(delete);
        }
        table.delete(deletes);
        table.close();
    }

    /**
     * main方法用来测试上述方法
     *
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {

        /*//插入三个版本的数据
        putData("student","1001","info","name", "cr");
        putData("student","1001","info","name", "wq");
        putData("student","1001","info","name", "zs");*/
        /*//删除多行数据
        deleteDatas("student","1001","1002");*/
       /* //删除一条数据
        deleteData("student","1001","info","age");*/
        /*//扫描整张表
        getScanData("student");*/
        //获取一样数据(指定列族:列)
        //getDataByCN("student", "1001", "info", "name");
        //getData("student","1001");
       /* //插入一条数据
        putData("staff", "1001", "f1", "name", "cr");*/
        //deleteTable("staff1");
        //判断表是否存在,以下判断表是否创建成功
        /*System.out.println(tableExist("staff"));
        createTable("staff", Collections.singletonList("f1"));
        System.out.println(tableExist("staff"));*/
        /*ArrayList<String> cfs = new ArrayList<>();
        cfs.add("f1");
        cfs.add("f2");
        cfs.add("f3");
        createTable("staff1",cfs);
        close(connection,admin);*/
    }

}
           

6.3 MapReduce

通过HBase的相关JavaAPI,我们可以实现伴随HBase操作的MapReduce过程,比如使用MapReduce将数据从本地文件系统导入到HBase的表中,比如我们从HBase中读取一些原始数据后使用MapReduce做数据分析。

6.3.1 官方HBase-MapReduce

1.查看HBase的MapReduce任务的执行 (hbase想从hdfs读数据,所需要的jar包)

$ bin/hbase mapredcp

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

2.环境变量的导入

(1)执行环境变量的导入(临时生效,在命令行执行下述操作)

$ export HBASE_HOME=/opt/module/hbase-1.3.1

$ export HADOOP_HOME=/opt/module/hadoop-2.7.2

$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

反引号指的是把执行结果赋值,也就是把HBASE_HOME下面的这个命令所显示的jar包给到HADOOP_CLASSPATH下面

(2)永久生效:在/etc/profile配置

export HBASE_HOME=/opt/module/hbase

export HADOOP_HOME=/opt/module/hadoop-2.7.2

并在hadoop-env.sh中配置:(注意:在for循环之后配)

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

别忘记分发/etc/profile 和hadoop-env.sh  

3.运行官方的MapReduce任务 (用官方案例的目的是测试 是否打通hdfs 和 hbase)

-- 案例一:统计Student表中有多少行数据

$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

通过HBase shell验证

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

-- 案例二:使用MapReduce将本地数据导入到HBase

1)在本地创建一个tsv格式的文件:fruit.tsv

1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow
           

2)创建HBase表

hbase(main):001:0> create 'fruit','info'

3)在HDFS中创建input_fruit文件夹并上传fruit.tsv文件

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

执行MapReduce到HBase的fruit表中

$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \

-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \

hdfs://hadoop102:9000/input_fruit

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

使用scan命令查看导入后的结果

hbase(main):001:0> scan ‘fruit’

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

6.3.2 自定义HBase-MapReduce1

目标:将fruit表中的一部分数据,通过MR迁入到fruit_mr表中。

分步实现:

1.构建ReadFruitMapper类,用于读取fruit表中的数据

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

	@Override
	protected void map(ImmutableBytesWritable key, Result value, Context context) 
	throws IOException, InterruptedException {
	//将fruit的name和color提取出来,相当于将每一行数据读取出来放入到Put对象中。
		Put put = new Put(key.get());
		//遍历添加column行
		for(Cell cell: value.rawCells()){
			//添加/克隆列族:info
			if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
				//添加/克隆列:name
				if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
					//将该列cell加入到put对象中
					put.add(cell);
					//添加/克隆列:color
				}else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
					//向该列cell加入到put对象中
					put.add(cell);
				}
			}
		}
		//将从fruit读取到的每行数据写入到context中作为map的输出
		context.write(key, put);
	}
}
           

2. 构建WriteFruitMRReducer类,用于将读取到的fruit表中的数据写入到fruit_mr表中

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
	@Override
	protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) 
	throws IOException, InterruptedException {
		//读出来的每一行数据写入到fruit_mr表中
		for(Put put: values){
			context.write(NullWritable.get(), put);
		}
	}
}
           

3.构建Fruit2FruitMRRunner extends Configured implements Tool用于组装运行Job任务

//组装Job
	public int run(String[] args) throws Exception {
		//得到Configuration
		Configuration conf = this.getConf();
		//创建Job任务
		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(Fruit2FruitMRRunner.class);

		//配置Job
		Scan scan = new Scan();
		scan.setCacheBlocks(false);
		scan.setCaching(500);

		//设置Mapper,注意导入的是mapreduce包下的,不是mapred包下的,后者是老版本
		TableMapReduceUtil.initTableMapperJob(
		"fruit", //数据源的表名
		scan, //scan扫描控制器
		ReadFruitMapper.class,//设置Mapper类
		ImmutableBytesWritable.class,//设置Mapper输出key类型
		Put.class,//设置Mapper输出value值类型
		job//设置给哪个JOB
		);
		//设置Reducer
		TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);
		//设置Reduce数量,最少1个
		job.setNumReduceTasks(1);

		boolean isSuccess = job.waitForCompletion(true);
		if(!isSuccess){
			throw new IOException("Job running with error");
		}
		return isSuccess ? 0 : 1;
	}
           

4.主函数中调用运行该Job任务

public static void main( String[] args ) throws Exception{
Configuration conf = HBaseConfiguration.create();
int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
System.exit(status);
}
           

5.打包运行任务

$ /opt/module/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar

 com.z.hbase.mr1.Fruit2FruitMRRunner

先扫描确定数据表中的数据

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

执行MR报错,是因为输出表fruit_mr需要提前被创建

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

创建输出表

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

重复执行MR程序,查看执行结果

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

提示:运行任务前,如果待数据导入的表不存在,则需要提前创建。

提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)

MR1代码笔记:

FruitMapper类      
package hbase.MR;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-04-14:27
 */
public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        //获取put对象
        Put put = new Put(key.get());
        //获取该rowkey下所有的cell
        Cell[] cells = value.rawCells();
        //遍历,找出所需的数据
        for (Cell cell : cells) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                put.add(cell);
                context.write(key,put);
            }
        }

    }
}
           
FruitReducer类:      
package hbase.MR;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-04-14:53
 */
public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put,ImmutableBytesWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put value : values) {
            context.write(key,value);
        }
    }
}
           
FruitDriver类      
package hbase.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-04-14:57
 */
public class FruitDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1.获取job
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        //2.指定jar所在路径
        job.setJarByClass(FruitDriver.class);
        //3.指定MR
        Scan scan = new Scan();//全表扫描
        TableMapReduceUtil.initTableMapperJob("fruit", scan, FruitMapper.class, ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("fruit_mr", FruitReducer.class, job);
        //4.指定Mapper的输出
        //5.指定最终输出
        job.setNumReduceTasks(1);//Reduce数量最少为1,该类不能被省略
        //6.指定输入输出路径,456步已写好
        //7.提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
           

6.3.3 自定义HBase-MapReduce2

目标:实现将HDFS中的数据写入到HBase表中。

分步实现:

1.构建ReadFruitFromHDFSMapper于读取HDFS中的文件数据

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		//从HDFS中读取的数据
		String lineValue = value.toString();
		//读取出来的每行数据使用\t进行分割,存于String数组
		String[] values = lineValue.split("\t");
		
		//根据数据中值的含义取值
		String rowKey = values[0];
		String name = values[1];
		String color = values[2];
		
		//初始化rowKey
		ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
		
		//初始化put对象
		Put put = new Put(Bytes.toBytes(rowKey));
		
		//参数分别:列族、列、值  
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"),  Bytes.toBytes(name)); 
        put.add(Bytes.toBytes("info"), Bytes.toBytes("color"),  Bytes.toBytes(color)); 
        
        context.write(rowKeyWritable, put);
	}
}
           

2.构建WriteFruitMRFromTxtReducer类

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
	@Override
	protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
		//读出来的每一行数据写入到fruit_hdfs表中
		for(Put put: values){
			context.write(NullWritable.get(), put);
		}
	}
}
           

3.创建Txt2FruitRunner组装Job

public int run(String[] args) throws Exception {
//得到Configuration
Configuration conf = this.getConf();

//创建Job任务
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(Txt2FruitRunner.class);
Path inPath = new Path("hdfs://hadoop102:9000/input_fruit/fruit.tsv");
FileInputFormat.addInputPath(job, inPath);

//设置Mapper
job.setMapperClass(ReadFruitFromHDFSMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

//设置Reducer
TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRFromTxtReducer.class, job);

//设置Reduce数量,最少1个
job.setNumReduceTasks(1);

boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}

return isSuccess ? 0 : 1;
}
           

4.调用执行Job

public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
	    int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
	    System.exit(status);
}
           

5.打包运行

$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.atguigu.hbase.mr2.Txt2FruitRunner

先建表

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

确认HDFS上原始文件的路径

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

运行jar后扫描表fruit_hdfs

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

看到1002的color为空值,是因为建表时多打了一个制表符,Map是根据单个制表符进行切片的,删掉多余制表符即可解决问题

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

提示:运行任务前,如果待数据导入的表不存在,则需要提前创建之。

提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)

MR2代码笔记:

HDFSMapper类:      
package hbase.MR2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 目标:实现将HDFS中的数据写入到HBase表中。
 *
 * @author cherry
 * @create 2019-09-04-15:29
 */
public class HDFSMapper extends Mapper<LongWritable, Text, NullWritable, Put> {
    /**
     * 原数据:1001 Apple Red
     *
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
        context.write(NullWritable.get(), put);
    }
}
           
HDFSReducer类:      
package hbase.MR2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-04-15:50
 */
public class HDFSReducer extends TableReducer<NullWritable, Put,NullWritable> {
    @Override
    protected void reduce(NullWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put value : values) {
            context.write(NullWritable.get(),value);
        }
    }
}
           
HDFSDriver类:      
package hbase.MR2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author cherry
 * @create 2019-09-04-15:51
 */
public class HDFSDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        //获取任务对象
        Job job = Job.getInstance(conf);
        job.setJarByClass(HDFSDriver.class);
        //关联MR
        job.setMapperClass(HDFSMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);
        TableMapReduceUtil.initTableReducerJob("fruit_hdfs", HDFSReducer.class, job);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        //一定要用HBase的conf
        Configuration conf = HBaseConfiguration.create();
        int run = ToolRunner.run(conf, new HDFSDriver(), args);
        if (run == 1) System.out.println("任务失败!");
    }
}
           

6.4 与Hive的集成

6.4.1 HBase与Hive的对比

1.Hive (分析)

(1) 数据仓库

Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用HQL去管理查询。

(2) 用于数据分析、清洗

Hive适用于离线的数据分析和清洗,延迟较高。

(3) 基于HDFS、MapReduce

Hive存储的数据依旧在DataNode上,编写的HQL语句终将是转换为MapReduce代码执行。

2.HBase (存储)

(1) 数据库

是一种面向列存储的非关系型数据库。

(2) 用于存储结构化和非结构化的数据

适用于单表非关系型数据的存储,不适合做关联查询,类似JOIN等操作。

(3) 基于HDFS

数据持久化存储的体现形式是Hfile,存放于DataNode中,被ResionServer以region的形式进行管理。

(4) 延迟较低,接入在线业务使用

面对大量的企业数据,HBase可以直线单表大量数据的存储,同时提供了高效的数据访问速度。

6.4.2 HBase与Hive集成使用

尖叫提示:HBase与Hive的集成在最新的两个版本中无法兼容。所以,我们只能含着泪勇敢的重新编译:hive-hbase-handler-1.2.2.jar!!好气!!

环境准备

因为我们后续可能会在操作Hive的同时对HBase也会产生影响,所以Hive需要持有操作HBase的Jar,那么接下来拷贝Hive所依赖的Jar包(或者使用软连接的形式)。

$ export HBASE_HOME=/opt/module/hbase
$ export HIVE_HOME=/opt/module/hive

ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar  $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
           
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

同时在hive-site.xml中修改zookeeper的属性,如下:

<property>
  <name>hive.zookeeper.quorum</name>
  <value>hadoop102,hadoop103,hadoop104</value>
  <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
  <name>hive.zookeeper.client.port</name>
  <value>2181</value>
  <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
           

1.案例一

目标:建立Hive表,关联HBase表,插入数据到Hive表的同时能够影响HBase表。

分步实现:

(1) 在Hive中创建表同时关联HBase

CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
           

第一次不会成功,报错:FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

解决方法:编译hive-hbase-handler-1.2.1.jar并覆盖掉原来的jar

下载地址:被编译的hive-hbase-handler-1.2.1.jar,用于在Hive中创建关联HBase表的jar,解决创建Hive关联HBase时报FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V 错误的问题

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

退出并重新进入hive,重新执行HQL,执行成功:

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

进入hbase shell中检查hbase_emp_table的表结构

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

提示:完成之后,可以分别进入Hive和HBase查看,都生成了对应的表

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

(2) 在Hive中创建临时中间表,用于load文件中的数据

提示:不能将数据直接load进Hive所关联HBase的那张表中

CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
           

(3) 向Hive中间表中load数据

hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;

(4) 通过insert命令将中间表中的数据导入到Hive关联HBase的那张表中

hive> insert into table hive_hbase_emp_table select * from emp;
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

(5) 查看Hive以及关联的HBase表中是否已经成功的同步插入了数据

Hive:

hive> select * from hive_hbase_emp_table;
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

HBase:

hbase> scan ‘hbase_emp_table’

注意:如果删除其中一个表,则与之关联的另一张表也会被删除

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

2.案例二

目标:在HBase中已经存储了某一张表hbase_emp_table,然后在Hive中创建一个外部表来关联HBase中的hbase_emp_table这张表,使之可以借助Hive来分析HBase这张表中的数据。

注:该案例2紧跟案例1的脚步,所以完成此案例前,请先完成案例1。

分步实现:

(1) 在Hive中创建外部表

CREATE EXTERNAL TABLE relevance_hbase_emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = 
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") 
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
           

(2) 关联后就可以使用Hive函数进行一些分析操作了

hive (default)> select * from relevance_hbase_emp;

笔记:创建与HBase中fruit关联表

建表HQL:

CREATE EXTERNAL TABLE relevance_hbase_fruit(
id int,
name string,
color string)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping"=":key,info:name,info:color")
TBLPROPERTIES("hbase.table.name"="fruit");
           
HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

hive上查看HBase中fruit表的数据:

HBase第二天:HBase的API操作,判断表存在、创建删除表、获取表中一行或指定列族数据、向表中插入数据、HBase的wordcount、自定义HBaseMapReduce、Hbase集成Hive第6章 HBase API操作

继续阅读