HDFS
HDFS,分布式檔案系統
HDFS,Hadoop Distributed File System,分布式檔案系統,有着高容錯性(fault-tolerant)的特點,並且設計用來部署在低廉的(low-cost)硬體上,而且它提供高吞吐量(high throughput)來通路應用程式的資料,適合那些有着超大資料集(large data set)的應用程式。HDFS 放寬了(relax)POSIX 的要求(requirements)這樣可以實作流的形式通路(streaming access)檔案系統中的資料,HDFS 開始是為開源的 apache 項目nutch的基礎結構而建立,HDFS是 hadoop項目的一部分,而hadoop又是lucene的一部分。
HDFS,環境搭建
上一節:大資料叢集Hadoop搭建
Java操作HDFS檔案系統
pom.xml
<!-- Dependencies for the HDFS examples below:
     hadoop-client pulls in the HDFS FileSystem API,
     junit is for tests, slf4j-log4j12 provides logging bindings. -->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
<!-- Compile for Java 11. -->
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
代碼案例1:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
/**
 * Demonstrates basic HDFS operations via the Java client API:
 * mkdir, upload, rename, download, delete, and metadata listing.
 */
public class HdfsDemo {
    public static void main(String[] args) throws Exception {
        FileSystem fileSystem = getHdfs();
        try {
            // Create a directory.
            fileSystem.mkdirs(new Path("/demo/hello"));
            // Upload files (delSrc=false: keep the local copy; overwrite=true).
            Path localPath1 = new Path("D:\\Temp\\hello.txt");
            Path remotePath1 = new Path("/demo/hello/");
            fileSystem.copyFromLocalFile(false, true, localPath1, remotePath1);
            Path localPath2 = new Path("D:\\Temp\\hello2.txt");
            Path remotePath2 = new Path("/demo/hello/");
            fileSystem.copyFromLocalFile(false, true, localPath2, remotePath2);
            // Rename a remote file. (The original comment mislabeled this as a download.)
            Path remotePath3 = new Path("/demo/hello/hello2.txt");
            Path remotePath4 = new Path("/demo/hello/helloworld2.txt");
            fileSystem.rename(remotePath3, remotePath4);
            // Download a file.
            Path remoteDownPath1 = new Path("/demo/hello/hello.txt");
            Path localDownPath1 = new Path("D:/Temp/hello3.txt");
            // fileSystem.copyToLocalFile(false, remoteDownPath1, localDownPath1, true);
            // Delete a file (recursive=false: fails on non-empty directories).
            fileSystem.delete(remoteDownPath1, false);
            // Print detailed metadata for every file under "/".
            fileDetail(fileSystem);
            // Distinguish files from directories at the root.
            file(fileSystem);
        } finally {
            // Always release the client, even if one of the operations above threw.
            close(fileSystem);
        }
    }

    /**
     * Lists the direct children of "/" and prints whether each entry
     * is a file or a directory.
     *
     * @param fileSystem open HDFS client
     * @throws IOException if the listing fails
     */
    public static void file(FileSystem fileSystem) throws IOException {
        FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));
        for (FileStatus status : listStatus) {
            if (status.isFile()) {
                System.out.println("檔案:" + status.getPath().getName());
            } else {
                System.out.println("目錄:" + status.getPath().getName());
            }
        }
    }

    /**
     * Recursively iterates every file under "/" and prints its metadata:
     * permissions, owner, group, length, modification time, replication,
     * block size, name, and block locations.
     *
     * @param fileSystem open HDFS client
     * @throws IOException if the remote iteration fails
     */
    public static void fileDetail(FileSystem fileSystem) throws IOException {
        // listFiles(..., true) recurses into subdirectories and yields files only.
        RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            System.out.println("==========" + fileStatus.getPath() + "==========");
            System.out.println(fileStatus.getPermission());
            System.out.println(fileStatus.getOwner());
            System.out.println(fileStatus.getGroup());
            System.out.println(fileStatus.getLen());
            System.out.println(fileStatus.getModificationTime());
            System.out.println(fileStatus.getReplication());
            System.out.println(fileStatus.getBlockSize());
            System.out.println(fileStatus.getPath().getName());
            // Block placement information (hosts holding each block).
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            System.out.println(Arrays.toString(blockLocations));
        }
    }

    /**
     * Builds an HDFS client connected to the demo NameNode.
     * NOTE(review): the address and user are hard-coded for the tutorial
     * cluster; externalize them for real deployments.
     *
     * @return a connected {@link FileSystem} (caller must close it)
     * @throws Exception if the connection cannot be established
     */
    public static FileSystem getHdfs() throws Exception {
        URI uri = new URI("hdfs://192.168.2.53:8020");
        Configuration configuration = new Configuration();
        // Client-side override: write new files with 2 replicas.
        configuration.set("dfs.replication", "2");
        configuration.set("fs.defaultFS", "hdfs://192.168.2.53:8020");
        String user = "admin";
        FileSystem fileSystem = FileSystem.get(uri, configuration, user);
        return fileSystem;
    }

    /**
     * Closes the HDFS client if it was opened.
     * (Removed the original trailing {@code fileSystem = null;} — Java passes
     * references by value, so nulling the parameter had no effect on callers.)
     *
     * @param fileSystem client to close; may be null
     * @throws IOException if closing fails
     */
    public static void close(FileSystem fileSystem) throws IOException {
        if (fileSystem != null) {
            fileSystem.close();
        }
    }
}
代碼案例2:
import java.io.FileInputStream;
import java.io.FileOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
 * HDFS upload/download example using raw streams and
 * {@code IOUtils.copyBytes} instead of the copyFrom/ToLocalFile helpers.
 */
public class FileHdfsDownLoad {
    public static void main(String[] args) throws Exception {
        // Configure which HDFS cluster the client talks to.
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://192.168.2.53:8020");
        FileSystem fileSystem = FileSystem.get(config);
        try {
            // Download a file.
            downloadFile(fileSystem);
            // Upload a file.
            putFile(fileSystem);
        } finally {
            // Close the client even if an operation above threw.
            fileSystem.close();
        }
    }

    /**
     * Uploads a local file to HDFS by streaming bytes.
     *
     * @param fileSystem open HDFS client
     * @throws Exception if either stream fails
     */
    private static void putFile(FileSystem fileSystem) throws Exception {
        Path remotePath = new Path("/demo/hello/Anaconda.txt");
        // try-with-resources closes both streams even when create() throws;
        // the original leaked the FileInputStream on that failure path.
        try (FileInputStream in = new FileInputStream("D:/Temp/Anaconda.txt");
             FSDataOutputStream out = fileSystem.create(remotePath)) {
            // 1024-byte copy buffer; close=false because try-with-resources
            // already owns the streams' lifecycle.
            IOUtils.copyBytes(in, out, 1024, false);
        }
    }

    /**
     * Downloads an HDFS file to the local disk by streaming bytes.
     *
     * @param fileSystem open HDFS client
     * @throws Exception if either stream fails
     */
    private static void downloadFile(FileSystem fileSystem) throws Exception {
        Path remotePath = new Path("/demo/hello/hello2.txt");
        // Same leak fix as putFile: open() succeeding followed by a failing
        // FileOutputStream constructor previously leaked the HDFS stream.
        try (FSDataInputStream in = fileSystem.open(remotePath);
             FileOutputStream out = new FileOutputStream("D:/Temp/hello3.txt")) {
            IOUtils.copyBytes(in, out, 1024, false);
        }
    }
}
檢視效果: