Scala操作HDFS
基本的HDFS操作包括:擷取檔案系統、檢查檔案是否存在、列舉目前目錄下的所有檔案路徑、列舉目前目錄下的所有檔案名稱、删除目前路徑、建立新的路徑
import org.apache.hadoop.conf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
object MyHDFSUtils {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("MyHDFSUtils")
val sc = new SparkContext(sparkConf)
val path = args(0)
println(listDirPath(sc, path).mkString("\n"))
}
/**
* 獲得HDFS檔案系統
*
* @param sc SparkContext
* @param filePath 檔案路徑 (String)
* @return
*/
private def getFileSystem(sc: SparkContext, filePath: String): FileSystem = {
new Path(filePath).getFileSystem(new Configuration())
}
/**
* 判斷檔案是否存在
*
* @param sc SparkContext
* @param filePath 檔案路徑 (String)
* @return
*/
def existsFile(sc: SparkContext, filePath: String): Boolean = {
getFileSystem(sc, filePath)
.exists(new Path(filePath))
}
/**
* 擷取目錄大小
*
* @param sc SparkContext
* @param filePath 檔案路徑 (String)
* @return
*/
def getDirSize(sc: SparkContext, filePath: String): Long = {
val fs: FileSystem = getFileSystem(sc, filePath)
fs.getContentSummary(new Path(filePath)).getSpaceConsumed
}
/**
* 列出目錄下檔案絕對路徑
*
* @param sc SparkContext
* @param filePath 檔案路徑 (String)
* @return
*/
def listDirPath(sc: SparkContext, filePath: String): Array[String] = {
val fs = getFileSystem(sc, filePath)
val fileStatus: Array[FileStatus] = fs.listStatus(new Path(filePath))
fileStatus.map(
status => {
status.getPath.toString
}
)
}
/**
* 列出目錄下所有檔案或者目錄名
*
* @param sc
* @param filePath
* @return
*/
def listDirNames(sc: SparkContext, filePath: String): Array[String] = {
val fs = getFileSystem(sc, filePath)
val fileStatus: Array[FileStatus] = fs.listStatus(new Path(filePath))
fileStatus.map(
status => {
status.getPath.getName
}
)
}
/**
* 删除HDFS路徑
*
* @param filePath 檔案或者目錄路徑
* @param recursive 是否遞歸删除
* @return
*/
def deleteHDFSPath(filePath: String, recursive: Boolean = true): Boolean = {
var flag = true
val path = new Path(filePath)
val fs = FileSystem.get(path.toUri, new Configuration())
if (fs.exists(path)) {
flag = fs.delete(path, recursive)
}
flag
}
/**
* 建立HDFS目錄,如果已存在就不建立
*
* @param filePath
* @return
*/
def makeHDFSDirs(filePath: String): Boolean = {
var flag: Boolean = true
val path = new Path(filePath)
val fs = FileSystem.get(path.toUri, new conf.Configuration())
if (!fs.exists(path)) {
flag = fs.mkdirs(path)
}
flag
}
}