天天看點

Hadoop相關操作,HDFS,HBase,MapreduceHadoop相關操作

Hadoop相關操作

部分轉自:http://www.cnblogs.com/shadowalker/p/7350484.html

                 https://blog.csdn.net/zhangshk_/article/details/83690790

  1. HDFS操作
    1. Shell指令

表格中完整地列出了支援的指令選項:

選項名稱 使用格式 含義
-ls -ls <路徑> 檢視指定路徑的目前目錄結構
-lsr -lsr <路徑> 遞歸檢視指定路徑的目錄結構
-du -du <路徑> 統計目錄下個檔案大小
-dus -dus <路徑> 彙總統計目錄下檔案(夾)大小
-count -count [-q] <路徑> 統計檔案(夾)數量
-mv -mv <源路徑> <目的路徑> 移動
-cp -cp <源路徑> <目的路徑> 複制
-rm -rm [-skipTrash] <路徑> 删除檔案/空白檔案夾
-rmr -rmr [-skipTrash] <路徑> 遞歸删除
-put -put <多個linux上的檔案> <hdfs路徑> 上傳檔案
-copyFromLocal -copyFromLocal <多個linux上的檔案> <hdfs路徑> 從本地複制
-moveFromLocal -moveFromLocal <多個linux上的檔案> <hdfs路徑> 從本地移動
-getmerge -getmerge <源路徑> <linux路徑> 合并到本地
-cat -cat <hdfs路徑> 檢視檔案内容
-text -text <hdfs路徑> 檢視檔案内容
-copyToLocal -copyToLocal [-ignoreCrc] [-crc] [hdfs源路徑] [linux目的路徑] 從本地複制
-moveToLocal -moveToLocal [-crc] <hdfs源路徑> <linux目的路徑> 從本地移動
-mkdir -mkdir <hdfs路徑> 建立空白檔案夾
-setrep -setrep [-R] [-w] <副本數> <路徑> 修改副本數量
-touchz -touchz <檔案路徑> 建立空白檔案
-stat -stat [format] <路徑> 顯示檔案統計資訊
-tail -tail [-f] <檔案> 檢視檔案尾部資訊
-chmod -chmod [-R] <權限模式> [路徑] 修改權限
-chown -chown [-R] [屬主][:[屬組]] 路徑 修改屬主
-chgrp -chgrp [-R] 屬組名稱 路徑 修改屬組
-help -help [指令選項] 幫助

注意:以上表格中路徑包括hdfs中的路徑和linux中的路徑。對于容易産生歧義的地方,會特别指出“linux路徑”或者“hdfs路徑”。如果沒有明确指出,意味着是hdfs路徑。

下面簡單介紹幾個常用指令選項的用法。

  1. -ls 功能:顯示目錄資訊

hdfs dfs -ls /

  1. -mkdir 功能:在HDFS檔案系統上建立目錄。

hdfs dfs -mkdir /test

  1. -put 功能:上傳本地檔案到HDFS指定目錄,info處是本地檔案路徑。

hdfs dfs -put info /test

  1. -get 功能:從hdfs下載下傳檔案到本地。

hdfs dfs -get /test/info ./

  1. -rm 功能:從HDFS删除檔案。

hdfs dfs -rm /test/info

hdfs dfs -ls /test

  1. -moveFromLocal 功能:剪切本地檔案到HDFS,info處是本地檔案路徑。

hdfs dfs -moveFromLocal info /test

  1. -cat 功能:顯示檔案内容。

hdfs dfs -cat /test/info

  1. -appendToFile 功能:在檔案末尾追加資料,info處是本地檔案路徑。

hdfs dfs -appendToFile info /test/info

hdfs dfs -cat /test/info

  1. -chmod 功能:更改檔案所屬權限。

hdfs dfs -ls /test

  1. -cp 功能:實作HDFS目錄中的檔案的拷貝。

将/

test

/info

拷貝到/tmp下,這裡要先在HDFS下建立好/tmp:

hdfs dfs -cp /test/info /tmp/

hdfs dfs -ls /tmp

  1. -mv 功能:在HDFS中移動檔案。

将/

test

/info

移動到 /user下,需要先建立/user

hdfs dfs -mv /test/info /user/

hdfs dfs -ls /test

hdfs dfs -ls /user

  1. -df 功能:統計檔案系統的可用空間資訊。

hdfs dfs -df -h /

  1. -du 功能:統計檔案夾的大小資訊。

hdfs dfs -du /user

  1. -count 功能: 統計一個指定目錄下的檔案數量。

hdfs dfs -count /user

    1. Java API程式設計

public class Test {

    private FileSystem fs;

    private URI uri;

    Configuration cf;

    //private static String rec="hdfs://localhost:9000/test";

    private static String rec="hdfs://localhost:9000/";

    Test(String resource){

          cf=new Configuration();

        try {

            uri=new URI(resource);

            try {

                fs=FileSystem.newInstance(uri, cf);

            } catch (IOException e) {

                e.printStackTrace();

            }

        } catch (URISyntaxException e) {

            e.printStackTrace();

        }

    }

    public void createDir(String src){

        try {

            fs.mkdirs(new Path(src));

        } catch (IllegalArgumentException e) {

            e.printStackTrace();

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    public void readFile(){

        InputStream input=null;

        ByteArrayOutputStream ouput=null;

        try {

            input=fs.open(new Path(rec+"/test"));

            ouput=new ByteArrayOutputStream(input.available());

            IOUtils.copyBytes(input, ouput, cf);

            System.out.print(ouput.toString());

        } catch (IllegalArgumentException e) {

            e.printStackTrace();

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    public void listAll(String src){

        try {

            FileStatus[] status=fs.listStatus(new Path(src));

            for(int i=0;i<status.length;i++){

                System.out.println(status[i].getPath().toString());

            }

        } catch (IllegalArgumentException | IOException e) {

            e.printStackTrace();

        }

    }

    public void copyFromLocalDir(String src,String dst){

        try {

            fs.copyFromLocalFile(new Path(src), new Path(dst));

        } catch (IllegalArgumentException e) {

            e.printStackTrace();

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    public void copyToLocalDir(String src,String dst){

        try {

            boolean isExist=fs.exists(new Path(src));

            if(isExist){

                fs.copyToLocalFile(new Path(src),new Path(dst));

            }

            else{

                System.out.println("檔案不存在!");

            }

        } catch (IllegalArgumentException e) {

            e.printStackTrace();

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    public void copyAllToLocalDir(String src,String dst){

        try {

            boolean isExist=fs.exists(new Path(src));

            if(isExist){

                FileStatus[] status=fs.listStatus(new Path(src));

                for(int i=0;i<status.length;i++){

            fs.copyToLocalFile(new Path(status[i].getPath().toString()),new Path(dst));

                }

            }

            else{

                System.out.println("檔案不存在!");

            }

        } catch (IllegalArgumentException e) {

            e.printStackTrace();

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    public void deleteFile(String src){

        try {

            boolean isExist=fs.exists(new Path(src));

            if(isExist){

                fs.delete(new Path(src));

            }

            else{

                System.out.println("檔案不存在!");

            }

        } catch (IllegalArgumentException e) {

            e.printStackTrace();

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    public static void main(String[] args) {

        // TODO Auto-generated method stub

        Test ts=new Test(rec);

        System.out.println("All Files");

        ts.listAll(rec);

        ts.copyFromLocalDir("/home/wangzun/文檔/hadoopts.pdf", rec+"/test");

        ts.listAll(rec+"/test");

        ts.copyAllToLocalDir(rec+"/test", "/home/wangzun/hadooptest/");

        //ts.deleteFile(rec+"/test/test");

        //ts.listAll(rec+"/test");

        //ts.copyToLocalDir(rec+"/test/大資料技術原理與應用.pdf", "/home/wangzun/hadooptest/");//這裡的路徑需要修改

        //ts.readFile();

    }

}

  1. HBase相關操作
    1. Shell指令操作

“hbase(main):002:0>”後的語句是輸入的指令

  • 表結構操作
  1. 建立表

文法:create <table>, {NAME => <family>, VERSIONS => <VERSIONS>}

建立一個User表,并且有一個info列族

hbase(main):002:0> create 'User','info'

  1. 檢視所有表

hbase(main):003:0> list

  1. 檢視表詳情

hbase(main):004:0> describe 'User'

hbase(main):025:0> desc 'User'

  1. 表修改

删除指定的列族

hbase(main):002:0> alter 'User', 'delete' => 'info'

  • 表資料操作
  1. 插入資料

文法:put <table>,<rowkey>,<family:column>,<value>

hbase(main):005:0> put 'User', 'row1', 'info:name', 'xiaoming'

hbase(main):006:0> put 'User', 'row2', 'info:age', '18'

hbase(main):007:0> put 'User', 'row3', 'info:sex', 'man'

  1. 根據rowKey查詢某個記錄

文法:get <table>,<rowkey>,[<family:column>,....]

hbase(main):008:0> get 'User', 'row2'

hbase(main):028:0> get 'User', 'row3', 'info:sex'

hbase(main):036:0> get 'User', 'row1', {COLUMN => 'info:name'}

  1. 查詢所有記錄

文法:scan <table>, {COLUMNS => [ <family:column>,...], LIMIT => num}

掃描所有記錄

hbase(main):009:0> scan 'User'

掃描前2條

hbase(main):037:0> scan 'User', {LIMIT => 2}

範圍查詢

hbase(main):011:0> scan 'User', {STARTROW => 'row2'}

hbase(main):012:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row2'}

hbase(main):013:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row3'}

另外,還可以添加TIMERANGE和FITLER等進階功能

STARTROW,ENDROW必須大寫,否則報錯;查詢結果不包含等于ENDROW的結果集

  1. 統計表記錄數

文法:count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}

INTERVAL設定多少行顯示一次及對應的rowkey,預設1000;CACHE每次去取的緩存區大小,預設是10,調整該參數可提高查詢速度

hbase(main):020:0> count 'User'

  1. 删除

删除列

hbase(main):008:0> delete 'User', 'row1', 'info:age'

删除所有行

hbase(main):014:0> deleteall 'User', 'row2'

删除表中所有資料

hbase(main):016:0> truncate 'User'

  • 表管理操作
  1. 禁用表

hbase(main):014:0> disable 'User'

hbase(main):015:0> describe 'User'

hbase(main):016:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row3'}

  1. 啟用表

hbase(main):017:0> enable 'User'

hbase(main):018:0> describe 'User'

hbase(main):019:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row3'}

  1. 測試表是否存在

hbase(main):022:0> exists 'User'

hbase(main):023:0> exists 'user'

hbase(main):024:0> exists user

報錯NameError: undefined local variable or method `user' for main

  1. 删除表

删除前,必須先disable

hbase(main):030:0> drop 'User'

報錯ERROR: Table TEST.USER is enabled. Disable it first.

hbase(main):031:0> disable 'User'

hbase(main):033:0> drop 'User'

檢視結果

hbase(main):034:0> list

    1. Java API程式設計

需要将hbase檔案夾下的lib檔案夾下的包加入到工程中

public class HBaseConn {

    static Configuration conf = null;

    static Connection conn = null;

    static {

        conf = HBaseConfiguration.create();

        conf.set("hbase.zookeeper.quorum", "localhost");

        conf.set("hbase.zookeeper.property.clientPort", "2181");

        conf.set("hbase.master", "127.0.0.1:60000");

        try {

            conn = ConnectionFactory.createConnection(conf);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    public static void testCreateTable() throws Exception {

        // 建立表管理類

        Admin admin = conn.getAdmin();

        // 建立表描述類

        TableName tableName = TableName.valueOf("user2");

        HTableDescriptor descriptor = new HTableDescriptor(tableName);

        // 建立列族描述類

        HColumnDescriptor info1 = new HColumnDescriptor("info1");

        // 列族加入表中

        descriptor.addFamily(info1);

        HColumnDescriptor info2 = new HColumnDescriptor("info2");

        descriptor.addFamily(info2);

        // 建立表

        admin.createTable(descriptor);

    }

    public static void testDeleteTable() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Admin admin = conn.getAdmin();

        admin.disableTable(t1);

        admin.deleteTable(t1);

    }

    public static void testPut() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        // rowkey

        Put put = new Put(Bytes.toBytes("1234"));

        // 列族,列,值

        put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("gender"),

                Bytes.toBytes("1"));

        put.addColumn(Bytes.toBytes("info2"), Bytes.toBytes("name"),

                Bytes.toBytes("wangwu"));

        table.put(put);

    }

    public static void testPut2() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        List<Put> putList = new ArrayList<>();

        for (int i = 20; i <= 30; i++) {

            // rowkey

            Put put = new Put(Bytes.toBytes("jbm_" + i));

            // 列族,列,值

            put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("age"),

                    Bytes.toBytes(Integer.toString(i)));

            put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"),

                    Bytes.toBytes("lucy" + i));

            putList.add(put);

        }

        table.put(putList);

    }

    public static void testUpdate() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        Put put = new Put(Bytes.toBytes("1234"));

        put.addColumn(Bytes.toBytes("info2"), Bytes.toBytes("name"),

                Bytes.toBytes("tom"));

        table.put(put);

    }

    public static void testDeleteData() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        Delete delete = new Delete(Bytes.toBytes("1234"));

        table.delete(delete);

    }

    public static void testGetSingle() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        // rowkey

        Get get = new Get(Bytes.toBytes("jbm_20"));

        Result result = table.get(get);

        // 列族,列名

        byte[] name = result.getValue(Bytes.toBytes("info1"),

                Bytes.toBytes("name"));

        byte[] age = result.getValue(Bytes.toBytes("info1"),

                Bytes.toBytes("age"));

        System.out.println(Bytes.toString(name));

        System.out.println(Bytes.toString(age));

    }

    public static void testGetMany() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        Scan scan = new Scan();

        // 字典序 類似于分頁

        scan.setStartRow(Bytes.toBytes("jbm_20"));

        scan.setStopRow(Bytes.toBytes("jbm_30"));

        ResultScanner resultScanner = table.getScanner(scan);

        for (Result result : resultScanner) {

            // Single row result of a Get or Scan query. Result

            // Result 一次擷取一個rowkey對應的記錄

            // 列族,列名

            byte[] name = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("name"));

            byte[] age = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("age"));

            System.out.print(Bytes.toString(name) + ",");

            System.out.print(Bytes.toString(age));

            System.out.println();

        }

    }

    public static void testFilter() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        Scan scan = new Scan();

        // 列值過濾器

        SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(

                Bytes.toBytes("info1"), Bytes.toBytes("name"), CompareOp.EQUAL,

                Bytes.toBytes("lucy25"));

        // 設定過濾器

        scan.setFilter(columnValueFilter);

        // 擷取結果集

        ResultScanner resultScanner = table.getScanner(scan);

        for (Result result : resultScanner) {

            byte[] name = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("name"));

            byte[] age = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("age"));

            System.out.print(Bytes.toString(name) + ",");

            System.out.print(Bytes.toString(age));

            System.out.println();

        }

    }

    public static void testRowkeyFilter() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        Scan scan = new Scan();

        // rowkey過濾器

        // 比對以jbm開頭的

        RowFilter filter = new RowFilter(CompareOp.EQUAL,

                new RegexStringComparator("^jbm"));

        // 設定過濾器

        scan.setFilter(filter);

        // 擷取結果集

        ResultScanner resultScanner = table.getScanner(scan);

        for (Result result : resultScanner) {

            byte[] name = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("name"));

            byte[] age = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("age"));

            System.out.print(Bytes.toString(name) + ",");

            System.out.print(Bytes.toString(age));

            System.out.println();

        }

    }

    public static void testColumnPrefixFilter() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        Scan scan = new Scan();

        // 列名字首過濾器 列名字首為na(注:不是指值的字首)

        ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("na"));

        // 設定過濾器

        scan.setFilter(filter);

        // 擷取結果集

        ResultScanner resultScanner = table.getScanner(scan);

        for (Result result : resultScanner) {

            byte[] name = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("name"));

            byte[] age = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("age"));

            if (name != null) {

                System.out.print(Bytes.toString(name) + " ");

            }

            if (age != null) {

                System.out.print(age);

            }

            System.out.println();

        }

    }

    public static void testFilterList() throws Exception {

        TableName t1 = TableName.valueOf("user2");

        Table table = conn.getTable(t1);

        Scan scan = new Scan();

        // 過濾器集合:MUST_PASS_ALL(and),MUST_PASS_ONE(or)

        FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);

        // ROWKEY過濾器

        RowFilter rowFilter = new RowFilter(CompareOp.EQUAL,

                new RegexStringComparator("^jbm"));

        // 列值過濾器 age大于25

        SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(

                Bytes.toBytes("info1"), Bytes.toBytes("age"),

                CompareOp.GREATER, Bytes.toBytes("25"));

        filterList.addFilter(columnValueFilter);

        filterList.addFilter(rowFilter);

        // 設定過濾器

        scan.setFilter(filterList);

        // 擷取結果集

        ResultScanner resultScanner = table.getScanner(scan);

        for (Result result : resultScanner) {

            byte[] name = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("name"));

            byte[] age = result.getValue(Bytes.toBytes("info1"),

                    Bytes.toBytes("age"));

            if (name != null) {

                System.out.print(Bytes.toString(name) + " ");

            }

            if (age != null) {

                System.out.print(Bytes.toString(age) + " ");

            }

            System.out.println();

        }

    }

    public static void main(String[] arg) {

        System.setProperty("hadoop.home.dir",

                "/home/slave3/hadoop/hadoop-3.1.1");

        try {

            // testCreateTable();

            // testDeleteTable();

            // testPut();

            // testPut2();

            // testUpdate();

            // testDeleteData();

            // testGetSingle();

            // testGetMany();

            // testFilter();

            // testRowkeyFilter();

            // testColumnPrefixFilter();

            //testFilterList();

        } catch (Exception e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

    }

}

  1. Mapreduce相關操作

需要将hadoop檔案夾中share檔案夾下的所有包加入到工程。

    1. WordCount

統計檔案中每個單詞出現的次數。

MapTask.java

public class MapTask extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override

    protected void map(LongWritable key, Text value, Context context)

            throws IOException, InterruptedException {

        String[] split = value.toString().split(" ");

        for (String word : split) {

            context.write(new Text(word), new IntWritable(1));

        }

    }

}

ReduceTask.java

public class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values,

            Context context) throws IOException, InterruptedException {

        // TODO Auto-generated method stub

        int count = 0;

        for (IntWritable value : values) {

            count += value.get();

        }

        context.write(key, new IntWritable(count));

    }

}

Mycombiner.java

public class Mycombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override

    protected void reduce(Text arg0, Iterable<IntWritable> arg1,

            Reducer<Text, IntWritable, Text, IntWritable>.Context arg2)

            throws IOException, InterruptedException {

        int count = 0;

        for (IntWritable intWritable : arg1) {

            count += intWritable.get();

        }

        arg2.write(arg0, new IntWritable(count));

    }

}

Driver.java

public class Driver {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // System.setProperty("HADOOP_USER_NAME", "hasee");

        Job job = Job.getInstance(conf);

        job.setMapperClass(MapTask.class);

        job.setReducerClass(ReduceTask.class);

        job.setJarByClass(Driver.class);

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        // 設定combiner

        job.setCombinerClass(Mycombiner.class);

        // 設定輸入和輸出目錄

        FileInputFormat.setInputPaths(job, new Path(

                "/home/slave3/Downloads/test"));

        FileOutputFormat.setOutputPath(job, new Path(

                "/home/slave3/Downloads/result"));

        File file = new File("/home/slave3/Downloads/test.txt");

        if (file.exists()) {

            FileUtils.deleteDirectory(file);

        }

        // 送出任務

        boolean b = job.waitForCompletion(true);

        System.out.println(b ? 0 : 1)

    }

}

    1. 多個檔案中同一字元分别在某個檔案中出現的次數

第一步:首先将每個檔案中的字元數統計出來:例如hello-a.txt 3

CreateIndexOne.java

public class CreateIndexOne {

    public static class MapTask extends

            Mapper<LongWritable, Text, Text, IntWritable> {

        String pathname = null;

        @Override

        protected void setup(

            Mapper<LongWritable, Text, Text, IntWritable>.Context context)

                throws IOException, InterruptedException {

            // 擷取目前檔案名 計算切片

            FileSplit fileSplit = (FileSplit) context.getInputSplit();

            pathname = fileSplit.getPath().getName();

        }

        @Override

        protected void map(LongWritable key, Text value,

            Mapper<LongWritable, Text, Text, IntWritable>.Context context)

                throws IOException, InterruptedException {

            String[] words = value.toString().split(" ");

            for (String word : words) {

            context.write(new Text(word + "-" + pathname), new IntWritable(

                        1));

            }

        }

    }

    public static class ReduceTask extends

            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override

        protected void reduce(Text key, Iterable<IntWritable> values,

            Reducer<Text, IntWritable, Text, IntWritable>.Context context)

                throws IOException, InterruptedException {

                        int count = 0;

            for (IntWritable value : values) {

                count++;

            }

            context.write(key, new IntWritable(count));

        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // System.setProperty("HADOOP_USER_NAME", "hasee");

        Job job = Job.getInstance(conf);

        job.setMapperClass(MapTask.class);

        job.setReducerClass(ReduceTask.class);

        job.setJarByClass(CreateIndexOne.class);

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        // 設定輸入和輸出目錄

        FileInputFormat.setInputPaths(job, new Path(

                "/home/slave3/Downloads/test2"));//在這個路徑下建立多個txt檔案,并分别輸入若幹單詞

        FileOutputFormat.setOutputPath(job, new Path(

                "/home/slave3/Downloads/result2"));

        File file = new File("/home/slave3/Downloads/result2");

        if (file.exists()) {

            FileUtils.deleteDirectory(file);

        }

        boolean b = job.waitForCompletion(true);

        System.out.println(b ? 0 : 1);

    }

}

第二步:使用第一步的結果,合并每個檔案在各個單詞的次數。例如 hello    a.txt  3,b.txt  2

CreateIndexTwo.Java

public class CreateIndexTwo {

    public static class MapTask extends Mapper<LongWritable, Text, Text, Text> {

        Text outKey = new Text();

        Text outValue = new Text();

        @Override

        protected void map(LongWritable key, Text value,

                Mapper<LongWritable, Text, Text, Text>.Context context)

                throws IOException, InterruptedException {

            String[] split = value.toString().split("-");

            String word = split[0];

            String nameNum = split[1];

            outKey.set(word);

            outValue.set(nameNum);

            context.write(outKey, outValue);

        }

    }

    public static class ReduceTask extends Reducer<Text, Text, Text, Text> {

        @Override

        protected void reduce(Text key, Iterable<Text> values,

                Reducer<Text, Text, Text, Text>.Context context)

                throws IOException, InterruptedException {

            StringBuilder builder = new StringBuilder();

            boolean flag = true;

            for (Text text : values) {

                if (flag) {

                    builder.append(text.toString());

                    flag = false;

                } else {

                    builder.append(",");

                    builder.append(text.toString());

                }

            }

            context.write(key, new Text(builder.toString()));

        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setMapperClass(MapTask.class);

        job.setReducerClass(ReduceTask.class);

        job.setJarByClass(CreateIndexTwo.class);

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        // 設定輸入和輸出目錄

        FileInputFormat.setInputPaths(job, new Path(

                "/home/slave3/Downloads/result2"));

        FileOutputFormat.setOutputPath(job, new Path(

                "/home/slave3/Downloads/result3"));

        File file = new File("/home/slave3/Downloads/result3");

        if (file.exists()) {

            FileUtils.deleteDirectory(file);

        }

        boolean b = job.waitForCompletion(true);

        System.out.println(b ? 0 : 1);

    }

}

繼續閱讀