

Hadoop Operations (HDFS, HBase, MapReduce)

Partially adapted from:

http://www.cnblogs.com/shadowalker/p/7350484.html

https://blog.csdn.net/zhangshk_/article/details/83690790

  1. HDFS Operations
    1. Shell Commands

The table below lists all of the supported command options:

Option name     Usage format                                                        Meaning
-ls             -ls <path>                                                          List the directory structure at the given path
-lsr            -lsr <path>                                                         Recursively list the directory structure at the given path
-du             -du <path>                                                          Show the size of each file under the directory
-dus            -dus <path>                                                         Show the aggregate size of the directory
-count          -count [-q] <path>                                                  Count files and directories
-mv             -mv <source path> <destination path>                                Move
-cp             -cp <source path> <destination path>                                Copy
-rm             -rm [-skipTrash] <path>                                             Delete a file or an empty directory
-rmr            -rmr [-skipTrash] <path>                                            Delete recursively
-put            -put <local files...> <hdfs path>                                   Upload files
-copyFromLocal  -copyFromLocal <local files...> <hdfs path>                         Copy from local
-moveFromLocal  -moveFromLocal <local files...> <hdfs path>                         Move from local
-getmerge       -getmerge <source path> <local path>                                Merge files and download to local
-cat            -cat <hdfs path>                                                    Print file contents
-text           -text <hdfs path>                                                   Print file contents
-copyToLocal    -copyToLocal [-ignoreCrc] [-crc] <hdfs source> <local destination>  Copy to local
-moveToLocal    -moveToLocal [-crc] <hdfs source> <local destination>               Move to local
-mkdir          -mkdir <hdfs path>                                                  Create an empty directory
-setrep         -setrep [-R] [-w] <replication> <path>                              Change the replication factor
-touchz         -touchz <file path>                                                 Create an empty file
-stat           -stat [format] <path>                                               Show file statistics
-tail           -tail [-f] <file>                                                   Show the tail of a file
-chmod          -chmod [-R] <mode> [path]                                           Change permissions
-chown          -chown [-R] [owner][:[group]] <path>                                Change the owner
-chgrp          -chgrp [-R] <group> <path>                                          Change the group
-help           -help [command option]                                              Show help

Note: the paths in the table above include both HDFS paths and local (Linux) paths. Wherever ambiguity could arise, "Linux path" or "HDFS path" is called out explicitly; if nothing is stated, an HDFS path is meant.

The usage of a few common command options is briefly introduced below.

  1. -ls  Function: list the contents of a directory.

hdfs dfs -ls /

  2. -mkdir  Function: create a directory on the HDFS file system.

hdfs dfs -mkdir /test

  3. -put  Function: upload a local file to the given HDFS directory; here info is a local file path.

hdfs dfs -put info /test

  4. -get  Function: download a file from HDFS to the local file system.

hdfs dfs -get /test/info ./

  5. -rm  Function: delete a file from HDFS.

hdfs dfs -rm /test/info

hdfs dfs -ls /test

  6. -moveFromLocal  Function: move (cut) a local file to HDFS; here info is a local file path.

hdfs dfs -moveFromLocal info /test

  7. -cat  Function: print file contents.

hdfs dfs -cat /test/info

  8. -appendToFile  Function: append data to the end of a file; here info is a local file path.

hdfs dfs -appendToFile info /test/info

hdfs dfs -cat /test/info

  9. -chmod  Function: change file permissions.

hdfs dfs -chmod 777 /test/info

hdfs dfs -ls /test

  10. -cp  Function: copy files within HDFS.

Copy /test/info to /tmp; /tmp must first be created on HDFS:

hdfs dfs -cp /test/info /tmp/

hdfs dfs -ls /tmp

  11. -mv  Function: move files within HDFS.

Move /test/info to /user; /user must be created first:

hdfs dfs -mv /test/info /user/

hdfs dfs -ls /test

hdfs dfs -ls /user

  12. -df  Function: report the file system's available space.

hdfs dfs -df -h /

  13. -du  Function: report directory sizes.

hdfs dfs -du /user

  14. -count  Function: count the files under a given directory.

hdfs dfs -count /user
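
Several options from the table are not walked through above. For reference, a few illustrative invocations (the paths are placeholders):

hdfs dfs -count -q /user

hdfs dfs -setrep -w 2 /test/info

hdfs dfs -getmerge /test /tmp/merged.txt

hdfs dfs -tail -f /test/info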

    2. Java API Programming

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class Test {

    private FileSystem fs;
    private URI uri;
    Configuration cf;

    //private static String rec = "hdfs://localhost:9000/test";
    private static String rec = "hdfs://localhost:9000/";

    Test(String resource) {
        cf = new Configuration();
        try {
            uri = new URI(resource);
            try {
                fs = FileSystem.newInstance(uri, cf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    // Create a directory on HDFS.
    public void createDir(String src) {
        try {
            fs.mkdirs(new Path(src));
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Open an HDFS file and print its contents to stdout.
    public void readFile() {
        InputStream input = null;
        ByteArrayOutputStream output = null;
        try {
            input = fs.open(new Path(rec + "/test"));
            output = new ByteArrayOutputStream(input.available());
            IOUtils.copyBytes(input, output, cf);
            System.out.print(output.toString());
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // List every file and directory under the given path.
    public void listAll(String src) {
        try {
            FileStatus[] status = fs.listStatus(new Path(src));
            for (int i = 0; i < status.length; i++) {
                System.out.println(status[i].getPath().toString());
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Upload a local file to HDFS.
    public void copyFromLocalDir(String src, String dst) {
        try {
            fs.copyFromLocalFile(new Path(src), new Path(dst));
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Download a single HDFS file to the local file system.
    public void copyToLocalDir(String src, String dst) {
        try {
            if (fs.exists(new Path(src))) {
                fs.copyToLocalFile(new Path(src), new Path(dst));
            } else {
                System.out.println("File does not exist!");
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Download every file under an HDFS directory to the local file system.
    public void copyAllToLocalDir(String src, String dst) {
        try {
            if (fs.exists(new Path(src))) {
                FileStatus[] status = fs.listStatus(new Path(src));
                for (int i = 0; i < status.length; i++) {
                    fs.copyToLocalFile(new Path(status[i].getPath().toString()), new Path(dst));
                }
            } else {
                System.out.println("File does not exist!");
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Delete an HDFS file or directory.
    public void deleteFile(String src) {
        try {
            if (fs.exists(new Path(src))) {
                fs.delete(new Path(src), true); // true = recursive
            } else {
                System.out.println("File does not exist!");
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Test ts = new Test(rec);
        System.out.println("All Files");
        ts.listAll(rec);
        ts.copyFromLocalDir("/home/wangzun/文档/hadoopts.pdf", rec + "/test");
        ts.listAll(rec + "/test");
        ts.copyAllToLocalDir(rec + "/test", "/home/wangzun/hadooptest/");
        //ts.deleteFile(rec + "/test/test");
        //ts.listAll(rec + "/test");
        //ts.copyToLocalDir(rec + "/test/大数据技术原理与应用.pdf", "/home/wangzun/hadooptest/"); // adjust this path as needed
        //ts.readFile();
    }
}
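
The class above has no write method. As a minimal sketch (assuming the same fs handle and NameNode address as above), a file can be created and written through FileSystem.create:

    // Hypothetical helper, not in the original listing; requires
    // org.apache.hadoop.fs.FSDataOutputStream in the imports.
    public void writeFile(String dst, String content) {
        try (FSDataOutputStream out = fs.create(new Path(dst), true)) { // true = overwrite
            out.write(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }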

  2. HBase Operations
    1. Shell Commands

The text following the "hbase(main):002:0>" prompt is the command to enter.

  • Table schema operations
  1. Create a table

Syntax: create <table>, {NAME => <family>, VERSIONS => <VERSIONS>}

Create a table named User with one column family, info:

hbase(main):002:0> create 'User','info'

  2. List all tables

hbase(main):003:0> list

  3. Describe a table

hbase(main):004:0> describe 'User'

hbase(main):025:0> desc 'User'

  4. Alter a table

Delete the given column family:

hbase(main):002:0> alter 'User', 'delete' => 'info'
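
alter can likewise add a column family back; a hedged example, assuming the default family settings are acceptable:

hbase(main):003:0> alter 'User', NAME => 'info'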

  • Table data operations
  1. Insert data

Syntax: put <table>, <rowkey>, <family:column>, <value>

hbase(main):005:0> put 'User', 'row1', 'info:name', 'xiaoming'

hbase(main):006:0> put 'User', 'row2', 'info:age', '18'

hbase(main):007:0> put 'User', 'row3', 'info:sex', 'man'

  2. Get a record by rowkey

Syntax: get <table>, <rowkey>, [<family:column>, ...]

hbase(main):008:0> get 'User', 'row2'

hbase(main):028:0> get 'User', 'row3', 'info:sex'

hbase(main):036:0> get 'User', 'row1', {COLUMN => 'info:name'}

  3. Scan records

Syntax: scan <table>, {COLUMNS => [<family:column>, ...], LIMIT => num}

Scan all records:

hbase(main):009:0> scan 'User'

Scan the first two rows:

hbase(main):037:0> scan 'User', {LIMIT => 2}

Range scan:

hbase(main):011:0> scan 'User', {STARTROW => 'row2'}

hbase(main):012:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row2'}

hbase(main):013:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row3'}

In addition, advanced options such as TIMERANGE and FILTER can be added.

STARTROW and ENDROW must be uppercase, otherwise an error is raised; the result set does not include the row equal to ENDROW.

  4. Count table rows

Syntax: count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}

INTERVAL sets how often (every how many rows) progress and the current rowkey are printed, default 1000; CACHE is the scan cache size per fetch, default 10, and raising it can speed up the count.

hbase(main):020:0> count 'User'
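
The tuning parameters are passed the same way; an illustrative example (the values are arbitrary):

hbase(main):021:0> count 'User', INTERVAL => 100, CACHE => 500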

  5. Delete

Delete a column:

hbase(main):008:0> delete 'User', 'row1', 'info:age'

Delete an entire row:

hbase(main):014:0> deleteall 'User', 'row2'

Delete all data in the table:

hbase(main):016:0> truncate 'User'

  • Table management operations
  1. Disable a table

hbase(main):014:0> disable 'User'

hbase(main):015:0> describe 'User'

hbase(main):016:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row3'}

  2. Enable a table

hbase(main):017:0> enable 'User'

hbase(main):018:0> describe 'User'

hbase(main):019:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row3'}

  3. Test whether a table exists

hbase(main):022:0> exists 'User'

hbase(main):023:0> exists 'user'

hbase(main):024:0> exists user

This raises: NameError: undefined local variable or method `user' for main

  4. Drop a table

The table must be disabled before it can be dropped:

hbase(main):030:0> drop 'User'

This raises: ERROR: Table TEST.USER is enabled. Disable it first.

hbase(main):031:0> disable 'User'

hbase(main):033:0> drop 'User'

Check the result:

hbase(main):034:0> list

    2. Java API Programming

Add the jars from the lib directory under the HBase installation directory to the project.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseConn {

    static Configuration conf = null;
    static Connection conn = null;

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.master", "127.0.0.1:60000");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void testCreateTable() throws Exception {
        // Table admin client
        Admin admin = conn.getAdmin();
        // Table descriptor
        TableName tableName = TableName.valueOf("user2");
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        // Column family descriptors, added to the table
        HColumnDescriptor info1 = new HColumnDescriptor("info1");
        descriptor.addFamily(info1);
        HColumnDescriptor info2 = new HColumnDescriptor("info2");
        descriptor.addFamily(info2);
        // Create the table
        admin.createTable(descriptor);
    }

    public static void testDeleteTable() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Admin admin = conn.getAdmin();
        admin.disableTable(t1);
        admin.deleteTable(t1);
    }

    public static void testPut() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        // rowkey
        Put put = new Put(Bytes.toBytes("1234"));
        // column family, column, value
        put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("gender"), Bytes.toBytes("1"));
        put.addColumn(Bytes.toBytes("info2"), Bytes.toBytes("name"), Bytes.toBytes("wangwu"));
        table.put(put);
    }

    public static void testPut2() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        List<Put> putList = new ArrayList<>();
        for (int i = 20; i <= 30; i++) {
            // rowkey
            Put put = new Put(Bytes.toBytes("jbm_" + i));
            // column family, column, value
            put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("age"), Bytes.toBytes(Integer.toString(i)));
            put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("lucy" + i));
            putList.add(put);
        }
        table.put(putList);
    }

    public static void testUpdate() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Put put = new Put(Bytes.toBytes("1234"));
        put.addColumn(Bytes.toBytes("info2"), Bytes.toBytes("name"), Bytes.toBytes("tom"));
        table.put(put);
    }

    public static void testDeleteData() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Delete delete = new Delete(Bytes.toBytes("1234"));
        table.delete(delete);
    }

    public static void testGetSingle() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        // rowkey
        Get get = new Get(Bytes.toBytes("jbm_20"));
        Result result = table.get(get);
        // column family, column name
        byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
        byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
        System.out.println(Bytes.toString(name));
        System.out.println(Bytes.toString(age));
    }

    public static void testGetMany() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Rowkeys are ordered lexicographically, so a start/stop row works like paging
        scan.setStartRow(Bytes.toBytes("jbm_20"));
        scan.setStopRow(Bytes.toBytes("jbm_30"));
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            // Each Result holds the cells of one rowkey
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            System.out.print(Bytes.toString(name) + ",");
            System.out.print(Bytes.toString(age));
            System.out.println();
        }
    }

    public static void testFilter() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Column value filter: info1:name equals "lucy25"
        SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(
                Bytes.toBytes("info1"), Bytes.toBytes("name"), CompareOp.EQUAL,
                Bytes.toBytes("lucy25"));
        // Set the filter
        scan.setFilter(columnValueFilter);
        // Fetch the result set
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            System.out.print(Bytes.toString(name) + ",");
            System.out.print(Bytes.toString(age));
            System.out.println();
        }
    }

    public static void testRowkeyFilter() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Rowkey filter: match rowkeys starting with "jbm"
        RowFilter filter = new RowFilter(CompareOp.EQUAL,
                new RegexStringComparator("^jbm"));
        scan.setFilter(filter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            System.out.print(Bytes.toString(name) + ",");
            System.out.print(Bytes.toString(age));
            System.out.println();
        }
    }

    public static void testColumnPrefixFilter() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Column name prefix filter: column names starting with "na" (note: the name, not the value)
        ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("na"));
        scan.setFilter(filter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            if (name != null) {
                System.out.print(Bytes.toString(name) + " ");
            }
            if (age != null) {
                System.out.print(Bytes.toString(age));
            }
            System.out.println();
        }
    }

    public static void testFilterList() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Filter list: MUST_PASS_ALL (and), MUST_PASS_ONE (or)
        FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
        // Rowkey filter
        RowFilter rowFilter = new RowFilter(CompareOp.EQUAL,
                new RegexStringComparator("^jbm"));
        // Column value filter: age greater than 25 (byte-wise comparison of the strings)
        SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(
                Bytes.toBytes("info1"), Bytes.toBytes("age"),
                CompareOp.GREATER, Bytes.toBytes("25"));
        filterList.addFilter(columnValueFilter);
        filterList.addFilter(rowFilter);
        scan.setFilter(filterList);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            if (name != null) {
                System.out.print(Bytes.toString(name) + " ");
            }
            if (age != null) {
                System.out.print(Bytes.toString(age) + " ");
            }
            System.out.println();
        }
    }

    public static void main(String[] arg) {
        System.setProperty("hadoop.home.dir", "/home/slave3/hadoop/hadoop-3.1.1");
        try {
            // Uncomment the method to run:
            // testCreateTable();
            // testDeleteTable();
            // testPut();
            // testPut2();
            // testUpdate();
            // testDeleteData();
            // testGetSingle();
            // testGetMany();
            // testFilter();
            // testRowkeyFilter();
            // testColumnPrefixFilter();
            // testFilterList();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
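
Connection and Table are both Closeable; the listing above leaves them open for brevity. In real code each Table handle should be released after use, for example with try-with-resources (a sketch, using the same conn as above):

    try (Table table = conn.getTable(TableName.valueOf("user2"))) {
        // ... reads and writes against the table ...
    } // the Table handle is released automatically here

    conn.close(); // close the shared Connection once, on application shutdown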

  3. MapReduce Operations

Add all the jars under the share directory of the Hadoop installation to the project.

    1. WordCount

Count the number of times each word occurs in a file.

MapTask.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapTask extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each input line on spaces and emit (word, 1) for every word
        String[] split = value.toString().split(" ");
        for (String word : split) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

ReduceTask.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // Sum the counts for each word
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

Mycombiner.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Mycombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Pre-aggregate on the map side to reduce shuffle traffic
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
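
Since Mycombiner performs exactly the same aggregation as ReduceTask and the key/value types match, job.setCombinerClass(ReduceTask.class) would work equally well; the separate class here just makes the combiner step explicit.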

Driver.java

import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // System.setProperty("HADOOP_USER_NAME", "hasee");
        Job job = Job.getInstance(conf);
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(Driver.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the combiner
        job.setCombinerClass(Mycombiner.class);
        // Set the input and output directories
        FileInputFormat.setInputPaths(job, new Path(
                "/home/slave3/Downloads/test"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/home/slave3/Downloads/result"));
        // The job fails if the output directory already exists, so delete it first
        File file = new File("/home/slave3/Downloads/result");
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        // Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0 : 1);
    }
}
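
To submit the job to a cluster rather than run it from the IDE, the classes are typically packaged into a jar and launched with the hadoop jar command; a sketch, assuming the jar is named wordcount.jar (the name is illustrative):

hadoop jar wordcount.jar Driver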

    2. Per-file occurrence counts of each word across multiple files

Step 1: count the occurrences of each word in each file, producing lines such as hello-a.txt 3.

CreateIndexOne.java

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CreateIndexOne {

    public static class MapTask extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        String pathname = null;

        @Override
        protected void setup(
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Get the name of the file this map task's input split belongs to
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            pathname = fileSplit.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Emit (word-filename, 1) for every word
            String[] words = value.toString().split(" ");
            for (String word : words) {
                context.write(new Text(word + "-" + pathname), new IntWritable(1));
            }
        }
    }

    public static class ReduceTask extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Every value is 1, so counting the values gives the per-file word count
            int count = 0;
            for (IntWritable value : values) {
                count++;
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // System.setProperty("HADOOP_USER_NAME", "hasee");
        Job job = Job.getInstance(conf);
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(CreateIndexOne.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output directories
        FileInputFormat.setInputPaths(job, new Path(
                "/home/slave3/Downloads/test2")); // create several txt files with some words under this path
        FileOutputFormat.setOutputPath(job, new Path(
                "/home/slave3/Downloads/result2"));
        // Delete the output directory if it already exists
        File file = new File("/home/slave3/Downloads/result2");
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0 : 1);
    }
}

Step 2: using the output of Step 1, merge the per-file counts for each word, producing lines such as hello    a.txt  3,b.txt  2.

CreateIndexTwo.java

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CreateIndexTwo {

    public static class MapTask extends Mapper<LongWritable, Text, Text, Text> {

        Text outKey = new Text();
        Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Input lines look like "word-filename<TAB>count";
            // split on "-" to separate the word from "filename<TAB>count"
            String[] split = value.toString().split("-");
            String word = split[0];
            String nameNum = split[1];
            outKey.set(word);
            outValue.set(nameNum);
            context.write(outKey, outValue);
        }
    }

    public static class ReduceTask extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Join the per-file counts for one word with commas
            StringBuilder builder = new StringBuilder();
            boolean flag = true;
            for (Text text : values) {
                if (flag) {
                    builder.append(text.toString());
                    flag = false;
                } else {
                    builder.append(",");
                    builder.append(text.toString());
                }
            }
            context.write(key, new Text(builder.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(CreateIndexTwo.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // The input is the output of Step 1
        FileInputFormat.setInputPaths(job, new Path(
                "/home/slave3/Downloads/result2"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/home/slave3/Downloads/result3"));
        // Delete the output directory if it already exists
        File file = new File("/home/slave3/Downloads/result3");
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0 : 1);
    }
}
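
As a worked illustration of the two steps (the file names and contents are hypothetical): if a.txt contains "hello world hello" and b.txt contains "hello", CreateIndexOne emits

hello-a.txt  2

hello-b.txt  1

world-a.txt  1

and CreateIndexTwo, splitting each line on "-", then produces

hello  a.txt  2,b.txt  1

world  a.txt  1

(The order of the per-file entries within a line is not guaranteed.)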
