天天看點

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

接上篇第5章的5.4:HBase第一天:HBase元件及架構、安裝HBase部署叢集、HBase的shell操作、HBase資料結構、命名空間、原理、讀寫流程、flush與合并、hbase-default.xml配置詳解

第6章 HBase API操作

6.1 環境準備

6.2 HBaseAPI

6.2.1 擷取Configuration對象

6.2.2 判斷表是否存在

6.2.3 建立表

6.2.4 删除表

6.2.5 向表中插入資料

6.2.6 删除多行資料

6.2.7 擷取所有資料

6.2.8 擷取某一行資料

6.2.9 擷取某一行指定“列族:列”的資料

6.3 MapReduce

6.3.1 官方HBase-MapReduce

6.3.2 自定義HBase-MapReduce1

6.3.3 自定義HBase-MapReduce2

6.4 與Hive的內建

6.4.1 HBase與Hive的對比

           6.4.2 HBase與Hive內建使用

第6章 HBase API操作

6.1 環境準備

建立項目後在pom.xml中添加依賴:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
</dependency>

<dependency>
	<groupId>jdk.tools</groupId>
	<artifactId>jdk.tools</artifactId>
	<version>1.8</version>
	<scope>system</scope>
	<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> (這裡換成自己電腦的java路徑)
</dependency>
           
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

添加log4j

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender

log4j.appender.logfile.File=target/spring.log

log4j.appender.logfile.layout=org.apache.log4j.PatternLayout

log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

6.2 HBaseAPI

(用靜态方法實作具體增删該查的功能,然後在main方法中調用靜态方法,共有的内容放在該類的靜态代碼塊中)

先啟動HBase叢集

6.2.1 擷取Configuration對象

public static Configuration conf;
static{
	//使用HBaseConfiguration的單例方法執行個體化
	conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.9.102");
conf.set("hbase.zookeeper.property.clientPort", "2181");
}
           

6.2.2 判斷表是否存在

過時的方法

//首先判斷表是否存在要有傳回結果的,傳回結果列印到控制台,于是選用boolean類型傳回值,傳入的參數就是//String類型的單個表明就可。
Public static boolean tableExist (String tableName){	
	//先new 一個配置檔案  用過時的HBaseConfiguration 
	HBaseConfiguration configuration = new HBaseConfiguration();
	 //在lib目錄下jar包中hbase的預設配置檔案中搜尋localhost可找到【hbase.zookeeper.quorum】    //配置頭,指定要連接配接的節點
	configuration.set("hbase.zookeeper.quorum","192.168.1.102");
	//new一個 用戶端  用過時的HBaseAdmin
HBaseAdmin admin = new HBaseAdmin(configuration);
	用用戶端打點調用tableExists方法
return admin.tableExists(tableName);
}
public static boolean isTableExist(String tableName) throws MasterNotRunningException,
 ZooKeeperConnectionException, IOException{
	//在HBase中管理、通路表需要先建立HBaseAdmin對象
//Connection connection = ConnectionFactory.createConnection(conf);
//HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
	HBaseAdmin admin = new HBaseAdmin(conf);
	return admin.tableExists(tableName);
}
           

新API完整公共代碼(筆記)

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-03-11:32
 */
public class TestHBase {
    static Connection connection = null;
    static Admin admin = null;

