Continuing from Section 5.4 of Chapter 5 in the previous post (HBase Day 1: HBase components and architecture, installing and deploying an HBase cluster, HBase shell operations, HBase data structures, namespaces, internals, read/write paths, flush and compaction, and a walkthrough of hbase-default.xml).
Chapter 6: HBase API Operations
6.1 Environment Setup
6.2 HBase API
6.2.1 Obtaining a Configuration Object
6.2.2 Checking Whether a Table Exists
6.2.3 Creating a Table
6.2.4 Deleting a Table
6.2.5 Inserting Data into a Table
6.2.6 Deleting Multiple Rows
6.2.7 Fetching All Data
6.2.8 Fetching a Single Row
6.2.9 Fetching a Row's Data for a Specific "columnFamily:column"
6.3 MapReduce
6.3.1 The Official HBase-MapReduce
6.3.2 Custom HBase-MapReduce, Part 1
6.3.3 Custom HBase-MapReduce, Part 2
6.4 Hive Integration
6.4.1 HBase vs. Hive
6.4.2 Using HBase with Hive
Chapter 6: HBase API Operations
6.1 Environment Setup
After creating the project, add the following dependencies to pom.xml:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> <!-- replace with the JDK path on your machine -->
</dependency>
Add a log4j.properties file:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
6.2 HBase API
(Implement the concrete create/read/update/delete operations as static methods, call those static methods from main, and keep the shared setup in the class's static initializer block.)
Start the HBase cluster first.
6.2.1 Obtaining a Configuration Object
public static Configuration conf;
static{
//Instantiate via HBaseConfiguration's static factory method
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.9.102");
conf.set("hbase.zookeeper.property.clientPort", "2181");
}
6.2.2 Checking Whether a Table Exists
The deprecated approach:
//Checking whether a table exists should produce a result we can print to the console, so the method returns a boolean and takes the table name as a single String parameter.
public static boolean tableExist(String tableName) throws IOException {
//Create a configuration object with the deprecated HBaseConfiguration constructor
HBaseConfiguration configuration = new HBaseConfiguration();
//Searching for "localhost" in HBase's default config file (inside the jars under lib) turns up the hbase.zookeeper.quorum key, which specifies the nodes to connect to
configuration.set("hbase.zookeeper.quorum","192.168.1.102");
//Create a client with the deprecated HBaseAdmin constructor
HBaseAdmin admin = new HBaseAdmin(configuration);
//Call tableExists on the client
return admin.tableExists(tableName);
}
public static boolean isTableExist(String tableName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException{
//Managing and accessing tables in HBase requires an HBaseAdmin object first
//Connection connection = ConnectionFactory.createConnection(conf);
//HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
HBaseAdmin admin = new HBaseAdmin(conf);
return admin.tableExists(tableName);
}
Complete shared code with the new API (class notes):
package hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
/**
* @author cherry
* @create 2019-09-03-11:32
*/
public class TestHBase {
static Connection connection = null;
static Admin admin = null;
static {
//Obtain the HBase configuration
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop102");
//Obtain the Admin instance
try {
connection = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
try {
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
//Release resources (close the Admin before the Connection it came from)
public static void close(Connection connection, Admin admin) throws IOException {
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}
//Check whether a table exists
public static boolean tableExist(String tableName) throws IOException {
return admin.tableExists(TableName.valueOf(tableName));
}
public static void main(String[] args) throws IOException {
System.out.println(tableExist("student"));
}
}
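As a side note, if you would rather not hold a static Connection open, the same existence check can be written with try-with-resources so both the Connection and the Admin are always released. A minimal sketch under the same assumptions (hadoop102 as the ZooKeeper quorum); the method name is illustrative, not from the original notes:
public static boolean tableExistOnce(String tableName) throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop102");
//Connection and Admin are both Closeable, so they are closed in reverse order automatically
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin adm = conn.getAdmin()) {
return adm.tableExists(TableName.valueOf(tableName));
}
}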
6.2.3 Creating a Table
public static void createTable(String tableName, String... columnFamily) throws
MasterNotRunningException, ZooKeeperConnectionException, IOException{
HBaseAdmin admin = new HBaseAdmin(conf);
//Check whether the table already exists
if(isTableExist(tableName)){
System.out.println("Table " + tableName + " already exists");
//System.exit(0);
}else{
//Create the table descriptor object (the table name is wrapped in a TableName)
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
//Add the column families
for(String cf : columnFamily){
descriptor.addFamily(new HColumnDescriptor(cf));
}
//Create the table from the descriptor
admin.createTable(descriptor);
System.out.println("Table " + tableName + " created successfully!");
}
}
Table-creation code (class notes):
//Create a table
private static void createTable(String tableName, List<String> columnFamilys) throws IOException {
//First check whether the table exists
if (!tableExist(tableName)) {
System.out.println("Table " + tableName + " does not exist; it can be created.");
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String columnFamily : columnFamilys) {
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
hTableDescriptor.addFamily(hColumnDescriptor);
}
admin.createTable(hTableDescriptor);
} else System.out.println("Table " + tableName + " already exists; it cannot be created.");
}
public static void main(String[] args) throws IOException {
//Check for existence before and after to verify the table was created
System.out.println(tableExist("staff"));
createTable("staff", Collections.singletonList("f1"));
System.out.println(tableExist("staff"));
}
Verify via the HBase shell.
Create a table with multiple column families (tested in main):
ArrayList<String> cfs = new ArrayList<>();
cfs.add("f1");
cfs.add("f2");
cfs.add("f3");
createTable("staff1",cfs);
close(connection,admin);
Check in the HBase shell.
6.2.4 Deleting a Table
public static void dropTable(String tableName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException{
HBaseAdmin admin = new HBaseAdmin(conf);
if(isTableExist(tableName)){
admin.disableTable(tableName);
admin.deleteTable(tableName);
System.out.println("Table " + tableName + " deleted successfully!");
}else{
System.out.println("Table " + tableName + " does not exist!");
}
}
Table-deletion code (class notes):
private static void deleteTable(String tableName) throws IOException {
//Check that the table exists
if (tableExist(tableName)) {
System.out.println("Table exists; deleting it now");
//Disable the table first so it is taken offline
admin.disableTable(TableName.valueOf(tableName));
//Delete the table
admin.deleteTable(TableName.valueOf(tableName));
} else System.out.println("Table does not exist");
}
public static void main(String[] args) throws IOException {
deleteTable("staff1");
}
The HBase shell confirms that staff1 has been deleted.
6.2.5 Inserting Data into a Table
public static void addRowData(String tableName, String rowKey, String columnFamily, String
column, String value) throws IOException{
//Create the HTable object
HTable hTable = new HTable(conf, tableName);
//Insert data into the table
Put put = new Put(Bytes.toBytes(rowKey));
//Assemble the data into the Put object
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
hTable.put(put);
hTable.close();
System.out.println("Data inserted successfully");
}
Data-insertion code (class notes):
private static void putData(String tableName, String rowKey, String cf, String cn, String value) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
//Convert the rowKey with HBase's Bytes utility class
Put put = new Put(Bytes.toBytes(rowKey));
//Add the column family, column qualifier, and value
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
//Execute the put
table.put(put);
table.close();
}
public static void main(String[] args) throws IOException {
//Insert one row
putData("staff","1001","f1","name","cr");
}
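Put also has a batched form: Table.put(List<Put>) writes many rows in a single call, mirroring the batched delete shown in the next section. A small hedged sketch reusing the static connection (the helper name and values are illustrative, not from the original notes):
private static void putDatas(String tableName, String cf, String cn, String... rowKeys) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
List<Put> puts = new ArrayList<>();
for (String rowKey : rowKeys) {
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes("value-" + rowKey));
puts.add(put);
}
table.put(puts); //one batched round trip instead of one per row
table.close();
}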
6.2.6 Deleting Multiple Rows
public static void deleteMultiRow(String tableName, String... rows) throws IOException{
HTable hTable = new HTable(conf, tableName);
List<Delete> deleteList = new ArrayList<Delete>();
for(String row : rows){
Delete delete = new Delete(Bytes.toBytes(row));
deleteList.add(delete);
}
hTable.delete(deleteList);
hTable.close();
}
Deleting a single record (class notes):
private static void deleteData(String tableName, String rowkey, String cf, String cn) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
//Create the Delete object for the row
Delete delete = new Delete(Bytes.toBytes(rowkey));
//Restrict the delete to a specific column family and column
delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
//Execute the delete
table.delete(delete);
table.close();
}
public static void main(String[] args) throws IOException {
//Delete one record
deleteData("student","1001","info","age");
}
After the delete, age goes from 20 back to 18: age=20 was a third version added earlier while demonstrating the HBase shell, and Delete.addColumn removes only the latest version.
Only after running the code a second time is age removed completely.
The HBase shell's delete, by contrast, removes all versions of the cell in one go, as the API documentation notes.
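To get the shell's behavior from the Java API, use Delete.addColumns (plural), which marks every version of the cell deleted, rather than Delete.addColumn, which masks only the newest one. A minimal hedged sketch against the HBase 1.x API, reusing the static connection from the notes class (the helper name is illustrative):
private static void deleteAllVersions(String tableName, String rowKey, String cf, String cn) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
//addColumn would delete only the newest version of the cell;
//addColumns (with the s) deletes all versions, matching the shell's delete
delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));
table.delete(delete);
table.close();
}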
Deleting multiple rows (code):
private static void deleteDatas(String tableName, String... rowKeys) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
ArrayList<Delete> deletes = new ArrayList<>();
for (String rowKey : rowKeys) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
//Collect the Delete objects
deletes.add(delete);
}
table.delete(deletes);
table.close();
}
public static void main(String[] args) throws IOException {
//Delete multiple rows
deleteDatas("student","1001","1002");
}
The HBase shell confirms that all data under row keys 1001 and 1002 has been deleted.
6.2.7 Fetching All Data
public static void getAllRows(String tableName) throws IOException{
HTable hTable = new HTable(conf, tableName);
//Create the Scan object used to scan the regions
Scan scan = new Scan();
//Use the HTable to obtain a ResultScanner
ResultScanner resultScanner = hTable.getScanner(scan);
for(Result result : resultScanner){
Cell[] cells = result.rawCells();
for(Cell cell : cells){
//Row key
System.out.println("Row key: " + Bytes.toString(CellUtil.cloneRow(cell)));
//Column family
System.out.println("Column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("Column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("Value: " + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
}
Full-table scan (class notes):
private static void getScanData(String tableName) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();//a no-arg Scan scans the whole table
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
//Print the retrieved data
for (Cell cell : cells) {
System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
+ ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
table.close();
}
public static void main(String[] args) throws IOException {
//Scan the whole table
getScanData("student");
}
6.2.8 Fetching a Single Row
public static void getRow(String tableName, String rowKey) throws IOException{
HTable table = new HTable(conf, tableName);
Get get = new Get(Bytes.toBytes(rowKey));
//get.setMaxVersions(); show all versions
//get.setTimeStamp(); show the version with the given timestamp
Result result = table.get(get);
for(Cell cell : result.rawCells()){
System.out.println("Row key: " + Bytes.toString(result.getRow()));
System.out.println("Column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("Column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("Value: " + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("Timestamp: " + cell.getTimestamp());
}
}
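The two commented-out calls control which versions the Get returns. A short hedged sketch of a versioned read against the HBase 1.x API (the timestamp literal is a placeholder, and the column family must keep more than one version for extra results to appear):
public static void getRowVersions(String tableName, String rowKey) throws IOException {
HTable table = new HTable(conf, tableName);
Get get = new Get(Bytes.toBytes(rowKey));
get.setMaxVersions(3); //return up to three versions per cell
//get.setTimeStamp(1567500000000L); //or restrict to cells written at exactly this timestamp
Result result = table.get(get);
for (Cell cell : result.rawCells()) {
System.out.println(Bytes.toString(CellUtil.cloneValue(cell)) + " @ " + cell.getTimestamp());
}
table.close();
}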
Fetching a single row (class notes):
public static void getData(String tableName, String rowkey) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowkey)); //rowkey is already a String; no String.valueOf needed
Result result = table.get(get);
Cell[] cells = result.rawCells();
//Print the retrieved data
for (Cell cell : cells) {
System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
+ ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
table.close();
}
public static void main(String[] args) throws IOException {
getData("staff","1001");
}
6.2.9 Fetching a Row's Data for a Specific "columnFamily:column"
public static void getRowQualifier(String tableName, String rowKey, String family, String
qualifier) throws IOException{
HTable table = new HTable(conf, tableName);
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
Result result = table.get(get);
for(Cell cell : result.rawCells()){
System.out.println("Row key: " + Bytes.toString(result.getRow()));
System.out.println("Column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("Column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("Value: " + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
Fetching a row restricted to a given "columnFamily:column" (class notes):
private static void getDataByCN(String tableName, String rowkey, String cf, String cn) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowkey));
get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
//Fetch multiple versions
get.setMaxVersions(3);
Result result = table.get(get);
Cell[] cells = result.rawCells();
//Print the retrieved data
for (Cell cell : cells) {
System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
+ ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
table.close();
}
public static void main(String[] args) throws IOException {
//Fetch one row (specific columnFamily:column)
getDataByCN("student","1002","info","name");
}
(Screenshot of the student table test.)
Complete listing of the test code above:
package hbase.API;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* HBase API operations; can be used as a utility class
* @author cherry
* @create 2019-09-03-11:32
*/
@SuppressWarnings("all")
public class TestHBase {
static Connection connection = null;
static Admin admin = null;
static {
//Obtain the HBase configuration
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop102");
//Obtain the Admin instance
try {
connection = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
try {
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Release resources (close the Admin before the Connection it came from)
*
* @param connection
* @param admin
* @throws IOException
*/
public static void close(Connection connection, Admin admin) throws IOException {
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}
/**
* Check whether a table exists
*
* @param tableName
* @return
* @throws IOException
*/
public static boolean tableExist(String tableName) throws IOException {
return admin.tableExists(TableName.valueOf(tableName));
}
/**
* Create a table
*
* @param tableName
* @param columnFamilys
* @throws IOException
*/
private static void createTable(String tableName, List<String> columnFamilys) throws IOException {
//First check whether the table exists
if (!tableExist(tableName)) {
System.out.println("Table " + tableName + " does not exist; it can be created.");
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String columnFamily : columnFamilys) {
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
hTableDescriptor.addFamily(hColumnDescriptor);
}
admin.createTable(hTableDescriptor);
} else System.out.println("Table " + tableName + " already exists; it cannot be created.");
}
/**
* Delete a table
*
* @param tableName
* @throws IOException
*/
private static void deleteTable(String tableName) throws IOException {
//Check that the table exists
if (tableExist(tableName)) {
System.out.println("Table exists; deleting it now");
//Disable the table first so it is taken offline
admin.disableTable(TableName.valueOf(tableName));
//Delete the table
admin.deleteTable(TableName.valueOf(tableName));
} else System.out.println("Table does not exist");
}
/**
* Insert one row
*
* @param tableName
* @param rowKey
* @param cf
* @param cn
* @param value
* @throws IOException
*/
private static void putData(String tableName, String rowKey, String cf, String cn, String value) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
//Convert the rowKey with HBase's Bytes utility class
Put put = new Put(Bytes.toBytes(rowKey));
//Add the column family, column qualifier, and value
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
//Execute the put
table.put(put);
table.close();
}
/**
* Fetch one row
*
* @param tableName
* @param rowkey
* @throws IOException
*/
public static void getData(String tableName, String rowkey) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowkey)); //rowkey is already a String; no String.valueOf needed
Result result = table.get(get);
Cell[] cells = result.rawCells();
//Print the retrieved data
for (Cell cell : cells) {
System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
+ ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
table.close();
}
/**
* Fetch one row's values for a specific columnFamily:column
*
* @param tableName
* @param rowkey
* @param cf
* @param cn
* @throws IOException
*/
private static void getDataByCN(String tableName, String rowkey, String cf, String cn) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowkey));
//get.addFamily(); //fetch all data under one column family
get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
//Fetch multiple versions
get.setMaxVersions(3);
Result result = table.get(get);
Cell[] cells = result.rawCells();
//Print the retrieved data
for (Cell cell : cells) {
System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
+ ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
table.close();
}
/**
* Full-table scan (scan)
*
* @param tableName
* @throws IOException
*/
private static void getScanData(String tableName) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();//a no-arg Scan scans the whole table
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
//Print the retrieved data
for (Cell cell : cells) {
System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
+ ",VALUE:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
table.close();
}
/**
* Delete a single record (delete)
*
* @param tableName
* @param rowkey
* @param cf
* @param cn
* @throws IOException
*/
private static void deleteData(String tableName, String rowkey, String cf, String cn) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
//Create the Delete object for the row
Delete delete = new Delete(Bytes.toBytes(rowkey));
//Restrict the delete to a specific column family and column
delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
//Execute the delete
table.delete(delete);
table.close();
}
/**
* Delete multiple rows
*
* @param tableName
* @param rowKeys
*/
private static void deleteDatas(String tableName, String... rowKeys) throws IOException {
//Get the Table object
Table table = connection.getTable(TableName.valueOf(tableName));
ArrayList<Delete> deletes = new ArrayList<>();
for (String rowKey : rowKeys) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
//Collect the Delete objects
deletes.add(delete);
}
table.delete(deletes);
table.close();
}
/**
* main method for testing the methods above
*
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
/*//Insert three versions of the same cell
putData("student","1001","info","name", "cr");
putData("student","1001","info","name", "wq");
putData("student","1001","info","name", "zs");*/
/*//Delete multiple rows
deleteDatas("student","1001","1002");*/
/* //Delete a single record
deleteData("student","1001","info","age");*/
/*//Scan the whole table
getScanData("student");*/
//Fetch one row (specific columnFamily:column)
//getDataByCN("student", "1001", "info", "name");
//getData("student","1001");
/* //Insert one row
putData("staff", "1001", "f1", "name", "cr");*/
//deleteTable("staff1");
//Check for existence before and after to verify the table was created
/*System.out.println(tableExist("staff"));
createTable("staff", Collections.singletonList("f1"));
System.out.println(tableExist("staff"));*/
/*ArrayList<String> cfs = new ArrayList<>();
cfs.add("f1");
cfs.add("f2");
cfs.add("f3");
createTable("staff1",cfs);
close(connection,admin);*/
}
}
6.3 MapReduce
Through HBase's Java API we can run MapReduce jobs against HBase, for example using MapReduce to import data from the local file system into an HBase table, or reading raw data out of HBase to analyze with MapReduce.
6.3.1 The Official HBase-MapReduce
1. List the classpath HBase MapReduce jobs need (the jars HBase requires to read data from HDFS):
$ bin/hbase mapredcp
2. Export the environment variables
(1) Export them for the current shell (temporary; run these on the command line):
$ export HBASE_HOME=/opt/module/hbase-1.3.1
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
The backticks substitute the command's output: the list of jars printed by ${HBASE_HOME}/bin/hbase mapredcp is assigned to HADOOP_CLASSPATH.
(2) Permanently: configure them in /etc/profile
export HBASE_HOME=/opt/module/hbase
export HADOOP_HOME=/opt/module/hadoop-2.7.2
and add the following to hadoop-env.sh (note: place it after the for loop):
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
Don't forget to distribute /etc/profile and hadoop-env.sh to the other nodes.
3. Run the official MapReduce jobs (the point of the official examples is to verify that HDFS and HBase are wired together)
-- Case 1: count the rows in the student table
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
Verify via the HBase shell.
-- Case 2: import local data into HBase with MapReduce
1) Create a local TSV file, fruit.tsv:
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
2) Create the HBase table
hbase(main):001:0> create 'fruit','info'
3) Create the input_fruit directory in HDFS and upload fruit.tsv
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
4) Run the MapReduce import into HBase's fruit table
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop102:9000/input_fruit
5) Use scan to inspect the imported data
hbase(main):001:0> scan 'fruit'
6.3.2 Custom HBase-MapReduce, Part 1
Goal: migrate a subset of the fruit table into the fruit_mr table via MapReduce.
Step-by-step:
1. Build the ReadFruitMapper class to read data from the fruit table
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
//将fruit的name和color提取出來,相當于将每一行資料讀取出來放入到Put對象中。
Put put = new Put(key.get());
//Iterate over the cells and add the matching columns
for(Cell cell: value.rawCells()){
//Match column family: info
if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
//Match column: name
if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
//Add the cell to the Put object
put.add(cell);
//Match column: color
}else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
//Add the cell to the Put object
put.add(cell);
}
}
}
//Write each row read from fruit to the context as the map output
context.write(key, put);
}
}
2. Build the WriteFruitMRReducer class to write the data read from fruit into the fruit_mr table
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
throws IOException, InterruptedException {
//Write each row that was read into the fruit_mr table
for(Put put: values){
context.write(NullWritable.get(), put);
}
}
}
3. Build Fruit2FruitMRRunner extends Configured implements Tool to assemble and run the job
//Assemble the Job
public int run(String[] args) throws Exception {
//Get the Configuration
Configuration conf = this.getConf();
//Create the Job
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(Fruit2FruitMRRunner.class);
//Configure the Job
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setCaching(500);
//Set the Mapper; note the import is from the mapreduce package, not the old mapred package
TableMapReduceUtil.initTableMapperJob(
"fruit", //source table name
scan, //scan controller
ReadFruitMapper.class,//the Mapper class
ImmutableBytesWritable.class,//the Mapper output key type
Put.class,//the Mapper output value type
job//the Job to configure
);
//Set the Reducer
TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);
//Set the number of reducers; at least 1
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}
4. Invoke the job from main
public static void main( String[] args ) throws Exception{
Configuration conf = HBaseConfiguration.create();
int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
System.exit(status);
}
5. Package and run the job
$ /opt/module/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar
com.z.hbase.mr1.Fruit2FruitMRRunner
First scan the source table to confirm its contents.
Running the MR job fails at first, because the output table fruit_mr must exist beforehand.
Create the output table (e.g. create 'fruit_mr','info' in the HBase shell).
Re-run the MR job and check the result.
Tip: before running the job, if the target table does not exist, create it first.
Tip: Maven packaging commands: -P local clean package or -P dev clean package install (bundling third-party jars requires the maven-shade-plugin).
MR1 code (class notes):
The FruitMapper class:
package hbase.MR;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
* @author cherry
* @create 2019-09-04-14:27
*/
public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//Create the Put object for this row key
Put put = new Put(key.get());
//Get all the cells under this row key
Cell[] cells = value.rawCells();
//Iterate and pick out the data we need
for (Cell cell : cells) {
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
context.write(key,put);
}
}
}
}
The FruitReducer class:
package hbase.MR;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import java.io.IOException;
/**
* @author cherry
* @create 2019-09-04-14:53
*/
public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put,ImmutableBytesWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
for (Put value : values) {
context.write(key,value);
}
}
}
The FruitDriver class:
package hbase.MR;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
/**
* @author cherry
* @create 2019-09-04-14:57
*/
public class FruitDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1. Get the Job
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf);
//2. Set the jar by class
job.setJarByClass(FruitDriver.class);
//3. Wire up the Mapper and Reducer
Scan scan = new Scan();//full-table scan
TableMapReduceUtil.initTableMapperJob("fruit", scan, FruitMapper.class, ImmutableBytesWritable.class, Put.class, job);
TableMapReduceUtil.initTableReducerJob("fruit_mr", FruitReducer.class, job);
//4. The Mapper output types (already set by initTableMapperJob)
//5. The final output (already set by initTableReducerJob)
job.setNumReduceTasks(1);//at least one reducer; this call must not be omitted
//6. Input/output paths: steps 4-6 are already covered above
//7. Submit
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
6.3.3 Custom HBase-MapReduce, Part 2
Goal: write data from HDFS into an HBase table.
Step-by-step:
1. Build the ReadFruitFromHDFSMapper class to read the file data from HDFS
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//The line read from HDFS
String lineValue = value.toString();
//Split each line on \t into a String array
String[] values = lineValue.split("\t");
//Extract the fields by position
String rowKey = values[0];
String name = values[1];
String color = values[2];
//Initialize the rowKey
ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
//Initialize the Put object
Put put = new Put(Bytes.toBytes(rowKey));
//Arguments: column family, column, value
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
context.write(rowKeyWritable, put);
}
}
2. Build the WriteFruitMRFromTxtReducer class
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
//Write each row that was read into the fruit_hdfs table
for(Put put: values){
context.write(NullWritable.get(), put);
}
}
}
3. Create Txt2FruitRunner to assemble the job
public int run(String[] args) throws Exception {
//Get the Configuration
Configuration conf = this.getConf();
//Create the Job
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(Txt2FruitRunner.class);
Path inPath = new Path("hdfs://hadoop102:9000/input_fruit/fruit.tsv");
FileInputFormat.addInputPath(job, inPath);
//Set the Mapper
job.setMapperClass(ReadFruitFromHDFSMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//Set the Reducer
TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRFromTxtReducer.class, job);
//Set the number of reducers; at least 1
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}
4. Invoke and run the job
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
System.exit(status);
}
5. Package and run
$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.atguigu.hbase.mr2.Txt2FruitRunner
Create the table first.
Confirm the path of the source file on HDFS.
After running the jar, scan the fruit_hdfs table.
The color for 1002 comes out empty because an extra tab was typed when the file was created; the Mapper splits on single tabs, so deleting the extra tab fixes the problem.
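A hedged defensive variant of the map body (illustrative only, not from the original notes) that skips malformed lines instead of writing empty cells:
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
//A doubled tab yields an extra empty field, so require exactly three non-empty fields
if (split.length != 3 || split[1].isEmpty() || split[2].isEmpty()) {
return; //ignore malformed lines
}
Put put = new Put(Bytes.toBytes(split[0]));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
context.write(NullWritable.get(), put);
}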
Tip: before running the job, if the target table does not exist, create it first.
Tip: Maven packaging commands: -P local clean package or -P dev clean package install (bundling third-party jars requires the maven-shade-plugin).
MR2 code (class notes):
The HDFSMapper class:
package hbase.MR2;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 目标:實作将HDFS中的資料寫入到HBase表中。
*
* @author cherry
* @create 2019-09-04-15:29
*/
public class HDFSMapper extends Mapper<LongWritable, Text, NullWritable, Put> {
/**
* Source data: 1001 Apple Red
*
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
Put put = new Put(Bytes.toBytes(split[0]));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
context.write(NullWritable.get(), put);
}
}
The HDFSReducer class:
package hbase.MR2;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
/**
* @author cherry
* @create 2019-09-04-15:50
*/
public class HDFSReducer extends TableReducer<NullWritable, Put,NullWritable> {
@Override
protected void reduce(NullWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
for (Put value : values) {
context.write(NullWritable.get(),value);
}
}
}
The HDFSDriver class:
package hbase.MR2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author cherry
* @create 2019-09-04-15:51
*/
public class HDFSDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
//Get the Job object
Job job = Job.getInstance(conf);
job.setJarByClass(HDFSDriver.class);
//Wire up the Mapper
job.setMapperClass(HDFSMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);
TableMapReduceUtil.initTableReducerJob("fruit_hdfs", HDFSReducer.class, job);
FileInputFormat.setInputPaths(job, new Path(args[0]));
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}
public static void main(String[] args) throws Exception {
//Be sure to use HBase's configuration
Configuration conf = HBaseConfiguration.create();
int run = ToolRunner.run(conf, new HDFSDriver(), args);
if (run == 1) System.out.println("Job failed!");
}
}
6.4 Hive Integration
6.4.1 HBase vs. Hive
1. Hive (analysis)
(1) A data warehouse
In essence, Hive maintains a two-way mapping in MySQL (the metastore) onto files already stored in HDFS, making it convenient to manage and query them with HQL.
(2) Used for data analysis and cleansing
Hive suits offline data analysis and cleansing; its latency is high.
(3) Based on HDFS and MapReduce
Data stored by Hive still lives on DataNodes, and the HQL you write is ultimately translated into MapReduce code for execution.
2. HBase (storage)
(1) A database
A column-oriented, non-relational database.
(2) Stores structured and unstructured data
Suited to storing non-relational data in single tables; not suited to relational queries such as JOINs.
(3) Based on HDFS
Persisted data takes the form of HFiles stored on DataNodes, managed by RegionServers as regions.
(4) Low latency; usable by online services
Faced with large volumes of enterprise data, HBase can store massive amounts of data in a single table while providing fast access to it.
6.4.2 Using HBase with Hive
A loud heads-up: HBase and Hive integration is incompatible across the latest two releases, so all we can do is wipe our tears and bravely recompile hive-hbase-handler-1.2.2.jar ourselves. Infuriating!
Environment setup
Because subsequent operations in Hive may also affect HBase, Hive needs to hold the jars for operating on HBase, so copy over the jars Hive depends on (or use symlinks):
$ export HBASE_HOME=/opt/module/hbase
$ export HIVE_HOME=/opt/module/hive
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
Also set the ZooKeeper properties in hive-site.xml, as follows:
<property>
<name>hive.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
<description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
<description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
1. Case 1
Goal: create a Hive table linked to an HBase table, so that inserting data into the Hive table also affects the HBase table.
Step-by-step:
(1) Create the table in Hive, linked to HBase
CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
The first attempt fails with: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V
Fix: compile hive-hbase-handler-1.2.1.jar and overwrite the original jar with it.
Download: a recompiled hive-hbase-handler-1.2.1.jar for creating HBase-linked tables in Hive, which resolves the FAILED: Execution Error ... HTableDescriptor.addFamily error above.
Exit and re-enter Hive, re-run the HQL, and this time it succeeds.
Enter the HBase shell and inspect the structure of hbase_emp_table.
Tip: afterwards you can check in both Hive and HBase; the corresponding table has been generated in each.
(2) Create a temporary staging table in Hive for loading the file's data
Tip: data cannot be loaded directly into the Hive table that is linked to HBase.
CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
(3) Load data into the Hive staging table
hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;
(4) Use insert to move the staging table's data into the HBase-linked Hive table
hive> insert into table hive_hbase_emp_table select * from emp;
(5) Check whether the data was successfully synchronized into the Hive table and the linked HBase table
Hive:
hive> select * from hive_hbase_emp_table;
HBase:
hbase> scan 'hbase_emp_table'
Note: if you drop one of the tables, the table linked to it is dropped as well.
2. Case 2
Goal: a table named hbase_emp_table already exists in HBase; create an external table in Hive linked to hbase_emp_table so that Hive can be used to analyze the data in it.
Note: Case 2 follows directly from Case 1, so complete Case 1 first.
Step-by-step:
(1) Create the external table in Hive
CREATE EXTERNAL TABLE relevance_hbase_emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
(2) Once linked, Hive functions can be used for analysis
hive (default)> select * from relevance_hbase_emp;
Notes: create a table linked to HBase's fruit table.
The table-creation HQL:
CREATE EXTERNAL TABLE relevance_hbase_fruit(
id int,
name string,
color string)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping"=":key,info:name,info:color")
TBLPROPERTIES("hbase.table.name"="fruit");
View the HBase fruit table's data from Hive: