In "Hadoop 2.2.0 and HBase 0.98.11 Pseudo-Distributed" I already installed HBase in pseudo-distributed mode and got it to start.
Running hbase shell to enter the shell printed the warning SLF4J: Class path contains multiple SLF4J bindings. Moving one of the SLF4J bindings out of the way fixes it:
```bash
mv apple/hbase/lib/slf4j-log4j12-1.6.4.jar apple/hbase/lib/slf4j-log4j12-1.6.4.jar.bak
```
Then run the commands in the table below, in order, to test HBase:
Purpose | Command |
---|---|
List existing tables | list |
Create a table test with a single column family data | create 'test', 'data' |
Insert data | put 'test', 'row1', 'data:1', 'value1' |
Scan the table | scan 'test' |
Disable the table | disable 'test' |
Drop the table | drop 'test' |
Java API
Using HBase through the Java API.
First, let's see whether we can create a table.
```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class Test
{
    public static void main(String[] args)
        throws MasterNotRunningException, ZooKeeperConnectionException, IOException
    {
        // Build the configuration from the config files on the classpath (hbase-site.xml etc.)
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Describe a table "test" with a single column family "info"
        HTableDescriptor tableDesc = new HTableDescriptor("test");
        tableDesc.addFamily(new HColumnDescriptor("info"));
        admin.createTable(tableDesc);
        admin.close();
    }
}
```
```bash
javac -cp $(hbase classpath) Test.java
java -cp .:$(hbase classpath) Test
```
It took me several tries to get this right, especially the .:$(hbase classpath) part: without the leading dot, the JVM cannot find the class and complains that it has no idea what Test is.
Now a version that also inserts data.
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

public class Test
{
    public static void main(String[] args)
        throws MasterNotRunningException, ZooKeeperConnectionException, IOException
    {
        String tableName = "test";
        String familyName = "info";
        // Create the table
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        tableDesc.addFamily(new HColumnDescriptor(familyName));
        admin.createTable(tableDesc);
        // Single put
        HTable table = new HTable(conf, tableName);
        Put putRow = new Put("first".getBytes());
        putRow.add(familyName.getBytes(), "name".getBytes(), "zhangsan".getBytes());
        putRow.add(familyName.getBytes(), "age".getBytes(), "24".getBytes());
        putRow.add(familyName.getBytes(), "city".getBytes(), "chengde".getBytes());
        putRow.add(familyName.getBytes(), "sex".getBytes(), "male".getBytes());
        table.put(putRow);
        // Batch put
        List<Put> list = new ArrayList<Put>();
        Put p = null;
        p = new Put("second".getBytes());
        p.add(familyName.getBytes(), "name".getBytes(), "wangwu".getBytes());
        p.add(familyName.getBytes(), "sex".getBytes(), "male".getBytes());
        p.add(familyName.getBytes(), "city".getBytes(), "beijing".getBytes());
        p.add(familyName.getBytes(), "age".getBytes(), "25".getBytes());
        list.add(p);
        p = new Put("third".getBytes());
        p.add(familyName.getBytes(), "name".getBytes(), "zhangliu".getBytes());
        p.add(familyName.getBytes(), "sex".getBytes(), "male".getBytes());
        p.add(familyName.getBytes(), "city".getBytes(), "handan".getBytes());
        p.add(familyName.getBytes(), "age".getBytes(), "28".getBytes());
        list.add(p);
        p = new Put("fourth".getBytes());
        p.add(familyName.getBytes(), "name".getBytes(), "liqing".getBytes());
        p.add(familyName.getBytes(), "sex".getBytes(), "female".getBytes());
        p.add(familyName.getBytes(), "city".getBytes(), "guangzhou".getBytes());
        p.add(familyName.getBytes(), "age".getBytes(), "18".getBytes());
        list.add(p);
        table.put(list);
        table.close();
        admin.close();
    }
}
```
Note: with a single put, all of the row's columns end up with the same timestamp; with a batch put, the columns of all the rows likewise share one timestamp.
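If the cells need to carry different timestamps, Put can take one explicitly instead of letting the region server assign it. A minimal sketch continuing from the program above (the row key and values here are made-up examples):

```java
// Put.add also accepts an explicit timestamp, in milliseconds
long ts = System.currentTimeMillis();
Put stamped = new Put("fifth".getBytes());
stamped.add(familyName.getBytes(), "name".getBytes(), ts, "zhaoqi".getBytes());
stamped.add(familyName.getBytes(), "age".getBytes(), ts + 1, "30".getBytes());
table.put(stamped);
```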
For the rest of the API, see the references.
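Reading the data back is similar. A quick sketch of Get and Scan against the same test table (assuming the conf and rows from the program above; HBase 0.98 client API):

```java
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

// Fetch a single row by key
HTable table = new HTable(conf, "test");
Result row = table.get(new Get("first".getBytes()));
System.out.println(new String(row.getValue("info".getBytes(), "name".getBytes())));
// Scan every row in the table, printing the row keys
ResultScanner scanner = table.getScanner(new Scan());
for (Result r : scanner)
{
    System.out.println(new String(r.getRow()));
}
scanner.close();
table.close();
```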
Operating HBase from a MapReduce job
Let's improve the earlier wordcount so that it stores its results in HBase. First, the code.
```java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount
{
    public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens())
            {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
        extends TableReducer<Text, IntWritable, Put>
    {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
            {
                sum += val.get();
            }
            result.set(sum);
            // The word becomes the row key and the count goes into info:name.
            // TableOutputFormat ignores the output key, so null is fine here.
            Put putrow = new Put(key.toString().getBytes());
            putrow.add("info".getBytes(), "name".getBytes(), result.toString().getBytes());
            context.write(null, putrow);
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = HBaseConfiguration.create();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        // Wires up TableOutputFormat so the reducer writes into the "test" table
        TableMapReduceUtil.initTableReducerJob("test", IntSumReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
```bash
javac -cp $(hbase classpath) -d temp WordCount.java
jar -cvf sort.jar -C temp .
```
Then run the MapReduce program. The reference article describes two ways to run it; here I use the second. Open hbase-env.sh (around line 32) and change the line to:

```bash
export HBASE_CLASSPATH=/home/tom/sort.jar$HBASE_CLASSPATH
```

Because $HBASE_CLASSPATH itself already carries a colon, this spelling still keeps sort.jar properly separated from the rest of $HBASE_CLASSPATH, so it works. Then run:

```bash
hbase WordCount /input/
```
At least the MapReduce job runs now; details to follow later.
To create the table in main instead of using an existing one:
```java
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public static void main(String[] args) throws Exception {
    String tablename = "wordcount";
    Configuration conf = HBaseConfiguration.create();
    // Drop and recreate the output table if it already exists
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (admin.tableExists(tablename)) {
        System.out.println("table exists! recreating...");
        admin.disableTable(tablename);
        admin.deleteTable(tablename);
    }
    HTableDescriptor htd = new HTableDescriptor(tablename);
    HColumnDescriptor tcd = new HColumnDescriptor("info");
    htd.addFamily(tcd); // add the column family
    admin.createTable(htd); // create the table
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 1) {
        System.err.println("Usage: WordCountHbase <in>");
        System.exit(2);
    }
    Job job = new Job(conf, "WordCountHbase");
    job.setJarByClass(WordCount.class);
    // TokenizerMapper handles the map phase
    job.setMapperClass(TokenizerMapper.class);
    TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job);
    // Input path for the job's data
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Run the job and exit once it completes
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```
Deprecated APIs
Almost every compilation printed these warnings:

```
Note: *.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
```

That is because I used deprecated APIs. I do not yet know what the new APIs are or how to write them; that is left for later.
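For future reference, a couple of the replacements seem straightforward (an unverified sketch; I believe these exist in Hadoop 2.x and HBase 0.96+, but check the Javadoc for your exact versions):

```java
import org.apache.hadoop.hbase.TableName;

// new Job(conf, name) is deprecated; the factory method replaces it
Job job = Job.getInstance(conf, "word count");
// new HTableDescriptor(String) is deprecated in favor of TableName
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("test"));
```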
References
- HBase 常用Shell指令: http://www.cnblogs.com/nexiyi/p/hbase_shell.html
- Hbase Java API詳解: http://www.cnblogs.com/tiantianbyconan/p/3557571.html
- 如何執行hbase的mapreduce job: http://blog.csdn.net/xiao_jun_0820/article/details/28636309
- 如何用MapReduce程式操作hbase: http://blog.csdn.net/liuyuan185442111/article/details/45306193