天天看點

Scala操作HDFS

Scala操作HDFS

基本的HDFS操作包括:擷取檔案系統、檢查檔案是否存在、列舉目前目錄下的所有檔案路徑、列舉目前目錄下的所有檔案名稱、删除目前路徑、建立新的路徑

import org.apache.hadoop.conf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object MyHDFSUtils {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("MyHDFSUtils")
    val sc = new SparkContext(sparkConf)
    val path = args(0)
    println(listDirPath(sc, path).mkString("\n"))
  }

  /**
    * 獲得HDFS檔案系統
    *
    * @param sc       SparkContext
    * @param filePath 檔案路徑 (String)
    * @return
    */
  private def getFileSystem(sc: SparkContext, filePath: String): FileSystem = {
    new Path(filePath).getFileSystem(new Configuration())
  }


  /**
    * 判斷檔案是否存在
    *
    * @param sc       SparkContext
    * @param filePath 檔案路徑 (String)
    * @return
    */
  def existsFile(sc: SparkContext, filePath: String): Boolean = {
    getFileSystem(sc, filePath)
      .exists(new Path(filePath))
  }


  /**
    * 擷取目錄大小
    *
    * @param sc       SparkContext
    * @param filePath 檔案路徑 (String)
    * @return
    */
  def getDirSize(sc: SparkContext, filePath: String): Long = {
    val fs: FileSystem = getFileSystem(sc, filePath)
    fs.getContentSummary(new Path(filePath)).getSpaceConsumed
  }

  /**
    * 列出目錄下檔案絕對路徑
    *
    * @param sc       SparkContext
    * @param filePath 檔案路徑 (String)
    * @return
    */
  def listDirPath(sc: SparkContext, filePath: String): Array[String] = {
    val fs = getFileSystem(sc, filePath)
    val fileStatus: Array[FileStatus] = fs.listStatus(new Path(filePath))
    fileStatus.map(
      status => {
        status.getPath.toString
      }
    )
  }

  /**
    * 列出目錄下所有檔案或者目錄名
    *
    * @param sc
    * @param filePath
    * @return
    */
  def listDirNames(sc: SparkContext, filePath: String): Array[String] = {
    val fs = getFileSystem(sc, filePath)
    val fileStatus: Array[FileStatus] = fs.listStatus(new Path(filePath))
    fileStatus.map(
      status => {
        status.getPath.getName
      }
    )
  }

  /**
    * 删除HDFS路徑
    *
    * @param filePath  檔案或者目錄路徑
    * @param recursive 是否遞歸删除
    * @return
    */
  def deleteHDFSPath(filePath: String, recursive: Boolean = true): Boolean = {
    var flag = true
    val path = new Path(filePath)
    val fs = FileSystem.get(path.toUri, new Configuration())
    if (fs.exists(path)) {
      flag = fs.delete(path, recursive)
    }
    flag
  }

  /**
    * 建立HDFS目錄,如果已存在就不建立
    *
    * @param filePath
    * @return
    */
  def makeHDFSDirs(filePath: String): Boolean = {
    var flag: Boolean = true
    val path = new Path(filePath)
    val fs = FileSystem.get(path.toUri, new conf.Configuration())
    if (!fs.exists(path)) {
      flag = fs.mkdirs(path)
    }
    flag
  }
}

           

繼續閱讀