    static {
        //擷取HBase配置資訊
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102");
        //擷取admin
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //關閉資源
    public static void close(Connection connection, Admin admin) throws IOException {
        if (connection != null) {
            connection.close();
        }
        if (admin != null) {
            admin.close();
        }
    }

    //判斷表是否存在
    public static boolean tableExist(String tableName) throws IOException {
        return admin.tableExists(TableName.valueOf(tableName));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(tableExist("student"));
    }
}
           
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

6.2.3 建立表

public static void createTable(String tableName, String... columnFamily) throws
 MasterNotRunningException, ZooKeeperConnectionException, IOException{
	HBaseAdmin admin = new HBaseAdmin(conf);
	//判斷表是否存在
	if(isTableExist(tableName)){
		System.out.println("表" + tableName + "已存在");
		//System.exit(0);
	}else{
		//建立表屬性對象,表名需要轉位元組
		HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
		//建立多個列族
		for(String cf : columnFamily){
			descriptor.addFamily(new HColumnDescriptor(cf));
		}
		//根據對表的配置,建立表
		admin.createTable(descriptor);
		System.out.println("表" + tableName + "建立成功!");
	}
}
           

建表代碼筆記

//建立表
private static void createTable(String tableName, List<String> columnFamilys) throws IOException {
    //先判斷表是否存在
    if (!tableExist(tableName)) {
        System.out.println("表 " + tableName + "不存在!可以建立。");
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        for (String columnFamily : columnFamilys) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
            hTableDescriptor.addFamily(hColumnDescriptor);
        }
        admin.createTable(hTableDescriptor);
    }else System.out.println("表 " + tableName + "存在!不能被建立。");
}

public static void main(String[] args) throws IOException {
    //判斷表是否存在,以下判斷表是否建立成功
    System.out.println(tableExist("staff"));
    createTable("staff", Collections.singletonList("f1"));
    System.out.println(tableExist("staff"));
}
           
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

通過HBase shell檢視

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

建立多個列族的表(main方法中測試)

ArrayList<String> cfs = new ArrayList<>();
cfs.add("f1");
cfs.add("f2");
cfs.add("f3");
createTable("staff1",cfs);
close(connection,admin);
           

HBase shell檢視

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

6.2.4 删除表

public static void dropTable(String tableName) throws MasterNotRunningException,
 ZooKeeperConnectionException, IOException{
	HBaseAdmin admin = new HBaseAdmin(conf);
	if(isTableExist(tableName)){
		admin.disableTable(tableName);
		admin.deleteTable(tableName);
		System.out.println("表" + tableName + "删除成功!");
	}else{
		System.out.println("表" + tableName + "不存在!");
	}
}
删除表代碼筆記
private static void deleteTable(String tableName) throws IOException {
    //判斷表存在
    if (tableExist(tableName)) {
        System.out.println("表存在,現在删除");
        //先disable,讓表不可用
        admin.disableTable(TableName.valueOf(tableName));
        //删除表
        admin.deleteTable(TableName.valueOf(tableName));
    } else System.out.println("表不存在");
}

public static void main(String[] args) throws IOException {
    deleteTable("staff1");
}
           
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

HBase shell檢視staff1已被删除

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

6.2.5 向表中插入資料

public static void addRowData(String tableName, String rowKey, String columnFamily, String
 column, String value) throws IOException{
	//建立HTable對象
	HTable hTable = new HTable(conf, tableName);
	//向表中插入資料
	Put put = new Put(Bytes.toBytes(rowKey));
	//向Put對象中組裝資料
	put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
	hTable.put(put);
	hTable.close();
	System.out.println("插入資料成功");
}
           

插入資料代碼筆記

private static void putData(String tableName, String rowKey, String cf, String cn, String value) throws IOException {
    //擷取表對象
    Table table = connection.getTable(TableName.valueOf(tableName));
    //利用HBase的Bytes工具類轉化rowKey
    Put put = new Put(Bytes.toBytes(rowKey));
    //添加列族、列名、值
    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
    //執行put操作
    table.put(put);
    table.close();
}

public static void main(String[] args) throws IOException {
    //插入一條資料
    putData("staff","1001","f1","name","cr");
}
           
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

6.2.6 删除多行資料

public static void deleteMultiRow(String tableName, String... rows) throws IOException{
	HTable hTable = new HTable(conf, tableName);
	List<Delete> deleteList = new ArrayList<Delete>();
	for(String row : rows){
		Delete delete = new Delete(Bytes.toBytes(row));
		deleteList.add(delete);
	}
	hTable.delete(deleteList);
	hTable.close();
}
           

删除一行資料堂筆記:

private static void deleteData(String tableName, String rowkey, String cf, String cn) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));
    //建立要被删除的對象
    Delete delete = new Delete(Bytes.toBytes(rowkey));
    //給delete對象添加具體的列族、列
    delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
    //删除
    table.delete(delete);
    table.close();
}

