public class HbaseReader {
publicstatic String flow_fields_import = "flow_fields_import";
staticclass HdfsSinkMapper extends TableMapper{
@Override
protectedvoid map(ImmutableBytesWritable key, Result value, Context context) throwsIOException, InterruptedException {
byte[]bytes = key.copyBytes();
Stringphone = new String(bytes);
byte[]urlbytes = value.getValue("f1".getBytes(),"url".getBytes());
Stringurl = new String(urlbytes);
context.write(newText(phone + "\t" + url), NullWritable.get());
}
}
staticclass HdfsSinkReducer extends Reducer{
@Override
protectedvoid reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
publicstatic void main(String[] args) throws Exception {
Configurationconf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","spark01");
Jobjob = Job.getInstance(conf);
job.setJarByClass(HbaseReader.class);
// job.setMapperClass(HdfsSinkMapper.class);
Scanscan = new Scan();
TableMapReduceUtil.initTableMapperJob(flow_fields_import,scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);
job.setReducerClass(HdfsSinkReducer.class);
FileOutputFormat.setOutputPath(job,new Path("c:/hbasetest/output"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.waitForCompletion(true);
}
}
public class HbaseSinker {
publicstatic String flow_fields_import = "flow_fields_import";
staticclass HbaseSinkMrMapper extends Mapper{
@Override
protectedvoid map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {
Stringline = value.toString();
String[] fields =line.split("\t");
Stringphone = fields[0];
Stringurl = fields[1];
FlowBeanbean = new FlowBean(phone,url);
context.write(bean,NullWritable.get());
}
}
staticclass HbaseSinkMrReducer extends TableReducer{
@Override
protectedvoid reduce(FlowBean key, Iterable values, Context context)throws IOException, InterruptedException {
Putput = new Put(key.getPhone().getBytes());
put.add("f1".getBytes(),"url".getBytes(), key.getUrl().getBytes());
context.write(newImmutableBytesWritable(key.getPhone().getBytes()), put);
}
}
publicstatic void main(String[] args) throws Exception {
Configurationconf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","spark01");
HBaseAdminhBaseAdmin = new HBaseAdmin(conf);
booleantableExists = hBaseAdmin.tableExists(flow_fields_import);
if(tableExists){
hBaseAdmin.disableTable(flow_fields_import);
hBaseAdmin.deleteTable(flow_fields_import);
}
HTableDescriptordesc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
HColumnDescriptorhColumnDescriptor = new HColumnDescriptor ("f1".getBytes());
desc.addFamily(hColumnDescriptor);
hBaseAdmin.createTable(desc);
Jobjob = Job.getInstance(conf);
job.setJarByClass(HbaseSinker.class);
job.setMapperClass(HbaseSinkMrMapper.class);
TableMapReduceUtil.initTableReducerJob(flow_fields_import,HbaseSinkMrReducer.class, job);
FileInputFormat.setInputPaths(job,new Path("c:/hbasetest/data"));
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Mutation.class);
job.waitForCompletion(true);
}
}
本文出自 “為了手指那個方向” 部落格,謝絕轉載!