天天看點

基于HBase的MapReduce檢視表資料

實驗環境

Hadoop2.8.0
HBase-1.4.9
Centos7.2
           
基于HBase的MapReduce檢視表資料

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hbasestudy</artifactId>
        <groupId>pers.haohan.bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hbase_mr</artifactId>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Hadoop 2.8.x / HBase 1.4.x require Java 7+; 1.8 is the
                         conventional build target for this stack. The previous
                         value of 6 targeted an unsupported, end-of-life JDK. -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
           

mapper.ScanDataMapper.java

package pers.haohan.hbase.mr.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

import java.io.IOException;

public class ScanDataMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Map phase of the table-copy job: converts each scanned row
     * ({@link Result}) into a {@link Put} carrying the same row key,
     * column families, qualifiers and values, then emits the pair
     * (row key, Put) for the reduce phase.
     *
     * @param key     row key of the scanned row
     * @param result  all cells of the scanned row
     * @param context MapReduce task context used to emit output
     * @throws IOException          if emitting the record fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result result, Context context)
            throws IOException, InterruptedException {
        // Target the Put at the same row key as the source row.
        Put rowPut = new Put(key.get());

        // Copy every cell of the scanned row into the Put unchanged.
        Cell[] cells = result.rawCells();
        for (int i = 0; i < cells.length; i++) {
            Cell cell = cells[i];
            rowPut.addColumn(
                    CellUtil.cloneFamily(cell),
                    CellUtil.cloneQualifier(cell),
                    CellUtil.cloneValue(cell));
        }

        // Hand the assembled Put to the shuffle, keyed by row key.
        context.write(key, rowPut);
    }
}

           

reducer.InsertDataReducer.java

package pers.haohan.hbase.mr.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class InsertDataReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Reduce phase of the table-copy job: writes every {@link Put}
     * received for a row key into the target table. The output key is
     * {@link NullWritable} because each Put already carries its own
     * row key.
     *
     * @param key     row key the Puts were grouped by (unused directly)
     * @param values  the Puts emitted by the mapper for this row key
     * @param context MapReduce task context used to emit output
     * @throws IOException          if emitting a record fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

           

tool.HbaseMapperReduceTool.java

package pers.haohan.hbase.mr.tool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;
import pers.haohan.hbase.mr.mapper.ScanDataMapper;
import pers.haohan.hbase.mr.reducer.InsertDataReducer;

public class HbaseMapperReduceTool implements Tool {

    // Configuration injected by ToolRunner; carries hbase-site.xml
    // settings (e.g. ZooKeeper quorum) and any -D command-line overrides.
    // Previously setConf() discarded it and getConf() returned null,
    // violating the Tool contract and cutting the job off from its
    // cluster configuration.
    private Configuration conf;

    /**
     * Builds and runs a table-to-table MapReduce job: scans every row
     * of "test:student" and writes the same cells into "test:user".
     *
     * @param strings command-line arguments (currently unused)
     * @return {@code JobStatus.State.SUCCEEDED} value on success,
     *         {@code JobStatus.State.FAILED} value otherwise
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] strings) throws Exception {

        // Use the injected configuration so ToolRunner's options reach
        // the job; fall back to defaults if run outside ToolRunner.
        Job job = (conf == null) ? Job.getInstance() : Job.getInstance(conf);
        job.setJarByClass(HbaseMapperReduceTool.class);

        // Mapper: scan the source table and emit (row key, Put) pairs.
        TableMapReduceUtil.initTableMapperJob(
                "test:student",
                new Scan(),
                ScanDataMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job
        );

        // Reducer: write the Puts into the destination table.
        TableMapReduceUtil.initTableReducerJob("test:user", InsertDataReducer.class, job);

        // Block until the job finishes, printing progress to the console.
        boolean completed = job.waitForCompletion(true);

        return completed
                ? JobStatus.State.SUCCEEDED.getValue()
                : JobStatus.State.FAILED.getValue();
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }
}
           

TableApplication.java

package pers.haohan.hbase.mr;

import org.apache.hadoop.util.ToolRunner;

import pers.haohan.hbase.mr.tool.HbaseMapperReduceTool;

public class TableApplication {

    /**
     * Entry point: delegates to {@link ToolRunner} so generic Hadoop
     * options (-D, -conf, -fs, ...) are parsed and handed to the tool
     * via {@code setConf}.
     *
     * @param args command-line arguments forwarded to the tool
     * @throws Exception if the job cannot be set up or executed
     */
    public static void main(String[] args) throws Exception {
        // Propagate the job's status as the process exit code so shells
        // and schedulers can detect failure (the return value used to be
        // silently discarded).
        int exitCode = ToolRunner.run(new HbaseMapperReduceTool(), args);
        System.exit(exitCode);
    }
}
           

繼續閱讀