Hadoop Operations
Partially adapted from: http://www.cnblogs.com/shadowalker/p/7350484.html
https://blog.csdn.net/zhangshk_/article/details/83690790
- HDFS Operations
- Shell commands
The table below lists all of the supported command options:
| Option | Usage | Meaning |
| --- | --- | --- |
| -ls | -ls <path> | List the contents of the given path |
| -lsr | -lsr <path> | List the contents of the given path recursively |
| -du | -du <path> | Show the size of each file under the path |
| -dus | -dus <path> | Show the total size of the path (file or directory) |
| -count | -count [-q] <path> | Count files and directories |
| -mv | -mv <source path> <destination path> | Move |
| -cp | -cp <source path> <destination path> | Copy |
| -rm | -rm [-skipTrash] <path> | Delete a file or an empty directory |
| -rmr | -rmr [-skipTrash] <path> | Delete recursively |
| -put | -put <one or more local files> <hdfs path> | Upload files |
| -copyFromLocal | -copyFromLocal <one or more local files> <hdfs path> | Copy from the local file system |
| -moveFromLocal | -moveFromLocal <one or more local files> <hdfs path> | Move from the local file system |
| -getmerge | -getmerge <source path> <local path> | Merge files and download to the local file system |
| -cat | -cat <hdfs path> | Print file contents |
| -text | -text <hdfs path> | Print file contents (also decodes SequenceFiles and compressed files) |
| -copyToLocal | -copyToLocal [-ignoreCrc] [-crc] [hdfs source path] [local destination path] | Copy to the local file system |
| -moveToLocal | -moveToLocal [-crc] <hdfs source path> <local destination path> | Move to the local file system |
| -mkdir | -mkdir <hdfs path> | Create a directory |
| -setrep | -setrep [-R] [-w] <replication> <path> | Change the replication factor |
| -touchz | -touchz <file path> | Create an empty file |
| -stat | -stat [format] <path> | Show file statistics |
| -tail | -tail [-f] <file> | Show the last part of a file |
| -chmod | -chmod [-R] <mode> [path] | Change permissions |
| -chown | -chown [-R] [owner][:[group]] <path> | Change owner |
| -chgrp | -chgrp [-R] <group> <path> | Change group |
| -help | -help [option] | Show help |
Note: the paths in the table above may be HDFS paths or local (Linux) paths. Where this could be ambiguous, the table says "local path" or "hdfs path" explicitly; if nothing is stated, the path is an HDFS path.
The usage of a few common options is shown below, followed by examples of some of the remaining ones.
- -ls: lists the contents of a directory.
hdfs dfs -ls /
- -mkdir: creates a directory on the HDFS file system.
hdfs dfs -mkdir /test
- -put: uploads a local file to the given HDFS directory; here info is a local file path.
hdfs dfs -put info /test
- -get: downloads a file from HDFS to the local file system.
hdfs dfs -get /test/info ./
- -rm: deletes a file from HDFS.
hdfs dfs -rm /test/info
hdfs dfs -ls /test
- -moveFromLocal: moves (cuts) a local file to HDFS; here info is a local file path.
hdfs dfs -moveFromLocal info /test
- -cat: prints the contents of a file.
hdfs dfs -cat /test/info
- -appendToFile: appends data to the end of a file; here info is a local file path.
hdfs dfs -appendToFile info /test/info
hdfs dfs -cat /test/info
- -chmod: changes file permissions. For example (assuming /test/info exists):
hdfs dfs -chmod 666 /test/info
hdfs dfs -ls /test
- -cp: copies a file within HDFS.
Copy /test/info to /tmp; /tmp must already have been created on HDFS:
hdfs dfs -cp /test/info /tmp/
hdfs dfs -ls /tmp
- -mv: moves a file within HDFS.
Move /test/info to /user; /user must be created first:
hdfs dfs -mv /test/info /user/
hdfs dfs -ls /test
hdfs dfs -ls /user
- -df: reports the free space of the file system.
hdfs dfs -df -h /
- -du: reports the size of a directory.
hdfs dfs -du /user
- -count: counts the files under the given directory.
hdfs dfs -count /user
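A few of the remaining options from the table follow the same pattern. The commands below are only illustrative sketches; the paths, the replication factor, and the stat format string are made-up examples:
hdfs dfs -setrep -w 2 /test/info
hdfs dfs -tail /test/info
hdfs dfs -stat "%n %r %o" /test/info
hdfs dfs -getmerge /test ./merged.txt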
- Java API programming
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class Test {
    private FileSystem fs;
    private URI uri;
    Configuration cf;
    // private static String rec = "hdfs://localhost:9000/test";
    private static String rec = "hdfs://localhost:9000/";

    Test(String resource) {
        cf = new Configuration();
        try {
            uri = new URI(resource);
            fs = FileSystem.newInstance(uri, cf);
        } catch (URISyntaxException | IOException e) {
            e.printStackTrace();
        }
    }

    // Create a directory on HDFS
    public void createDir(String src) {
        try {
            fs.mkdirs(new Path(src));
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Read a file from HDFS and print its contents
    public void readFile() {
        InputStream input = null;
        ByteArrayOutputStream ouput = null;
        try {
            input = fs.open(new Path(rec + "/test"));
            ouput = new ByteArrayOutputStream(input.available());
            IOUtils.copyBytes(input, ouput, cf);
            System.out.print(ouput.toString());
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // List every entry under the given path
    public void listAll(String src) {
        try {
            FileStatus[] status = fs.listStatus(new Path(src));
            for (int i = 0; i < status.length; i++) {
                System.out.println(status[i].getPath().toString());
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Upload a local file to HDFS
    public void copyFromLocalDir(String src, String dst) {
        try {
            fs.copyFromLocalFile(new Path(src), new Path(dst));
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Download a single HDFS file to the local file system
    public void copyToLocalDir(String src, String dst) {
        try {
            boolean isExist = fs.exists(new Path(src));
            if (isExist) {
                fs.copyToLocalFile(new Path(src), new Path(dst));
            } else {
                System.out.println("File does not exist!");
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Download every file under an HDFS directory to the local file system
    public void copyAllToLocalDir(String src, String dst) {
        try {
            boolean isExist = fs.exists(new Path(src));
            if (isExist) {
                FileStatus[] status = fs.listStatus(new Path(src));
                for (int i = 0; i < status.length; i++) {
                    fs.copyToLocalFile(new Path(status[i].getPath().toString()), new Path(dst));
                }
            } else {
                System.out.println("File does not exist!");
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    // Delete a file or directory on HDFS
    public void deleteFile(String src) {
        try {
            boolean isExist = fs.exists(new Path(src));
            if (isExist) {
                fs.delete(new Path(src), true);
            } else {
                System.out.println("File does not exist!");
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Test ts = new Test(rec);
        System.out.println("All Files");
        ts.listAll(rec);
        ts.copyFromLocalDir("/home/wangzun/文档/hadoopts.pdf", rec + "/test");
        ts.listAll(rec + "/test");
        ts.copyAllToLocalDir(rec + "/test", "/home/wangzun/hadooptest/");
        // ts.deleteFile(rec + "/test/test");
        // ts.listAll(rec + "/test");
        // ts.copyToLocalDir(rec + "/test/大数据技术原理与应用.pdf", "/home/wangzun/hadooptest/"); // adjust this path as needed
        // ts.readFile();
    }
}
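One possible way to compile and run the class above from the command line, assuming the hadoop command is on the PATH (the file and class names are those used in the code):
javac -cp $(hadoop classpath) Test.java
java -cp .:$(hadoop classpath) Test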
- HBase Operations
- Shell commands
The text after the "hbase(main):002:0>" prompt is the command that was entered.
- Table schema operations
- Create a table
Syntax: create <table>, {NAME => <family>, VERSIONS => <VERSIONS>}
Create a table named User with one column family, info:
hbase(main):002:0> create 'User','info'
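The longer form from the syntax line, with an explicit column-family descriptor, would look like this for a hypothetical User2 table (the VERSIONS value is an arbitrary example):
hbase(main):003:0> create 'User2', {NAME => 'info', VERSIONS => 3}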
- List all tables
hbase(main):003:0> list
- Describe a table
hbase(main):004:0> describe 'User'
or
hbase(main):025:0> desc 'User'
- Alter a table
Delete the given column family:
hbase(main):002:0> alter 'User', 'delete' => 'info'
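alter can also add or modify a column family; a sketch, where info2 is a made-up family name:
hbase(main):003:0> alter 'User', NAME => 'info2', VERSIONS => 3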
- Table data operations
- Insert data
Syntax: put <table>,<rowkey>,<family:column>,<value>
hbase(main):005:0> put 'User', 'row1', 'info:name', 'xiaoming'
hbase(main):006:0> put 'User', 'row2', 'info:age', '18'
hbase(main):007:0> put 'User', 'row3', 'info:sex', 'man'
- Get a record by rowkey
Syntax: get <table>,<rowkey>,[<family:column>,....]
hbase(main):008:0> get 'User', 'row2'
hbase(main):028:0> get 'User', 'row3', 'info:sex'
hbase(main):036:0> get 'User', 'row1', {COLUMN => 'info:name'}
- Query records
Syntax: scan <table>, {COLUMNS => [ <family:column>,...], LIMIT => num}
Scan all records:
hbase(main):009:0> scan 'User'
Scan the first 2 rows:
hbase(main):037:0> scan 'User', {LIMIT => 2}
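The COLUMNS option from the syntax line limits which columns are returned; for example:
hbase(main):038:0> scan 'User', {COLUMNS => ['info:name'], LIMIT => 2}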
Range scan:
hbase(main):011:0> scan 'User', {STARTROW => 'row2'}
hbase(main):012:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row2'}
hbase(main):013:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row3'}
TIMERANGE, FILTER and other advanced options can also be added (see the sketch just below).
STARTROW and ENDROW must be upper case, otherwise an error is raised; the row whose key equals ENDROW is not included in the result set.
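A sketch of those advanced options; the timestamps and the filter expression are made-up examples:
hbase(main):014:0> scan 'User', {TIMERANGE => [1546272000000, 1577808000000]}
hbase(main):015:0> scan 'User', {FILTER => "ValueFilter(=, 'binary:18')"}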
- Count the rows in a table
Syntax: count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}
INTERVAL controls how many rows are counted between progress lines (which show the current rowkey), default 1000; CACHE is the scanner cache size per fetch, default 10; increasing it can speed up the count.
hbase(main):020:0> count 'User'
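With both options set explicitly (the values are arbitrary examples):
hbase(main):021:0> count 'User', INTERVAL => 100, CACHE => 500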
- Delete
Delete a column:
hbase(main):008:0> delete 'User', 'row1', 'info:age'
Delete an entire row (all cells for the given rowkey):
hbase(main):014:0> deleteall 'User', 'row2'
Delete all data in the table:
hbase(main):016:0> truncate 'User'
- Table management operations
- Disable a table
hbase(main):014:0> disable 'User'
hbase(main):015:0> describe 'User'
hbase(main):016:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row3'}
- Enable a table
hbase(main):017:0> enable 'User'
hbase(main):018:0> describe 'User'
hbase(main):019:0> scan 'User', {STARTROW => 'row2', ENDROW => 'row3'}
- Test whether a table exists
hbase(main):022:0> exists 'User'
hbase(main):023:0> exists 'user'
hbase(main):024:0> exists user
This fails with NameError: undefined local variable or method `user' for main
- Drop a table
A table must be disabled before it can be dropped.
hbase(main):030:0> drop 'User'
This fails with ERROR: Table User is enabled. Disable it first.
hbase(main):031:0> disable 'User'
hbase(main):033:0> drop 'User'
Check the result:
hbase(main):034:0> list
- Java API programming
The jar files under HBase's lib directory need to be added to the project.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseConn {
    static Configuration conf = null;
    static Connection conn = null;

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.master", "127.0.0.1:60000");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void testCreateTable() throws Exception {
        // Table administration handle
        Admin admin = conn.getAdmin();
        // Table descriptor
        TableName tableName = TableName.valueOf("user2");
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        // Column family descriptors, added to the table
        HColumnDescriptor info1 = new HColumnDescriptor("info1");
        descriptor.addFamily(info1);
        HColumnDescriptor info2 = new HColumnDescriptor("info2");
        descriptor.addFamily(info2);
        // Create the table
        admin.createTable(descriptor);
    }

    public static void testDeleteTable() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Admin admin = conn.getAdmin();
        admin.disableTable(t1);
        admin.deleteTable(t1);
    }

    public static void testPut() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        // rowkey
        Put put = new Put(Bytes.toBytes("1234"));
        // column family, column, value
        put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("gender"), Bytes.toBytes("1"));
        put.addColumn(Bytes.toBytes("info2"), Bytes.toBytes("name"), Bytes.toBytes("wangwu"));
        table.put(put);
    }

    public static void testPut2() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        List<Put> putList = new ArrayList<>();
        for (int i = 20; i <= 30; i++) {
            // rowkey
            Put put = new Put(Bytes.toBytes("jbm_" + i));
            // column family, column, value
            put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("age"), Bytes.toBytes(Integer.toString(i)));
            put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("lucy" + i));
            putList.add(put);
        }
        table.put(putList);
    }

    public static void testUpdate() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Put put = new Put(Bytes.toBytes("1234"));
        put.addColumn(Bytes.toBytes("info2"), Bytes.toBytes("name"), Bytes.toBytes("tom"));
        table.put(put);
    }

    public static void testDeleteData() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Delete delete = new Delete(Bytes.toBytes("1234"));
        table.delete(delete);
    }

    public static void testGetSingle() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        // rowkey
        Get get = new Get(Bytes.toBytes("jbm_20"));
        Result result = table.get(get);
        // column family, column name
        byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
        byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
        System.out.println(Bytes.toString(name));
        System.out.println(Bytes.toString(age));
    }

    public static void testGetMany() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Rowkeys are scanned in lexicographic order, similar to paging
        scan.setStartRow(Bytes.toBytes("jbm_20"));
        scan.setStopRow(Bytes.toBytes("jbm_30"));
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            // A Result holds the single row returned for one rowkey by a Get or Scan
            // column family, column name
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            System.out.print(Bytes.toString(name) + ",");
            System.out.print(Bytes.toString(age));
            System.out.println();
        }
    }

    public static void testFilter() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Column value filter
        SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(
                Bytes.toBytes("info1"), Bytes.toBytes("name"), CompareOp.EQUAL, Bytes.toBytes("lucy25"));
        // Attach the filter
        scan.setFilter(columnValueFilter);
        // Fetch the result set
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            System.out.print(Bytes.toString(name) + ",");
            System.out.print(Bytes.toString(age));
            System.out.println();
        }
    }

    public static void testRowkeyFilter() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Rowkey filter: match rowkeys starting with "jbm"
        RowFilter filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^jbm"));
        // Attach the filter
        scan.setFilter(filter);
        // Fetch the result set
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            System.out.print(Bytes.toString(name) + ",");
            System.out.print(Bytes.toString(age));
            System.out.println();
        }
    }

    public static void testColumnPrefixFilter() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Column name prefix filter: column names starting with "na" (the name, not the value)
        ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("na"));
        // Attach the filter
        scan.setFilter(filter);
        // Fetch the result set
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            if (name != null) {
                System.out.print(Bytes.toString(name) + " ");
            }
            if (age != null) {
                System.out.print(Bytes.toString(age));
            }
            System.out.println();
        }
    }

    public static void testFilterList() throws Exception {
        TableName t1 = TableName.valueOf("user2");
        Table table = conn.getTable(t1);
        Scan scan = new Scan();
        // Filter list: MUST_PASS_ALL (and), MUST_PASS_ONE (or)
        FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
        // Rowkey filter
        RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^jbm"));
        // Column value filter: age greater than 25
        SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(
                Bytes.toBytes("info1"), Bytes.toBytes("age"), CompareOp.GREATER, Bytes.toBytes("25"));
        filterList.addFilter(columnValueFilter);
        filterList.addFilter(rowFilter);
        // Attach the filters
        scan.setFilter(filterList);
        // Fetch the result set
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            if (name != null) {
                System.out.print(Bytes.toString(name) + " ");
            }
            if (age != null) {
                System.out.print(Bytes.toString(age) + " ");
            }
            System.out.println();
        }
    }

    public static void main(String[] arg) {
        System.setProperty("hadoop.home.dir", "/home/slave3/hadoop/hadoop-3.1.1");
        try {
            // testCreateTable();
            // testDeleteTable();
            // testPut();
            // testPut2();
            // testUpdate();
            // testDeleteData();
            // testGetSingle();
            // testGetMany();
            // testFilter();
            // testRowkeyFilter();
            // testColumnPrefixFilter();
            // testFilterList();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
- MapReduce Operations
All the jar files under Hadoop's share directory need to be added to the project.
- WordCount
Count how many times each word occurs in a file.
MapTask.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapTask extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on spaces and emit (word, 1) for every word
        String[] split = value.toString().split(" ");
        for (String word : split) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
ReduceTask.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts emitted for this word
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
Mycombiner.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Mycombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Pre-aggregate counts on the map side to reduce shuffle traffic
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
Driver.java
import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // System.setProperty("HADOOP_USER_NAME", "hasee");
        Job job = Job.getInstance(conf);
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(Driver.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the combiner
        job.setCombinerClass(Mycombiner.class);
        // Input and output directories
        FileInputFormat.setInputPaths(job, new Path("/home/slave3/Downloads/test"));
        FileOutputFormat.setOutputPath(job, new Path("/home/slave3/Downloads/result"));
        // The output directory must not exist, so delete it if it does
        File file = new File("/home/slave3/Downloads/result");
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        // Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0 : 1);
    }
}
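As a quick sanity check (the input text here is made up): if a file under /home/slave3/Downloads/test contains the single line "hello world hello", the job should write a part-r-00000 file under /home/slave3/Downloads/result that looks roughly like:
hello	2
world	1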
- Per-file occurrence counts of the same word across multiple files
Step 1: count how many times each word occurs in each file, producing output such as hello-a.txt 3.
CreateIndexOne.java
import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CreateIndexOne {

    public static class MapTask extends Mapper<LongWritable, Text, Text, IntWritable> {
        String pathname = null;

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Get the name of the file this input split belongs to
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            pathname = fileSplit.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Emit (word-filename, 1) for every word in the line
            String[] words = value.toString().split(" ");
            for (String word : words) {
                context.write(new Text(word + "-" + pathname), new IntWritable(1));
            }
        }
    }

    public static class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Every value is 1, so the number of values is the count for this word in this file
            int count = 0;
            for (IntWritable value : values) {
                count++;
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // System.setProperty("HADOOP_USER_NAME", "hasee");
        Job job = Job.getInstance(conf);
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(CreateIndexOne.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output directories; create several txt files with a few words each under the input directory
        FileInputFormat.setInputPaths(job, new Path("/home/slave3/Downloads/test2"));
        FileOutputFormat.setOutputPath(job, new Path("/home/slave3/Downloads/result2"));
        // The output directory must not exist, so delete it if it does
        File file = new File("/home/slave3/Downloads/result2");
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0 : 1);
    }
}
Step 2: using the output of step 1, merge the per-file counts of each word, producing output such as hello a.txt 3,b.txt 2.
CreateIndexTwo.java
import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CreateIndexTwo {

    public static class MapTask extends Mapper<LongWritable, Text, Text, Text> {
        Text outKey = new Text();
        Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Each input line looks like "word-filename<TAB>count"; split it back into the word and the per-file count
            String[] split = value.toString().split("-");
            String word = split[0];
            String nameNum = split[1];
            outKey.set(word);
            outValue.set(nameNum);
            context.write(outKey, outValue);
        }
    }

    public static class ReduceTask extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Join all per-file counts for this word with commas
            StringBuilder builder = new StringBuilder();
            boolean flag = true;
            for (Text text : values) {
                if (flag) {
                    builder.append(text.toString());
                    flag = false;
                } else {
                    builder.append(",");
                    builder.append(text.toString());
                }
            }
            context.write(key, new Text(builder.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(CreateIndexTwo.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // The input is the output of step 1; the output directory must not exist
        FileInputFormat.setInputPaths(job, new Path("/home/slave3/Downloads/result2"));
        FileOutputFormat.setOutputPath(job, new Path("/home/slave3/Downloads/result3"));
        File file = new File("/home/slave3/Downloads/result3");
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0 : 1);
    }
}
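Putting the two steps together (the file contents here are made up): if a.txt contains hello three times and b.txt contains it twice, step 1 emits hello-a.txt 3 and hello-b.txt 2, and step 2 produces a line for hello roughly like:
hello	a.txt	3,b.txt	2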