Experiment environment
Hadoop 2.8.0
HBase 1.4.9
CentOS 7.2

The example below copies every row of the HBase table test:student into test:user with a MapReduce job: a TableMapper turns each scanned row into a Put, and a TableReducer writes those Puts to the sink table.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hbasestudy</artifactId>
        <groupId>pers.haohan.bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hbase_mr</artifactId>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- HBase 1.4.x requires Java 7 or later; 8 matches a typical Hadoop 2.8 setup -->
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
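The module inherits its dependencies from the parent hbasestudy pom, which is not shown here. As a reference for reproducing the build, it would need roughly the following; the exact artifact list is an assumption about the parent pom (in HBase 1.x the org.apache.hadoop.hbase.mapreduce classes ship in hbase-server):

<!-- Presumably declared in the parent pom; shown here for completeness -->
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.4.9</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>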
mapper.ScanDataMapper.java
package pers.haohan.hbase.mr.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

import java.io.IOException;

public class ScanDataMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
        // Map side: each call receives one row of the scan.
        // Convert it into a Put keyed by the same rowkey:
        // scan result ==> put
        Put put = new Put(key.get());
        for (Cell cell : result.rawCells()) {
            // Copy every cell as-is: same family, qualifier, and value
            put.addColumn(
                    CellUtil.cloneFamily(cell),
                    CellUtil.cloneQualifier(cell),
                    CellUtil.cloneValue(cell)
            );
        }
        context.write(key, put);
    }
}
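The amount of data the mapper sees is controlled by the Scan the driver passes to initTableMapperJob (see the driver below, which uses a bare new Scan()). For full-table MapReduce scans, the HBase reference guide's examples raise the scanner caching and switch off block caching; a sketch of a drop-in replacement for that Scan:

import org.apache.hadoop.hbase.client.Scan;

// Tuned Scan for the initTableMapperJob call in the driver below
Scan scan = new Scan();
scan.setCaching(500);        // rows fetched per scanner RPC; 500 is the value used in the reference guide's MR examples
scan.setCacheBlocks(false);  // a full scan should not churn the region servers' block cache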
reducer.InsertDataReducer.java
package pers.haohan.hbase.mr.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class InsertDataReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Reduce side: write each Put to the sink table. TableReducer fixes
        // the output value type to Mutation, so the output key carries no
        // information and NullWritable is used.
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}
tool.HbaseMapperReduceTool.java
package pers.haohan.hbase.mr.tool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import pers.haohan.hbase.mr.mapper.ScanDataMapper;
import pers.haohan.hbase.mr.reducer.InsertDataReducer;

public class HbaseMapperReduceTool implements Tool {
    private Configuration conf;

    @Override
    public int run(String[] args) throws Exception {
        // Build the job from the HBase-aware configuration set by ToolRunner
        Job job = Job.getInstance(getConf());
        job.setJarByClass(HbaseMapperReduceTool.class);
        // mapper: scan every row of the source table
        TableMapReduceUtil.initTableMapperJob(
                "test:student",
                new Scan(),
                ScanDataMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job
        );
        // reducer: write the Puts into the sink table
        TableMapReduceUtil.initTableReducerJob("test:user", InsertDataReducer.class, job);
        // Run the job; by Tool convention, return 0 on success and non-zero on failure
        boolean flg = job.waitForCompletion(true);
        return flg ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        // Layer hbase-site.xml on top of the Hadoop configuration so the
        // job can find ZooKeeper and the HBase cluster
        this.conf = HBaseConfiguration.create(configuration);
    }

    @Override
    public Configuration getConf() {
        return conf;
    }
}
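One prerequisite worth calling out: TableOutputFormat does not create the sink table, so both test:student and test:user must exist before the job runs, and test:user needs the same column families as the source, because the mapper copies cells family-for-family. A one-off setup sketch using the 1.x admin API (the TableSetup class name and the info column family are assumptions for this example):

package pers.haohan.hbase.mr.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class TableSetup {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Create the "test" namespace if it is not there yet
            try {
                admin.getNamespaceDescriptor("test");
            } catch (NamespaceNotFoundException e) {
                admin.createNamespace(NamespaceDescriptor.create("test").build());
            }
            // Source and sink need matching families; "info" is an assumed
            // family name for this example
            for (String name : new String[]{"test:student", "test:user"}) {
                TableName tableName = TableName.valueOf(name);
                if (!admin.tableExists(tableName)) {
                    HTableDescriptor desc = new HTableDescriptor(tableName);
                    desc.addFamily(new HColumnDescriptor("info"));
                    admin.createTable(desc);
                }
            }
        }
    }
}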
TableApplication.java
package pers.haohan.hbase.mr;
import org.apache.hadoop.util.ToolRunner;
import pers.haohan.hbase.mr.tool.HbaseMapperReduceTool;
public class TableApplication {
    public static void main(String[] args) throws Exception {
        // Propagate the Tool's return code as the process exit code
        int status = ToolRunner.run(new HbaseMapperReduceTool(), args);
        System.exit(status);
    }
}
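Once the job reports success, the copy can be spot-checked by scanning the sink table with the plain client API; a minimal sketch (the VerifyCopy class name is an assumption):

package pers.haohan.hbase.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyCopy {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Scan the sink table written by the MapReduce job above
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("test:user"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println(
                            Bytes.toString(CellUtil.cloneRow(cell)) + " "
                            + Bytes.toString(CellUtil.cloneFamily(cell)) + ":"
                            + Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
                            + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }
}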