天天看點

HDFS API demo

工作中常用到的HDFS API操作

package xxx.hdfs;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;



/**
 * Small collection of frequently used HDFS client operations: upload, download,
 * rename, and bulk rename, built on the Hadoop {@link FileSystem} API.
 *
 * <p>All helpers operate on the shared static {@link #fs} handle. {@link #main}
 * assigns it; any other caller must assign {@code fs} before invoking a helper.
 */
public class HdfsApiDemo {

    /** Shared file system handle; {@code null} until assigned (see {@link #main}). */
    static FileSystem fs = null;

    /** Client configuration used to obtain {@link #fs}. */
    static Configuration conf = new Configuration();

    /**
     * Connects to the cluster, runs whichever sample operation is enabled, and
     * closes the connection.
     *
     * @param args unused
     * @throws Exception if the connection or any enabled operation fails
     */
    public static void main(String[] args) throws Exception {
        // Port used in this URI: 8020 on CDH, 9000 on Apache Hadoop. The actual value
        // can be looked up in the fs.defaultFS setting in core-site.xml.
        fs = FileSystem.get(new URI("hdfs://cdh01:8020"), conf, "root");
        try {
            // Create a directory.
            // fs.mkdirs(new Path("/testDir/"));

            // Upload a local file.
            // Path src = new Path("F:/testDir/coding/G7-01/bigdata/data/wc.txt");
            // Path dest = new Path("/testDir/");
            // fs.copyFromLocalFile(src, dest);

            // Delete recursively.
            // boolean delete = fs.delete(new Path("/out"), true);
            // System.out.println(delete);

            // Download.
            // fs.copyToLocalFile(new Path("/testDir/test.log"), new Path("out/"));

            // Rename.
            // boolean rename = fs.rename(new Path("/testDir/aow_drv.log"), new Path("/testDir/rename.log"));
            // System.out.println(rename);

            // listFiles() lists the files under a directory. Argument 1 is the path,
            // argument 2 says whether to descend into subdirectories. Only files are
            // returned, never the directories themselves.
            // (This sample additionally needs: import org.apache.hadoop.fs.permission.FsPermission;)
            /*
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/apps/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus files = listFiles.next();
                Path path = files.getPath();
                FsPermission permission = files.getPermission();
                String owner = files.getOwner();
                boolean directory = files.isDirectory();
                String name = files.getPath().getName();
                System.out.println(name+": 1"+path+" 2: "+permission+": "+owner+": "+directory);
            }
            */

            // Stream-based upload / download.
            // uploadByStream("data/consumerinfo.log", "/testDir/consumerinfo.log");
            // downloadByStream("/testDir/consumerinfo.log", "out/down.log");

            // Bulk rename: the second argument (a value such as 20191001) is not
            // hard-coded; it is passed in from outside and becomes part of every
            // new file name.
            // allReName("/testDir/", "20190827");
        } finally {
            // Release the connection even when an operation above throws.
            fs.close();
        }
    }

    /**
     * Renames every file found under {@code source} (searched recursively) to
     * {@code <index>-<filename><suffix>}, keeping each file in its own directory.
     *
     * <p>{@code index} starts at 1 and follows listing order. {@code suffix} is the
     * part of the original name from its last {@code '.'} onward, or empty when the
     * name contains no dot.
     *
     * <p>A rename that the file system rejects is reported on standard error and
     * the remaining files are still processed.
     *
     * @param source   directory whose files are renamed
     * @param filename text placed between the running index and the suffix
     * @throws IOException if listing or renaming fails
     */
    public static void allReName(String source, String filename) throws IOException {
        // Snapshot the listing first so renames cannot disturb the ongoing iteration.
        List<Path> targets = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(source), true);
        while (listFiles.hasNext()) {
            targets.add(listFiles.next().getPath());
        }

        int i = 0;
        for (Path path : targets) {
            i++;
            String name = path.getName();
            // Keep the extension; a name without a dot simply has none.
            int dot = name.lastIndexOf('.');
            String suffix = dot >= 0 ? name.substring(dot) : "";
            String pinname = i + "-" + filename + suffix;

            // Swap only the final path component. A textual replace on the whole URI
            // would treat the name as a regex and could also hit parent directories.
            Path renamed = new Path(path.getParent(), pinname);
            if (!fs.rename(path, renamed)) {
                System.err.println("rename failed: " + path + " -> " + renamed);
            }
        }
    }

    /**
     * Renames (moves) a single path.
     *
     * @param source existing path
     * @param desc   new path
     * @return the value returned by {@link FileSystem#rename(Path, Path)}
     * @throws IOException if the file system call fails
     */
    public static boolean reName(String source, String desc) throws IOException {
        return fs.rename(new Path(source), new Path(desc));
    }

    /**
     * Uploads a local file using the built-in copy API.
     *
     * @param srcFile  local source path
     * @param destFile destination path on HDFS
     * @throws IOException if the copy fails
     */
    public static void copyFromLocalFile(String srcFile, String destFile) throws IOException {
        fs.copyFromLocalFile(new Path(srcFile), new Path(destFile));
    }

    /**
     * Downloads a file using the built-in copy API.
     *
     * @param srcFile  source path on HDFS
     * @param destFile local destination path
     * @throws IOException if the copy fails
     */
    public static void downloadFile(String srcFile, String destFile) throws IOException {
        fs.copyToLocalFile(new Path(srcFile), new Path(destFile));
    }

    /**
     * Uploads a local file by copying it through streams.
     *
     * @param srcfile  local source file
     * @param descfile destination path on HDFS
     * @throws IOException if opening, copying, or closing either stream fails
     */
    public static void uploadByStream(String srcfile, String descfile) throws IOException {
        // copyBytes(in, out, int) leaves both streams open, so close them here;
        // closing the HDFS output stream is what completes the written file.
        try (FileInputStream in = new FileInputStream(srcfile);
             FSDataOutputStream out = fs.create(new Path(descfile))) {
            IOUtils.copyBytes(in, out, 4096);
        }
    }

    /**
     * Downloads a file by copying it through streams.
     *
     * @param scrfile  source path on HDFS
     * @param descfile local destination file
     * @throws IOException if opening, copying, or closing either stream fails
     */
    public static void downloadByStream(String scrfile, String descfile) throws IOException {
        // Both streams are closed here because copyBytes(in, out, int) does not.
        try (FSDataInputStream in = fs.open(new Path(scrfile));
             FileOutputStream out = new FileOutputStream(new File(descfile))) {
            IOUtils.copyBytes(in, out, 4096);
        }
    }

}

           

繼續閱讀