
Generating HFiles and Bulk Loading Them into HBase

Reference: http://www.aboutyun.com/thread-8603-1-1.html

1. Generating HFiles with MapReduce

[hadoop@h71 hui]$ vi TestHFileToHBase.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestHFileToHBase {

        public static class TestHFileToHBaseMapper extends Mapper {

                protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                        // Split each line into a rowkey (first token) and the cell value (rest of the line)
                        String[] values = value.toString().split(" ", 2);
                        byte[] row = Bytes.toBytes(values[0]);
                        ImmutableBytesWritable k = new ImmutableBytesWritable(row);
                        // Column family and qualifier are both "PROTOCOLID", matching the table created below
                        KeyValue kvProtocol = new KeyValue(row, "PROTOCOLID".getBytes(), "PROTOCOLID".getBytes(),
                                        values[1].getBytes());
                        context.write(k, kvProtocol);
                }
        }

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
                Configuration conf = HBaseConfiguration.create();
                Job job = new Job(conf, "TestHFileToHBase");
                job.setJarByClass(TestHFileToHBase.class);

                job.setOutputKeyClass(ImmutableBytesWritable.class);
                job.setOutputValueClass(KeyValue.class);

                job.setMapperClass(TestHFileToHBaseMapper.class);
                job.setReducerClass(KeyValueSortReducer.class);
//                job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
                job.setOutputFormatClass(HFileOutputFormat.class);
                // job.setNumReduceTasks(4);
                // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);

//                 HBaseAdmin admin = new HBaseAdmin(conf);
                // Table "hua" must already exist; configureIncrementalLoad reads its region
                // boundaries to set up the total-order partitioner for the HFile output
                HTable table = new HTable(conf, "hua");

                HFileOutputFormat.configureIncrementalLoad(job, table);

                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));

                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

}

[hadoop@h71 ~]$ vi he.txt

hello world

hello hadoop

hello hive

[hadoop@h71 ~]$ hadoop fs -mkdir /rawdata

[hadoop@h71 ~]$ hadoop fs -put he.txt /rawdata

hbase(main):020:0> create 'hua','PROTOCOLID'

[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac TestHFileToHBase.java

[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar TestHFileToHBase*class

[hadoop@h71 hui]$ hadoop jar xx.jar TestHFileToHBase /rawdata /middata

This failed with:

Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, received org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

So I changed

public static class TestHFileToHBaseMapper extends Mapper {

to

public static class TestHFileToHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

and it worked.
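The reason: extending the raw Mapper type erases the generic parameters, so the map(LongWritable, Text, Context) method above is an overload of Mapper.map(Object, Object, Context), not an override. The framework then runs the inherited identity map(), which writes the LongWritable/Text input pair straight to the output, hence the type mismatch. An @Override annotation would have caught this at compile time; a minimal sketch of the broken variant:

public static class TestHFileToHBaseMapper extends Mapper { // raw type: generics erased
        @Override // javac error: method does not override or implement a method from a supertype
        protected void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                // Never called by Mapper.run(); the inherited identity map(Object, Object, Context) runs instead
        }
}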

[hadoop@h71 ~]$ hadoop fs -lsr /middata

drwxr-xr-x   - hadoop supergroup          0 2017-03-17 20:50 /middata/PROTOCOLID

-rw-r--r--   2 hadoop supergroup       1142 2017-03-17 20:50 /middata/PROTOCOLID/65493afaefac43528c554d0b8056f1e3

-rw-r--r--   2 hadoop supergroup          0 2017-03-17 20:50 /middata/_SUCCESS

(/middata/PROTOCOLID/65493afaefac43528c554d0b8056f1e3 is an HFile, a binary format, so viewing it with hadoop fs -cat only produces garbage.)
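If you do want to peek inside it, HBase ships an HFile pretty-printer that can be run from the command line. A hedged example (the class name and flags below come from HBase's standard tooling of this era; check your version), where -f names the file and -p prints its KeyValues as text:

[hadoop@h71 hui]$ hbase org.apache.hadoop.hbase.io.hfile.HFile -p -f /middata/PROTOCOLID/65493afaefac43528c554d0b8056f1e3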

2. Bulk Loading the HFiles into HBase

The code in the original article has quite a few problems; here it is after fixing:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestLoadIncrementalHFileToHBase {

        public static void main(String[] args) throws Exception {
                Configuration conf = HBaseConfiguration.create();
                // Strip generic Hadoop options (-D, -fs, ...) and keep the remaining argument: the HFile directory
                String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
                HTable table = new HTable(conf, "hua");
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                // Moves the HFiles under dfsArgs[0] into the regions of table "hua"
                loader.doBulkLoad(new Path(dfsArgs[0]), table);
        }
}

[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac TestLoadIncrementalHFileToHBase.java

[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/java TestLoadIncrementalHFileToHBase /middata/PROTOCOLID

After running this, a scan of table hua in the hbase shell showed no data: doBulkLoad expects the job's output directory (the one containing the per-column-family subdirectories), not the family directory itself. Running it against /middata instead worked:

[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/java TestLoadIncrementalHFileToHBase /middata
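As an aside, this step does not strictly require a custom driver: LoadIncrementalHFiles also has a main() entry point (exposed as the "completebulkload" tool), so, assuming a stock installation with the hbase launcher script on the PATH, the same load could be run directly:

[hadoop@h71 hui]$ hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /middata hua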

hbase(main):073:0> scan 'hua'

ROW                                         COLUMN+CELL                                                                                                                 

 hello                                      column=PROTOCOLID:PROTOCOLID, timestamp=1489758507378, value=hive

(Table hua holds only one row. At first this was puzzling: he.txt has three lines, so why was only one loaded? Then it clicked: all three lines start with "hello", so HBase uses the same rowkey for all of them, the three KeyValues land in the same PROTOCOLID:PROTOCOLID cell, and only one value remains visible.)
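To keep all three lines as separate rows, each needs a distinct rowkey. A minimal sketch of a hypothetical variant of the map() method above, which appends the line's byte offset (the LongWritable key that MapReduce already supplies) to make the rowkeys unique:

                protected void map(LongWritable key, Text value, Context context)
                                throws IOException, InterruptedException {
                        String[] values = value.toString().split(" ", 2);
                        // key.get() is the line's byte offset in the input file, unique per line,
                        // so repeated words like "hello" still yield distinct rowkeys
                        byte[] row = Bytes.toBytes(values[0] + "_" + key.get());
                        ImmutableBytesWritable k = new ImmutableBytesWritable(row);
                        KeyValue kv = new KeyValue(row, "PROTOCOLID".getBytes(),
                                        "PROTOCOLID".getBytes(), values[1].getBytes());
                        context.write(k, kv);
                }

With this change, he.txt would load as three rows (hello_0, hello_12, hello_25; the exact offsets depend on the file's line lengths).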
