hbase是hadoop的数据库,能够对大数据提供随机、实时读写访问。他是开源的,分布式的,多版本的,面向列的,存储模型。
在讲解的时候我首先给大家讲解一下hbase的整体结构,如下图:
hbase master是服务器负责管理所有的hregion服务器,hbase master并不存储hbase服务器的任何数据,hbase逻辑上的表可能会划分为多个hregion,然后存储在hregion server群中,hbase master server中存储的是从数据到hregion server的映射。
一台机器只能运行一个hregion服务器,数据的操作会记录在hlog中,在读取数据时候,hregion会先访问hmemcache缓存,如果 缓存中没有数据才回到hstore中上找,没一个列都会有一个hstore集合,每个hstore集合包含了很多具体的hstorefile文件,这些文 件是b树结构的,方便快速读取。
再看下hbase数据物理视图如下:
row key
timestamp
column family
uri
parser
r1
t3
url=http://www.taobao.com
title=天天特价
t2
host=taobao.com
t1
r2
t5
url=http://www.alibaba.com
content=每天…
t4
host=alibaba.com
Ø row key: 行键,table的主键,table中的记录按照row key排序
Ø timestamp: 时间戳,每次数据操作对应的时间戳,可以看作是数据的version number
Ø column family:列簇,table在水平方向有一个或者多个column family组成,一个column family中可以由任意多个column组成,即column family支持动态扩展,无需预先定义column的数量以及类型,所有column均以二进制格式存储,用户需要自行进行类型转换。
了解了hbase的体系结构和hbase数据视图够,现在让我们一起看看怎样通过java来操作hbase数据吧!
先说说具体的api先,如下
hbaseconfiguration是每一个hbase client都会使用到的对象,它代表的是hbase配置信息。它有两种构造方式:
public hbaseconfiguration()
public hbaseconfiguration(final configuration c)
默认的构造方式会尝试从hbase-default.xml和hbase-site.xml中读取配置。如果classpath没有这两个文件,就需要你自己设置配置。
configuration hbase_config = new configuration();
hbase_config.set(“hbase.zookeeper.quorum”, “zkserver”);
hbase_config.set(“hbase.zookeeper.property.clientport”, “2181″);
hbaseconfiguration cfg = new hbaseconfiguration(hbase_config);
创建表
创建表是通过hbaseadmin对象来操作的。hbaseadmin负责表的meta信息处理。hbaseadmin提供了createtable这个方法:
public void createtable(htabledescriptor desc)
htabledescriptor 代表的是表的schema, 提供的方法中比较有用的有
setmaxfilesize,指定最大的region size
setmemstoreflushsize 指定memstore flush到hdfs上的文件大小
增加family通过 addfamily方法
public void addfamily(final hcolumndescriptor family)
hcolumndescriptor 代表的是column的schema,提供的方法比较常用的有
settimetolive:指定最大的ttl,单位是ms,过期数据会被自动删除。
setinmemory:指定是否放在内存中,对小表有用,可用于提高效率。默认关闭
setbloomfilter:指定是否使用bloomfilter,可提高随机查询效率。默认关闭
setcompressiontype:设定数据压缩类型。默认无压缩。
setmaxversions:指定数据最大保存的版本个数。默认为3。
一个简单的例子,创建了4个family的表:
hbaseadmin hadmin = new hbaseadmin(hbaseconfig);
htabledescriptor t = new htabledescriptor(tablename);
t.addfamily(new hcolumndescriptor(“f1″));
t.addfamily(new hcolumndescriptor(“f2″));
t.addfamily(new hcolumndescriptor(“f3″));
t.addfamily(new hcolumndescriptor(“f4″));
hadmin.createtable(t);
删除表
删除表也是通过hbaseadmin来操作,删除表之前首先要disable表。这是一个非常耗时的操作,所以不建议频繁删除表。
disabletable和deletetable分别用来disable和delete表。
example:
if (hadmin.tableexists(tablename)) {
hadmin.disabletable(tablename);
hadmin.deletetable(tablename);
}
查询数据
查询分为单条随机查询和批量查询。
单条查询是通过rowkey在table中查询某一行的数据。htable提供了get方法来完成单条查询。
批量查询是通过制定一段rowkey的范围来查询。htable提供了个getscanner方法来完成批量查询。
public result get(final get get)
public resultscanner getscanner(final scan scan)
get对象包含了一个get查询需要的信息。它的构造方法有两种:
public get(byte [] row)
public get(byte [] row, rowlock rowlock)
rowlock是为了保证读写的原子性,你可以传递一个已经存在rowlock,否则hbase会自动生成一个新的rowlock。
scan对象提供了默认构造函数,一般使用默认构造函数。
get/scan的常用方法有:
addfamily/addcolumn:指定需要的family或者column,如果没有调用任何addfamily或者column,会返回所有的columns.
setmaxversions:指定最大的版本个数。如果不带任何参数调用setmaxversions,表示取所有的版本。如果不掉用setmaxversions,只会取到最新的版本。
settimerange:指定最大的时间戳和最小的时间戳,只有在此范围内的cell才能被获取。
settimestamp:指定时间戳。
setfilter:指定filter来过滤掉不需要的信息
scan特有的方法:
setstartrow:指定开始的行。如果不调用,则从表头开始。
setstoprow:指定结束的行(不含此行)。
setbatch:指定最多返回的cell数目。用于防止一行中有过多的数据,导致outofmemory错误。
resultscanner是result的一个容器,每次调用resultscanner的next方法,会返回result.
public result next() throws ioexception;
public result [] next(int nbrows) throws ioexception;
result代表是一行的数据。常用方法有:
getrow:返回rowkey
raw:返回所有的key value数组。
getvalue:按照column来获取cell的值
scan s = new scan();
s.setmaxversions();
resultscanner ss = table.getscanner(s);
for(result r:ss){
system.out.println(new string(r.getrow()));
for(keyvalue kv:r.raw()){
system.out.println(new string(kv.getcolumn()));
}
插入数据
htable通过put方法来插入数据。
public void put(final put put) throws ioexception
public void put(final list puts) throws ioexception
可以传递单个批put对象或者list put对象来分别实现单条插入和批量插入。
put提供了3种构造方式:
public put(byte [] row)
public put(byte [] row, rowlock rowlock)
public put(put puttocopy)
put常用的方法有:
add:增加一个cell
settimestamp:指定所有cell默认的timestamp,如果一个cell没有指定timestamp,就会用到这个值。如果没有调用,hbase会将当前时间作为未指定timestamp的cell的timestamp.
setwritetowal: wal是write ahead log的缩写,指的是hbase在插入操作前是否写log。默认是打开,关掉会提高性能,但是如果系统出现故障(负责插入的region server挂掉),数据可能会丢失。
另外htable也有两个方法也会影响插入的性能
setautoflash: autoflush指的是在每次调用hbase的put操作,是否提交到hbase server。默认是true,每次会提交。如果此时是单条插入,就会有更多的io,从而降低性能.
setwritebuffersize: write buffer size在autoflush为false的时候起作用,默认是2mb,也就是当插入数据超过2mb,就会自动提交到server
htable table = new htable(hbaseconfig, tablename);
table.setautoflush(autoflush);
list lp = new arraylist();
int count = 10000;
byte[] buffer = new byte[1024];
random r = new random();
for (int i = 1; i <= count; ++i) {
put p = new put(string.format(“row%09d”,i).getbytes());
r.nextbytes(buffer);
p.add(“f1″.getbytes(), null, buffer);
p.add(“f2″.getbytes(), null, buffer);
p.add(“f3″.getbytes(), null, buffer);
p.add(“f4″.getbytes(), null, buffer);
p.setwritetowal(wal);
lp.add(p);
if(i%1000==0){
table.put(lp);
lp.clear();
}
删除数据
htable 通过delete方法来删除数据。
public void delete(final delete delete)
delete构造方法有:
public delete(byte [] row)
public delete(byte [] row, long timestamp, rowlock rowlock)
public delete(final delete d)
delete常用方法有
deletefamily/deletecolumns:指定要删除的family或者column的数据。如果不调用任何这样的方法,将会删除整行。
注意:如果某个cell的timestamp高于当前时间,这个cell将不会被删除,仍然可以查出来。
htable table = new htable(hbaseconfig, “mytest”);
delete d = new delete(“row1″.getbytes());
table.delete(d)
切分表
hbaseadmin提供split方法来将table 进行split.
public void split(final string tablenameorregionname)
如果提供的tablename,那么会将table所有region进行split ;如果提供的region name,那么只会split这个region.
由于split是一个异步操作,我们并不能确切的控制region的个数。
public void split(string tablename,int number,int timeout) throws exception {
configuration hbase_config = new configuration();
hbase_config.set(“hbase.zookeeper.quorum”, globalconf.zookeeper_quorum);
hbase_config.set(“hbase.zookeeper.property.clientport”, globalconf.zookeeper_port);
hbaseconfiguration cfg = new hbaseconfiguration(hbase_config);
hbaseadmin hadmin = new hbaseadmin(cfg);
htable htable = new htable(cfg,tablename);
int oldsize = 0;
t = system.currenttimemillis();
while(true){
int size = htable.getregionsinfo().size();
logger.info(“the region number=”+size);
if(size>=number ) break;
if(size!=oldsize){
hadmin.split(htable.gettablename());
oldsize = size;
} else if(system.currenttimemillis()-t>timeout){
break;
thread.sleep(1000*10);