天天看點

HDFS 儲存到 HBase_7. 從 HBase 中讀取資料寫入 HDFS

public class HbaseReader {

publicstatic String flow_fields_import = "flow_fields_import";

staticclass HdfsSinkMapper extends TableMapper{

@Override

protectedvoid map(ImmutableBytesWritable key, Result value, Context context) throwsIOException, InterruptedException {

byte[]bytes = key.copyBytes();

Stringphone = new String(bytes);

byte[]urlbytes = value.getValue("f1".getBytes(),"url".getBytes());

Stringurl = new String(urlbytes);

context.write(newText(phone + "\t" + url), NullWritable.get());

}

}

staticclass HdfsSinkReducer extends Reducer{

@Override

protectedvoid reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {

context.write(key,NullWritable.get());

}

}

publicstatic void main(String[] args) throws Exception {

Configurationconf = HBaseConfiguration.create();

conf.set("hbase.zookeeper.quorum","spark01");

Jobjob = Job.getInstance(conf);

job.setJarByClass(HbaseReader.class);

//            job.setMapperClass(HdfsSinkMapper.class);

Scanscan = new Scan();

TableMapReduceUtil.initTableMapperJob(flow_fields_import,scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);

job.setReducerClass(HdfsSinkReducer.class);

FileOutputFormat.setOutputPath(job,new Path("c:/hbasetest/output"));

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

job.waitForCompletion(true);

}

}

public class HbaseSinker {

publicstatic String flow_fields_import = "flow_fields_import";

staticclass HbaseSinkMrMapper extends Mapper{

@Override

protectedvoid map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {

Stringline = value.toString();

String[] fields =line.split("\t");

Stringphone = fields[0];

Stringurl = fields[1];

FlowBeanbean = new FlowBean(phone,url);

context.write(bean,NullWritable.get());

}

}

staticclass HbaseSinkMrReducer extends TableReducer{

@Override

protectedvoid reduce(FlowBean key, Iterable values, Context context)throws IOException, InterruptedException {

Putput = new Put(key.getPhone().getBytes());

put.add("f1".getBytes(),"url".getBytes(), key.getUrl().getBytes());

context.write(newImmutableBytesWritable(key.getPhone().getBytes()), put);

}

}

publicstatic void main(String[] args) throws Exception {

Configurationconf = HBaseConfiguration.create();

conf.set("hbase.zookeeper.quorum","spark01");

HBaseAdminhBaseAdmin = new HBaseAdmin(conf);

booleantableExists = hBaseAdmin.tableExists(flow_fields_import);

if(tableExists){

hBaseAdmin.disableTable(flow_fields_import);

hBaseAdmin.deleteTable(flow_fields_import);

}

HTableDescriptordesc = new HTableDescriptor(TableName.valueOf(flow_fields_import));

HColumnDescriptorhColumnDescriptor = new HColumnDescriptor ("f1".getBytes());

desc.addFamily(hColumnDescriptor);

hBaseAdmin.createTable(desc);

Jobjob = Job.getInstance(conf);

job.setJarByClass(HbaseSinker.class);

job.setMapperClass(HbaseSinkMrMapper.class);

TableMapReduceUtil.initTableReducerJob(flow_fields_import,HbaseSinkMrReducer.class, job);

FileInputFormat.setInputPaths(job,new Path("c:/hbasetest/data"));

job.setMapOutputKeyClass(FlowBean.class);

job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(ImmutableBytesWritable.class);

job.setOutputValueClass(Mutation.class);

job.waitForCompletion(true);

}

}

本文出自 “為了手指那個方向” 部落格,謝絕轉載!