Hadoop分布式檔案系統HDFS
HDFS前言
設計思想
分而治之:将大檔案、大批量檔案,分布式存放在大量伺服器上,以便于采取分而治之的方式對海量資料進行運算分析
在大資料系統中作用
為各類分布式運算架構(如:mapreduce,spark,tez,……)提供資料存儲服務
重點概念:檔案切塊,副本存放,中繼資料
HDFS的概念和特性
首先,它是一個檔案系統,用于存儲檔案,通過統一的命名空間——目錄樹來定位檔案。
其次,它是分布式的,由很多伺服器聯合起來實作其功能,叢集中的伺服器有各自的角色。
重要特性如下:
- HDFS中的檔案在實體上是分塊存儲(block),塊的大小可以通過配置參數( dfs.blocksize)來規定,預設大小在hadoop2.x版本中是128M,老版本中是64M
- HDFS檔案系統會給用戶端提供一個統一的抽象目錄樹,用戶端通過路徑來通路檔案,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data
目錄結構及檔案分塊資訊(中繼資料)的管理由namenode節點承擔
——namenode是HDFS叢集主節點,負責維護整個hdfs檔案系統的目錄樹,以及每一個路徑(檔案)所對應的block塊資訊(block的id,及所在的datanode伺服器)
檔案的各個block的存儲管理由datanode節點承擔
—- datanode是HDFS叢集從節點,每一個block都可以在多個datanode上存儲多個副本(副本數量也可以通過參數設定dfs.replication)
HDFS是設計成适應一次寫入,多次讀出的場景,且不支援檔案的修改
(注:适合用來做資料分析,并不适合用來做網盤應用,因為,不便修改,延遲大,網絡開銷大,成本太高)
HDFS基本操作
HDFS的shell(指令行用戶端)操作
指令行用戶端支援的指令參數
$hadoop fs -ls /
[-appendToFile <localsrc> ... <dst>]
[-cat [-ignoreCrc] <src> ...]
[-checksum <src> ...]
[-chgrp [-R] GROUP PATH...]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
[-copyFromLocal [-f] [-p] <localsrc> ... <dst>]
[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-count [-q] <path> ...]
[-cp [-f] [-p] <src> ... <dst>]
[-createSnapshot <snapshotDir> [<snapshotName>]]
[-deleteSnapshot <snapshotDir> <snapshotName>]
[-df [-h] [<path> ...]]
[-du [-s] [-h] <path> ...]
[-expunge]
[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-getfacl [-R] <path>]
[-getmerge [-nl] <src> <localdst>]
[-help [cmd ...]]
[-ls [-d] [-h] [-R] [<path> ...]]
[-mkdir [-p] <path> ...]
[-moveFromLocal <localsrc> ... <dst>]
[-moveToLocal <src> <localdst>]
[-mv <src> ... <dst>]
[-put [-f] [-p] <localsrc> ... <dst>]
[-renameSnapshot <snapshotDir> <oldName> <newName>]
[-rm [-f] [-r|-R] [-skipTrash] <src> ...]
[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
[-setrep [-R] [-w] <rep> <path> ...]
[-stat [format] <path> ...]
[-tail [-f] <file>]
[-test -[defsz] <path>]
[-text [-ignoreCrc] <src> ...]
[-touchz <path> ...]
[-usage [cmd ...]]
常用指令參數介紹
-help
功能:輸出這個指令參數手冊
-ls
功能:顯示目錄資訊
示例: hadoop fs -ls hdfs://hadoop-server01:9000/
備注:這些參數中,所有的hdfs路徑都可以簡寫
hadoop fs -ls / 等同于上一條指令的效果
-mkdir
功能:在hdfs上建立目錄
示例:hadoop fs -mkdir -p /aaa/bbb/cc/dd
-moveFromLocal
功能:從本地剪切粘貼到hdfs
示例:hadoop fs - moveFromLocal /home/hadoop/a.txt /aaa/bbb/cc/dd
-moveToLocal
功能:從hdfs剪切粘貼到本地
示例:hadoop fs - moveToLocal /aaa/bbb/cc/dd /home/hadoop/a.txt
--appendToFile
功能:追加一個檔案到已經存在的檔案末尾
示例:hadoop fs -appendToFile ./hello.txt
hdfs://hadoop-server01:9000/hello.txt
可以簡寫為:
hadoop fs -appendToFile ./hello.txt /hello.txt
-cat
功能:顯示檔案内容
示例:hadoop fs -cat /hello.txt
-tail
功能:顯示一個檔案的末尾
示例:hadoop fs -tail /weblog/access_log
-text
功能:以字元形式列印一個檔案的内容
示例:hadoop fs -text /weblog/access_log
-chgrp
-chmod
-chown
功能:linux檔案系統中的用法一樣,對檔案所屬權限
示例:
hadoop fs -chmod /hello.txt
hadoop fs -chown someuser:somegrp /hello.txt
-copyFromLocal
功能:從本地檔案系統中拷貝檔案到hdfs路徑去
示例:hadoop fs -copyFromLocal ./jdk.tar.gz /aaa/
-copyToLocal
功能:從hdfs拷貝到本地
示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz
-cp
功能:從hdfs的一個路徑拷貝hdfs的另一個路徑
示例: hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz
-mv
功能:在hdfs目錄中移動檔案
示例: hadoop fs -mv /aaa/jdk.tar.gz /
-get
功能:等同于copyToLocal,就是從hdfs下載下傳檔案到本地
示例:hadoop fs -get /aaa/jdk.tar.gz
-getmerge
功能:合并下載下傳多個檔案
示例:比如hdfs的目錄 /aaa/下有多個檔案:log, log,log,...
hadoop fs -getmerge /aaa/log.* ./log.sum
-put
功能:等同于copyFromLocal
示例:hadoop fs -put /aaa/jdk.tar.gz /bbb/jdk.tar.gz
-rm
功能:删除檔案或檔案夾
示例:hadoop fs -rm -r /aaa/bbb/
-rmdir
功能:删除空目錄
示例:hadoop fs -rmdir /aaa/bbb/ccc
-df
功能:統計檔案系統的可用空間資訊
示例:hadoop fs -df -h /
-count
功能:統計一個指定目錄下的檔案節點數量
示例:hadoop fs -count /aaa/
-setrep
功能:設定hdfs中檔案的副本數量
示例:hadoop fs -setrep /aaa/jdk.tar.gz
-du
功能:統計檔案夾的大小資訊
示例:hadoop fs -du -s -h /aaa/*
HDFS原理
hdfs的工作機制
工作機制的學習主要是為加深對分布式系統的了解,以及增強遇到各種問題時的分析解決能力,形成一定的叢集運維能力
很多不是真正了解hadoop技術體系的人會常常覺得HDFS可用于網盤類應用,但實際并非如此。要想将技術準确用在恰當的地方,必須對技術有深刻的了解
概述
- HDFS叢集分為兩大角色:NameNode、DataNode
- NameNode負責管理整個檔案系統的中繼資料
- DataNode 負責管理使用者的檔案資料塊
- 檔案會按照固定的大小(blocksize)切成若幹塊後分布式存儲在若幹台datanode上
- 每一個檔案塊可以有多個副本,并存放在不同的datanode上
- Datanode會定期向Namenode彙報自身所儲存的檔案block資訊,而namenode則會負責保持檔案的副本數量
- HDFS的内部工作機制對用戶端保持透明,用戶端請求通路HDFS都是通過向namenode申請來進行
HDFS寫資料流程
用戶端要向HDFS寫資料,首先要跟namenode通信以确認可以寫檔案并獲得接收檔案block的datanode,然後,用戶端按順序将檔案逐個block傳遞給相應datanode,并由接收到block的datanode負責向其他datanode複制block的副本
詳細步驟圖

詳細步驟解析
- 根namenode通信請求上傳檔案,namenode檢查目标檔案是否已存在,父目錄是否存在
- namenode傳回是否可以上傳
- client請求第一個 block該傳輸到哪些datanode伺服器上
- namenode傳回3個datanode伺服器ABC
- client請求3台dn中的一台A上傳資料(本質上是一個RPC調用,建立pipeline),A收到請求會繼續調用B,然後B調用C,将真個pipeline建立完成,逐級傳回用戶端
- client開始往A上傳第一個block(先從磁盤讀取資料放到一個本地記憶體緩存),以packet為機關,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答
- 當一個block傳輸完成之後,client再次請求namenode上傳第二個block的伺服器。
HDFS讀資料流程
用戶端将要讀取的檔案路徑發送給namenode,namenode擷取檔案的元資訊(主要是block的存放位置資訊)傳回給用戶端,用戶端根據傳回的資訊找到相應datanode逐個擷取檔案的block并在用戶端本地進行資料追加合并進而獲得整個檔案
詳細步驟圖
詳細步驟解析
- 跟namenode通信查詢中繼資料,找到檔案塊所在的datanode伺服器
- 挑選一台datanode(就近原則,然後随機)伺服器,請求建立socket流
- datanode開始發送資料(從磁盤裡面讀取資料放入流,以packet為機關來做校驗)
- 用戶端以packet為機關接收,現在本地緩存,然後寫入目标檔案
NAMENODE工作機制
學習目标:
了解namenode的工作機制尤其是中繼資料管理機制,以增強對HDFS工作原理的了解,及培養hadoop叢集營運中“性能調優”、“namenode”故障問題的分析解決能力
問題場景:
1. 叢集啟動後,可以檢視檔案,但是上傳檔案時報錯,打開web頁面可看到namenode正處于safemode狀态,怎麼處理?
2. Namenode伺服器的磁盤故障導緻namenode當機,如何挽救叢集及資料?
3. Namenode是否可以有多個?namenode記憶體要配置多大?namenode跟叢集資料存儲能力有關系嗎?
4. 檔案的blocksize究竟調大好還是調小好?
……
諸如此類問題的回答,都需要基于對namenode自身的工作原理的深刻了解
NameNode職責
負責用戶端請求的響應
中繼資料的管理(查詢,修改)
中繼資料管理
namenode對資料的管理采用了三種存儲形式:
1. 記憶體中繼資料(NameSystem)
2. 磁盤中繼資料鏡像檔案
3. 資料記錄檔檔案(可通過日志運算出中繼資料)
中繼資料存儲機制
- 記憶體中有一份完整的中繼資料(記憶體meta data)
- 磁盤有一個“準完整”的中繼資料鏡像(fsimage)檔案(在namenode的工作目錄中)
- 用于銜接記憶體metadata和持久化中繼資料鏡像fsimage之間的記錄檔(edits檔案)注:當用戶端對hdfs中的檔案進行新增或者修改操作,操作記錄首先被記入edits日志檔案中,當用戶端操作成功後,相應的中繼資料會更新到記憶體meta.data中
中繼資料手動檢視
可以通過hdfs的一個工具來檢視edits中的資訊
bin/hdfs oev -i edits -o edits.xml
bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml
中繼資料的checkpoint
每隔一段時間,會由secondary namenode将namenode上積累的所有edits和一個最新的fsimage下載下傳到本地,并加載到記憶體進行merge(這個過程稱為checkpoint)
checkpoint的詳細過程
checkpoint操作的觸發條件配置參數
dfs.namenode.checkpoint.check.period= #檢查觸發條件是否滿足的頻率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上兩個參數做checkpoint操作時,secondary namenode的本地工作目錄
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}
dfs.namenode.checkpoint.max-retries= #最大重試次數
dfs.namenode.checkpoint.period= #兩次checkpoint之間的時間間隔3600秒
dfs.namenode.checkpoint.txns= #兩次checkpoint之間最大的操作記錄
checkpoint的附帶作用
namenode和secondary namenode的工作目錄存儲結構完全相同,是以,當namenode故障退出需要重新恢複時,可以從secondary namenode的工作目錄中将fsimage拷貝到namenode的工作目錄,以恢複namenode的中繼資料
DataNode的工作機制
問題場景:
1. 叢集容量不夠,怎麼擴容?
2. 如果有一些datanode當機,該怎麼辦?
3. datanode明明已啟動,但是叢集中的可用datanode清單中就是沒有,怎麼辦?
……
以上這類問題的解答,有賴于對datanode工作機制的深刻了解
概述
1. Datanode工作職責:
存儲管理使用者的檔案塊資料
定期向namenode彙報自身所持有的block資訊(通過心跳資訊上報)
(這點很重要,因為,當叢集中發生某些block副本失效時,叢集如何恢複block初始副本數量的問題)
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>3600000</value>
<description>Determines block reporting interval in milliseconds.</description>
</property>
2. DataNode掉線判斷時限參數
datanode程序死亡或者網絡故障造成datanode無法與namenode通信,namenode不會立即把該節點判定為死亡,要經過一段時間,這段時間暫稱作逾時時長。HDFS預設的逾時時長為10分鐘+30秒。如果定義逾時時間為timeout,則逾時時長的計算公式為:
timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
而預設的heartbeat.recheck.interval 大小為5分鐘,dfs.heartbeat.interval預設為3秒。
需要注意的是hdfs-site.xml 配置檔案中的heartbeat.recheck.interval的機關為毫秒,dfs.heartbeat.interval的機關為秒。是以,舉個例子,如果heartbeat.recheck.interval設定為5000(毫秒),dfs.heartbeat.interval設定為3(秒,預設),則總的逾時時間為40秒。
<property>
<name>heartbeat.recheck.interval</name>
<value>2000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>1</value>
</property>
觀察驗證DATANODE功能
上傳一個檔案,觀察檔案的block具體的實體存放情況:
在每一台datanode機器上的這個目錄中能找到檔案的切塊:
HDFS應用開發
HDFS的java操作
hdfs在生産應用中主要是用戶端的開發,其核心步驟是從hdfs提供的api中構造一個HDFS的通路用戶端對象,然後通過該用戶端對象操作(增删改查)HDFS上的檔案
搭建開發環境
1. 引入依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.1</version>
</dependency>
注:如需手動引入jar包,hdfs的jar包—-hadoop的安裝目錄的share下
2. window下開發的說明
建議在linux下進行hadoop應用的開發,不會存在相容性問題。如在window上做用戶端應用開發,需要設定以下環境:
A、在windows的某個目錄下解壓一個hadoop的安裝包
B、将安裝包下的lib和bin目錄用對應windows版本平台編譯的本地庫替換
C、在window系統中配置HADOOP_HOME指向你解壓的安裝包
D、在windows系統的path變量中加入hadoop的bin目錄
擷取api中的用戶端對象
在java中操作hdfs,首先要獲得一個用戶端執行個體
Configuration conf = new Configuration()
FileSystem fs = FileSystem.get(conf)
而我們的操作目标是HDFS,是以擷取到的fs對象應該是DistributedFileSystem的執行個體。
get方法是從何處判斷具體執行個體化那種用戶端類呢?
從conf中的一個參數 fs.defaultFS的配置值判斷。
如果我們的代碼中沒有指定fs.defaultFS,并且工程classpath下也沒有給定相應的配置,conf中的預設值就來自于hadoop的jar包中的core-default.xml,預設值為: file:///,則擷取的将不是一個DistributedFileSystem的執行個體,而是一個本地檔案系統的用戶端對象
HDFS用戶端操作資料代碼示例:
檔案的增删改查
public class HdfsClient {
FileSystem fs = null;
@Before
public void init() throws Exception {
// 構造一個配置參數對象,設定一個參數:我們要通路的hdfs的URI
// 進而FileSystem.get()方法就知道應該是去構造一個通路hdfs檔案系統的用戶端,以及hdfs的通路位址
// new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml
// 然後再加載classpath下的hdfs-site.xml
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
/**
* 參數優先級: 1、用戶端代碼中設定的值 2、classpath下的使用者自定義配置檔案 3、然後是伺服器的預設配置
*/
conf.set("dfs.replication", "3");
// 擷取一個hdfs的通路用戶端,根據參數,這個執行個體應該是DistributedFileSystem的執行個體
// fs = FileSystem.get(conf);
// 如果這樣去擷取,那conf裡面就可以不要配"fs.defaultFS"參數,而且,這個用戶端的身份辨別已經是hadoop使用者
fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
/**
* 往hdfs上傳檔案
*
* @throws Exception
*/
@Test
public void testAddFileToHdfs() throws Exception {
// 要上傳的檔案所在的本地路徑
Path src = new Path("g:/redis-recommend.zip");
// 要上傳到hdfs的目标路徑
Path dst = new Path("/aaa");
fs.copyFromLocalFile(src, dst);
fs.close();
}
/**
* 從hdfs中複制檔案到本地檔案系統
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
fs.close();
}
@Test
public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
// 建立目錄
fs.mkdirs(new Path("/a1/b1/c1"));
// 删除檔案夾 ,如果是非空檔案夾,參數2必須給值true
fs.delete(new Path("/aaa"), true);
// 重命名檔案或檔案夾
fs.rename(new Path("/a1"), new Path("/a2"));
}
/**
* 檢視目錄資訊,隻顯示檔案
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
// 思考:為什麼傳回疊代器,而不是List之類的容器
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println(fileStatus.getPath().getName());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getLen());
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation bl : blockLocations) {
System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
String[] hosts = bl.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("--------------為angelababy列印的分割線--------------");
}
}
/**
* 檢視檔案及檔案夾資訊
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
String flag = "d-- ";
for (FileStatus fstatus : listStatus) {
if (fstatus.isFile()) flag = "f-- ";
System.out.println(flag + fstatus.getPath().getName());
}
}
}
通過流的方式通路hdfs
/**
* 相對那些封裝好的方法而言的更底層一些的操作方式
* 上層那些mapreduce spark等運算架構,去hdfs中擷取資料的時候,就是調的這種底層的api
* @author
*
*/
public class StreamAccess {
FileSystem fs = null;
@Before
public void init() throws Exception {
Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
@Test
public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
//先擷取一個檔案的輸入流----針對hdfs上的
FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
//再構造一個檔案的輸出流----針對本地的
FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
//再将輸入流中資料傳輸到輸出流
IOUtils.copyBytes(in, out, );
}
/**
* hdfs支援随機定位進行檔案讀取,而且可以友善地讀取指定長度
* 用于上層分布式運算架構并發處理資料
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void testRandomAccess() throws IllegalArgumentException, IOException{
//先擷取一個檔案的輸入流----針對hdfs上的
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
//可以将流的起始偏移量進行自定義
in.seek();
//再構造一個檔案的輸出流----針對本地的
FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
IOUtils.copyBytes(in,out,L,true);
}
/**
* 顯示hdfs上檔案的内容
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
IOUtils.copyBytes(in, System.out, );
}
}
場景程式設計
/**在mapreduce 、spark等運算架構中,有一個核心思想就是将運算移往資料,或者說,就是要在并發計算中盡可能讓運算本地化,這就需要擷取資料所在位置的資訊并進行相應範圍讀取
以下模拟實作:擷取一個檔案的所有block位置資訊,然後讀取指定block中的内容*/
@Test
public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));
//拿到檔案資訊
FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10"));
//擷取這個檔案的所有block的資訊
BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[], L, listStatus[].getLen());
//第一個block的長度
long length = fileBlockLocations[].getLength();
//第一個block的起始偏移量
long offset = fileBlockLocations[].getOffset();
System.out.println(length);
System.out.println(offset);
//擷取第一個block寫入輸出流
// IOUtils.copyBytes(in, System.out, (int)length);
byte[] b = new byte[];
FileOutputStream os = new FileOutputStream(new File("d:/block0"));
while(in.read(offset, b, , )!=-){
os.write(b);
offset += ;
if(offset>=length) return;
};
os.flush();
os.close();
in.close();
}