public static void main(String[] args) throws IOException {
    //删除一條資料
    deleteData("student","1001","info","age");
}
           

可以看到age=20變成了age=18,是因為age=20是之前示範HBase shell時新添加的version3版本,Delete隻删除最後一個版本

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

再次執行上述代碼才發現age被徹底删除

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

而在HBase shell中一次會删掉所有版本的資料,api說明:

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

删除多條資料代碼:

private static void deleteDatas(String tableName, String... rowKeys) throws IOException {
    //擷取表對象
    Table table = connection.getTable(TableName.valueOf(tableName));
    ArrayList<Delete> deletes = new ArrayList<>();
    for (String rowKey : rowKeys) {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        //添加多個要被删除的對象
        deletes.add(delete);
    }
    table.delete(deletes);
    table.close();
}

public static void main(String[] args) throws IOException {
    //删除多行資料
    deleteDatas("student","1001","1002");
}
           

HBase shell檢視1001、1002行鍵中所有資料已被删除

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

6.2.7 擷取所有資料

public static void getAllRows(String tableName) throws IOException{
	HTable hTable = new HTable(conf, tableName);
	//得到用于掃描region的對象
	Scan scan = new Scan();
	//使用HTable得到resultcanner實作類的對象
	ResultScanner resultScanner = hTable.getScanner(scan);
	for(Result result : resultScanner){
		Cell[] cells = result.rawCells();
		for(Cell cell : cells){
			//得到rowkey
			System.out.println("行鍵:" + Bytes.toString(CellUtil.cloneRow(cell)));
			//得到列族
			System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
			System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
			System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
		}
	}
}
           

全表掃描代碼筆記:

private static void getScanData(String tableName) throws IOException {
    //擷取table對象
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();//空參對象進行全表掃描
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
        Cell[] cells = result.rawCells();
        //列印獲得到的資料
        for (Cell cell : cells) {
            System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                    ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                    + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
    table.close();
}

public static void main(String[] args) throws IOException {
    //掃描整張表
    getScanData("student");
}
           
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

6.2.8 擷取某一行資料

public static void getRow(String tableName, String rowKey) throws IOException{
	HTable table = new HTable(conf, tableName);
	Get get = new Get(Bytes.toBytes(rowKey));
	//get.setMaxVersions();顯示所有版本
    //get.setTimeStamp();顯示指定時間戳的版本
	Result result = table.get(get);
	for(Cell cell : result.rawCells()){
		System.out.println("行鍵:" + Bytes.toString(result.getRow()));
		System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
		System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
		System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
		System.out.println("時間戳:" + cell.getTimestamp());
	}
}
           

擷取一行資料代碼筆記

public static void getData(String tableName, String rowkey) throws IOException {
    //擷取表對象
    Table table = connection.getTable(TableName.valueOf(tableName));
    Get get = new Get(Bytes.toBytes(String.valueOf(rowkey)));
    Result result = table.get(get);
    Cell[] cells = result.rawCells();
    //列印獲得到的資料
    for (Cell cell : cells) {
        System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
        ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
        + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));

    }
    table.close();
}

public static void main(String[] args) throws IOException {
    getData("staff","1001");
}
           
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

6.2.9 擷取某一行指定“列族:列”的資料

public static void getRowQualifier(String tableName, String rowKey, String family, String
 qualifier) throws IOException{
	HTable table = new HTable(conf, tableName);
	Get get = new Get(Bytes.toBytes(rowKey));
	get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
	Result result = table.get(get);
	for(Cell cell : result.rawCells()){
		System.out.println("行鍵:" + Bytes.toString(result.getRow()));
		System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
		System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
		System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
	}
}
           

擷取指定列族的一行資料代碼筆記:

private static void getDataByCN(String tableName, String rowkey, String cf, String cn) throws IOException {
    //擷取表對象
    Table table = connection.getTable(TableName.valueOf(tableName));
    Get get = new Get(Bytes.toBytes(rowkey));
    get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
//擷取多個版本的資料
get.setMaxVersions(3);
    Result result = table.get(get);
    Cell[] cells = result.rawCells();
    //列印獲得到的資料
    for (Cell cell : cells) {
        System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
    }
    table.close();
}

public static void main(String[] args) throws IOException {
    //擷取一樣資料(指定列族:列)
    getDataByCN("student","1002","info","name");
}
           

測試student表的截圖

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

以上測試代碼合集

package hbase.API;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * HBase的API操作,可作為工具類使用
 * @author cherry
 * @create 2019-09-03-11:32
 */

@SuppressWarnings("all")
public class TestHBase {
    static Connection connection = null;
    static Admin admin = null;

    static {
        //擷取HBase配置資訊
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102");
        //擷取admin
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 關閉資源
     *
     * @param connection
     * @param admin
     * @throws IOException
     */
    public static void close(Connection connection, Admin admin) throws IOException {
        if (connection != null) {
            connection.close();
        }
        if (admin != null) {
            admin.close();
        }
    }

    /**
     * 判斷表是否存在
     *
     * @param tableName
     * @return
     * @throws IOException
     */
    public static boolean tableExist(String tableName) throws IOException {
        return admin.tableExists(TableName.valueOf(tableName));
    }

    /**
     * 建立表
     *
     * @param tableName
     * @param columnFamilys
     * @throws IOException
     */
    private static void createTable(String tableName, List<String> columnFamilys) throws IOException {
        //先判斷表是否存在
        if (!tableExist(tableName)) {
            System.out.println("表 " + tableName + "不存在!可以建立。");
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            for (String columnFamily : columnFamilys) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            admin.createTable(hTableDescriptor);
        } else System.out.println("表 " + tableName + "存在!不能被建立。");
    }

    /**
     * 删除表
     *
     * @param tableName
     * @throws IOException
     */
    private static void deleteTable(String tableName) throws IOException {
        //判斷表存在
        if (tableExist(tableName)) {
            System.out.println("表存在,現在删除");
            //先disable,讓表不可用
            admin.disableTable(TableName.valueOf(tableName));
            //删除表
            admin.deleteTable(TableName.valueOf(tableName));
        } else System.out.println("表不存在");
    }

    /**
     * 插入一條資料
     *
     * @param tableName
     * @param rowKey
     * @param cf
     * @param cn
     * @param value
     * @throws IOException
     */
    private static void putData(String tableName, String rowKey, String cf, String cn, String value) throws IOException {
        //擷取表對象
        Table table = connection.getTable(TableName.valueOf(tableName));
        //利用HBase的Bytes工具類轉化rowKey
        Put put = new Put(Bytes.toBytes(rowKey));
        //添加列族、列名、值
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
        //執行put操作
        table.put(put);
        table.close();
    }

    /**
     * 查詢一條資料
     *
     * @param tableName
     * @param rowkey
     * @throws IOException
     */
    public static void getData(String tableName, String rowkey) throws IOException {
        //擷取表對象
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(String.valueOf(rowkey)));
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        //列印獲得到的資料
        for (Cell cell : cells) {
            System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                    ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                    + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
        table.close();
    }

    /**
     * 擷取一行資料值(指定列族:列)
     *
     * @param tableName
     * @param rowkey
     * @param cf
     * @param cn
     * @throws IOException
     */
    private static void getDataByCN(String tableName, String rowkey, String cf, String cn) throws IOException {
        //擷取表對象
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowkey));
        //get.addFamily();//擷取一個列族下的所有資料
        get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //擷取多個版本的資料
        get.setMaxVersions(3);
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        //列印獲得到的資料
        for (Cell cell : cells) {
            System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                    ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                    + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
        table.close();
    }

    /**
     * 全表掃描(scan)
     *
     * @param tableName
     * @throws IOException
     */
    private static void getScanData(String tableName) throws IOException {
        //擷取table對象
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();//空參對象進行全表掃描
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            Cell[] cells = result.rawCells();
            //列印獲得到的資料
            for (Cell cell : cells) {
                System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                        ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                        + ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        table.close();
    }

    /**
     * 删除一條資料(delete)
     *
     * @param tableName
     * @param rowkey
     * @param cf
     * @param cn
     * @throws IOException
     */
    private static void deleteData(String tableName, String rowkey, String cf, String cn) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        //建立要被删除的對象
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        //給delete對象添加具體的列族、列
        delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //删除
        table.delete(delete);
        table.close();
    }

    /**
     * 删除多條資料
     *
     * @param tableName
     * @param rowKeys
     */
    private static void deleteDatas(String tableName, String... rowKeys) throws IOException {
        //擷取表對象
        Table table = connection.getTable(TableName.valueOf(tableName));
        ArrayList<Delete> deletes = new ArrayList<>();
        for (String rowKey : rowKeys) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            //添加多個要被删除的對象
            deletes.add(delete);
        }
        table.delete(deletes);
        table.close();
    }

    /**
     * main方法用來測試上述方法
     *
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {

        /*//插入三個版本的資料
        putData("student","1001","info","name", "cr");
        putData("student","1001","info","name", "wq");
        putData("student","1001","info","name", "zs");*/
        /*//删除多行資料
        deleteDatas("student","1001","1002");*/
       /* //删除一條資料
        deleteData("student","1001","info","age");*/
        /*//掃描整張表
        getScanData("student");*/
        //擷取一樣資料(指定列族:列)
        //getDataByCN("student", "1001", "info", "name");
        //getData("student","1001");
       /* //插入一條資料
        putData("staff", "1001", "f1", "name", "cr");*/
        //deleteTable("staff1");
        //判斷表是否存在,以下判斷表是否建立成功
        /*System.out.println(tableExist("staff"));
        createTable("staff", Collections.singletonList("f1"));
        System.out.println(tableExist("staff"));*/
        /*ArrayList<String> cfs = new ArrayList<>();
        cfs.add("f1");
        cfs.add("f2");
        cfs.add("f3");
        createTable("staff1",cfs);
        close(connection,admin);*/
    }

}
           

6.3 MapReduce

通過HBase的相關JavaAPI,我們可以實作伴随HBase操作的MapReduce過程,比如使用MapReduce将資料從本地檔案系統導入到HBase的表中,比如我們從HBase中讀取一些原始資料後使用MapReduce做資料分析。

6.3.1 官方HBase-MapReduce

1.檢視HBase的MapReduce任務的執行 (hbase想從hdfs讀資料,所需要的jar包)

$ bin/hbase mapredcp

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

2.環境變量的導入

(1)執行環境變量的導入(臨時生效,在指令行執行下述操作)

$ export HBASE_HOME=/opt/module/hbase-1.3.1

$ export HADOOP_HOME=/opt/module/hadoop-2.7.2

$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

反引号指的是把執行結果指派,也就是把HBASE_HOME下面的這個指令所顯示的jar包給到HADOOP_CLASSPATH下面

(2)永久生效:在/etc/profile配置

export HBASE_HOME=/opt/module/hbase

export HADOOP_HOME=/opt/module/hadoop-2.7.2

并在hadoop-env.sh中配置:(注意:在for循環之後配)

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

别忘記分發/etc/profile 和hadoop-env.sh  

3.運作官方的MapReduce任務 (用官方案例的目的是測試 是否打通hdfs 和 hbase)

-- 案例一:統計Student表中有多少行資料

$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

通過HBase shell驗證

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

-- 案例二:使用MapReduce将本地資料導入到HBase

1)在本地建立一個tsv格式的檔案:fruit.tsv

1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow
           

2)建立HBase表

hbase(main):001:0> create 'fruit','info'

3)在HDFS中建立input_fruit檔案夾并上傳fruit.tsv檔案

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

執行MapReduce到HBase的fruit表中

$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \

-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \

hdfs://hadoop102:9000/input_fruit

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

使用scan指令檢視導入後的結果

hbase(main):001:0> scan ‘fruit’

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

6.3.2 自定義HBase-MapReduce1

目标:将fruit表中的一部分資料,通過MR遷入到fruit_mr表中。

分步實作:

1.建構ReadFruitMapper類,用于讀取fruit表中的資料

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

	@Override
	protected void map(ImmutableBytesWritable key, Result value, Context context) 
	throws IOException, InterruptedException {
	//将fruit的name和color提取出來,相當于将每一行資料讀取出來放入到Put對象中。
		Put put = new Put(key.get());
		//周遊添加column行
		for(Cell cell: value.rawCells()){
			//添加/克隆列族:info
			if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
				//添加/克隆列:name
				if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
					//将該列cell加入到put對象中
					put.add(cell);
					//添加/克隆列:color
				}else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
					//向該列cell加入到put對象中
					put.add(cell);
				}
			}
		}
		//将從fruit讀取到的每行資料寫入到context中作為map的輸出
		context.write(key, put);
	}
}
           

2. 建構WriteFruitMRReducer類,用于将讀取到的fruit表中的資料寫入到fruit_mr表中

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
	@Override
	protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) 
	throws IOException, InterruptedException {
		//讀出來的每一行資料寫入到fruit_mr表中
		for(Put put: values){
			context.write(NullWritable.get(), put);
		}
	}
}
           

3.建構Fruit2FruitMRRunner extends Configured implements Tool用于組裝運作Job任務

//組裝Job
	public int run(String[] args) throws Exception {
		//得到Configuration
		Configuration conf = this.getConf();
		//建立Job任務
		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(Fruit2FruitMRRunner.class);

		//配置Job
		Scan scan = new Scan();
		scan.setCacheBlocks(false);
		scan.setCaching(500);

		//設定Mapper,注意導入的是mapreduce包下的,不是mapred包下的,後者是老版本
		TableMapReduceUtil.initTableMapperJob(
		"fruit", //資料源的表名
		scan, //scan掃描控制器
		ReadFruitMapper.class,//設定Mapper類
		ImmutableBytesWritable.class,//設定Mapper輸出key類型
		Put.class,//設定Mapper輸出value值類型
		job//設定給哪個JOB
		);
		//設定Reducer
		TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);
		//設定Reduce數量,最少1個
		job.setNumReduceTasks(1);

		boolean isSuccess = job.waitForCompletion(true);
		if(!isSuccess){
			throw new IOException("Job running with error");
		}
		return isSuccess ? 0 : 1;
	}
           

4.主函數中調用運作該Job任務

public static void main( String[] args ) throws Exception{
Configuration conf = HBaseConfiguration.create();
int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
System.exit(status);
}
           

5.打包運作任務

$ /opt/module/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar

 com.z.hbase.mr1.Fruit2FruitMRRunner

先掃描确定資料表中的資料

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

執行MR報錯,是因為輸出表fruit_mr需要提前被建立

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

建立輸出表

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

重複執行MR程式,檢視執行結果

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

提示:運作任務前,如果待資料導入的表不存在,則需要提前建立。

提示:maven打包指令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)

MR1代碼筆記:

FruitMapper類      
package hbase.MR;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-04-14:27
 */
public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        //擷取put對象
        Put put = new Put(key.get());
        //擷取該rowkey下所有的cell
        Cell[] cells = value.rawCells();
        //周遊,找出所需的資料
        for (Cell cell : cells) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                put.add(cell);
                context.write(key,put);
            }
        }

    }
}
           
FruitReducer類:      
package hbase.MR;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-04-14:53
 */
public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put,ImmutableBytesWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put value : values) {
            context.write(key,value);
        }
    }
}
           
FruitDriver類      
package hbase.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-04-14:57
 */
public class FruitDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1.擷取job
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        //2.指定jar所在路徑
        job.setJarByClass(FruitDriver.class);
        //3.指定MR
        Scan scan = new Scan();//全表掃描
        TableMapReduceUtil.initTableMapperJob("fruit", scan, FruitMapper.class, ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("fruit_mr", FruitReducer.class, job);
        //4.指定Mapper的輸出
        //5.指定最終輸出
        job.setNumReduceTasks(1);//Reduce數量最少為1,該類不能被省略
        //6.指定輸入輸出路徑,456步已寫好
        //7.送出
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
           

6.3.3 自定義HBase-MapReduce2

目标:實作将HDFS中的資料寫入到HBase表中。

分步實作:

1.建構ReadFruitFromHDFSMapper于讀取HDFS中的檔案資料

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		//從HDFS中讀取的資料
		String lineValue = value.toString();
		//讀取出來的每行資料使用\t進行分割,存于String數組
		String[] values = lineValue.split("\t");
		
		//根據資料中值的含義取值
		String rowKey = values[0];
		String name = values[1];
		String color = values[2];
		
		//初始化rowKey
		ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
		
		//初始化put對象
		Put put = new Put(Bytes.toBytes(rowKey));
		
		//參數分别:列族、列、值  
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"),  Bytes.toBytes(name)); 
        put.add(Bytes.toBytes("info"), Bytes.toBytes("color"),  Bytes.toBytes(color)); 
        
        context.write(rowKeyWritable, put);
	}
}
           

2.建構WriteFruitMRFromTxtReducer類

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
	@Override
	protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
		//讀出來的每一行資料寫入到fruit_hdfs表中
		for(Put put: values){
			context.write(NullWritable.get(), put);
		}
	}
}
           

3.建立Txt2FruitRunner組裝Job

public int run(String[] args) throws Exception {
//得到Configuration
Configuration conf = this.getConf();

//建立Job任務
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(Txt2FruitRunner.class);
Path inPath = new Path("hdfs://hadoop102:9000/input_fruit/fruit.tsv");
FileInputFormat.addInputPath(job, inPath);

//設定Mapper
job.setMapperClass(ReadFruitFromHDFSMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

//設定Reducer
TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRFromTxtReducer.class, job);

//設定Reduce數量,最少1個
job.setNumReduceTasks(1);

boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}

return isSuccess ? 0 : 1;
}
           

4.調用執行Job

public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
	    int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
	    System.exit(status);
}
           

5.打包運作

$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.atguigu.hbase.mr2.Txt2FruitRunner

先建表

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

确認HDFS上原始檔案的路徑

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

運作jar後掃描表fruit_hdfs

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

看到1002的color為空值,是因為建表時多打了一個制表符,Map是根據單個制表符進行切片的,删掉多餘制表符即可解決問題

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

提示:運作任務前,如果待資料導入的表不存在,則需要提前建立之。

提示:maven打包指令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)

MR2代碼筆記:

HDFSMapper類:      
package hbase.MR2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 目标:實作将HDFS中的資料寫入到HBase表中。
 *
 * @author cherry
 * @create 2019-09-04-15:29
 */
public class HDFSMapper extends Mapper<LongWritable, Text, NullWritable, Put> {
    /**
     * 原資料:1001 Apple Red
     *
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
        context.write(NullWritable.get(), put);
    }
}
           
HDFSReducer類:      
package hbase.MR2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-09-04-15:50
 */
public class HDFSReducer extends TableReducer<NullWritable, Put,NullWritable> {
    @Override
    protected void reduce(NullWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put value : values) {
            context.write(NullWritable.get(),value);
        }
    }
}
           
HDFSDriver類:      
package hbase.MR2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author cherry
 * @create 2019-09-04-15:51
 */
public class HDFSDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        //擷取任務對象
        Job job = Job.getInstance(conf);
        job.setJarByClass(HDFSDriver.class);
        //關聯MR
        job.setMapperClass(HDFSMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);
        TableMapReduceUtil.initTableReducerJob("fruit_hdfs", HDFSReducer.class, job);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        //一定要用HBase的conf
        Configuration conf = HBaseConfiguration.create();
        int run = ToolRunner.run(conf, new HDFSDriver(), args);
        if (run == 1) System.out.println("任務失敗!");
    }
}
           

6.4 與Hive的內建

6.4.1 HBase與Hive的對比

1.Hive (分析)

(1) 資料倉庫

Hive的本質其實就相當于将HDFS中已經存儲的檔案在Mysql中做了一個雙射關系,以友善使用HQL去管理查詢。

(2) 用于資料分析、清洗

Hive适用于離線的資料分析和清洗,延遲較高。

(3) 基于HDFS、MapReduce

Hive存儲的資料依舊在DataNode上,編寫的HQL語句終将是轉換為MapReduce代碼執行。

2.HBase (存儲)

(1) 資料庫

是一種面向列存儲的非關系型資料庫。

(2) 用于存儲結構化和非結構化的資料

适用于單表非關系型資料的存儲,不适合做關聯查詢,類似JOIN等操作。

(3) 基于HDFS

資料持久化存儲的展現形式是Hfile,存放于DataNode中,被ResionServer以region的形式進行管理。

(4) 延遲較低,接入線上業務使用

面對大量的企業資料,HBase可以直線單表大量資料的存儲,同時提供了高效的資料通路速度。

6.4.2 HBase與Hive內建使用

尖叫提示:HBase與Hive的內建在最新的兩個版本中無法相容。是以,我們隻能含着淚勇敢的重新編譯:hive-hbase-handler-1.2.2.jar!!好氣!!

環境準備

因為我們後續可能會在操作Hive的同時對HBase也會産生影響,是以Hive需要持有操作HBase的Jar,那麼接下來拷貝Hive所依賴的Jar包(或者使用軟連接配接的形式)。

$ export HBASE_HOME=/opt/module/hbase
$ export HIVE_HOME=/opt/module/hive

ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar  $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
           
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

同時在hive-site.xml中修改zookeeper的屬性,如下:

<property>
  <name>hive.zookeeper.quorum</name>
  <value>hadoop102,hadoop103,hadoop104</value>
  <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
  <name>hive.zookeeper.client.port</name>
  <value>2181</value>
  <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
           

1.案例一

目标:建立Hive表,關聯HBase表,插入資料到Hive表的同時能夠影響HBase表。

分步實作:

(1) 在Hive中建立表同時關聯HBase

CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
           

第一次不會成功,報錯:FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

解決方法:編譯hive-hbase-handler-1.2.1.jar并覆寫掉原來的jar

下載下傳位址:被編譯的hive-hbase-handler-1.2.1.jar,用于在Hive中建立關聯HBase表的jar,解決建立Hive關聯HBase時報FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V 錯誤的問題

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

退出并重新進入hive,重新執行HQL,執行成功:

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

進入hbase shell中檢查hbase_emp_table的表結構

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

提示:完成之後,可以分别進入Hive和HBase檢視,都生成了對應的表

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

(2) 在Hive中建立臨時中間表,用于load檔案中的資料

提示:不能将資料直接load進Hive所關聯HBase的那張表中

CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
           

(3) 向Hive中間表中load資料

hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;

(4) 通過insert指令将中間表中的資料導入到Hive關聯HBase的那張表中

hive> insert into table hive_hbase_emp_table select * from emp;
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

(5) 檢視Hive以及關聯的HBase表中是否已經成功的同步插入了資料

Hive:

hive> select * from hive_hbase_emp_table;
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

HBase:

hbase> scan ‘hbase_emp_table’

注意:如果删除其中一個表,則與之關聯的另一張表也會被删除

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

2.案例二

目标:在HBase中已經存儲了某一張表hbase_emp_table,然後在Hive中建立一個外部表來關聯HBase中的hbase_emp_table這張表,使之可以借助Hive來分析HBase這張表中的資料。

注:該案例2緊跟案例1的腳步,是以完成此案例前,請先完成案例1。

分步實作:

(1) 在Hive中建立外部表

CREATE EXTERNAL TABLE relevance_hbase_emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = 
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") 
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
           

(2) 關聯後就可以使用Hive函數進行一些分析操作了

hive (default)> select * from relevance_hbase_emp;

筆記:建立與HBase中fruit關聯表

建表HQL:

CREATE EXTERNAL TABLE relevance_hbase_fruit(
id int,
name string,
color string)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping"=":key,info:name,info:color")
TBLPROPERTIES("hbase.table.name"="fruit");
           
HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

hive上檢視HBase中fruit表的資料:

HBase第二天:HBase的API操作,判斷表存在、建立删除表、擷取表中一行或指定列族資料、向表中插入資料、HBase的wordcount、自定義HBaseMapReduce、Hbase內建Hive第6章 HBase API操作

繼續閱讀